首页
技术小册
AIGC
面试刷题
技术文章
MAGENTO
云计算
视频课程
源码下载
PDF书籍
「涨薪秘籍」
登录
注册
Kafka简介
Kafka工作流程
Kafka存储机制
Kafka分区机制
Kafka生产者与消费者
Kafka ack机制
安装Kafka
Producer消息缓存模型
指定副本作为Leader
Kafka的ISR伸缩机制
acks和mir配置详解
Kafka主动修改分区Leader
Kafka向Server发起请求
当前位置:
首页>>
技术小册>>
kafka入门到实战
小册名称:kafka入门到实战
一、安装JDK环境 1.1、下载jdk jdk下载链接: http://www.jdkdownload.com/ 1.2、上传jdk到linux 把下载好的jdk源码上传到/usr/local/java目录下 ![](/uploads/images/20230710/f3b0d3d9128f20fec05e82e8dd2952b1.png) 1.3、解压jdk 使用命令解压jdk的压缩包 ```bash tar -zxvf jdk-8u181-linux-x64.tar.gz ``` ![](/uploads/images/20230710/6c5dbe413d7d517a1fbc2a83d6b7570f.png) 1.4、配置环境变量 vim /etc/profile ```bash export JAVA_HOME=/usr/local/java/jdk1.8.0_181 #jdk安装目录 export JRE_HOME=${JAVA_HOME}/jre export CLASSPATH=.:${JAVA_HOME}/lib:${JRE_HOME}/lib:$CLASSPATH export JAVA_PATH=${JAVA_HOME}/bin:${JRE_HOME}/bin export PATH=$PATH:${JAVA_PATH} ``` 把上面几行配置到文件中 ![](/uploads/images/20230710/f58522455c48caad5e737cffecbe4e53.png) 使用命令刷新文件 ```bash source /etc/profile ``` 1.5、测试jdk安装是否成功 使用命令 ```bash java -version ``` 出现版本即安装成功 ![](/uploads/images/20230710/9aa39cc5fa360d528d982f79502d7437.png) 二、安装Zookeper 2.1、下载安装包 zookeper下载链接: https://mirrors.tuna.tsinghua.edu.cn/apache/zookeeper/zookeeper-3.5.9/ 2.2、创建安装目录 ```bash #创建安装目录 mkdir -p /opt/zookeeper #移动到目录 cd /opt/zookeepe ``` 上传zookeeper安装包到服务器 解压缩 ```bash tar -zxvf zookeeper-3.4.14.tar.gz ``` 2.3、修改配置 ```bash #移到配置目录 cd /opt/zookeeper/zookeeper-3.4.14/conf/ #复制配置文件 cp zoo_sample.cfg zoo.cfg #修改及添加以下配置 tickTime=2000 initLimit=10 syncLimit=5 dataDir=/opt/zookeeper/zoodata dataLogDir=/opt/zookeeper/zoodatalog clientPort=2181 server.0=127.0.0.1:2888:3888 #多节点 集群 #server.1=127.0.0.1:4888:5888 #server.2=127.0.0.1:5888:6888 ``` #配置说明 - tickTime:客户端会话超时时间,默认2000毫秒。 - initLimit:配置客户端初始化可接受多少个心跳监测,默认10,即10*tickTime(默认2000),表示20s没有连接上集群的配置则连接失败。 - syncLimit:配置Leader和follwer之间,允许多少个请求应答长度,默认5,即5*tickTime(默认2000),表示默认10sLeader和Follwer之间如果消息5次没有发送成功就不尝试了。 - dataDir:配置存储快照文件的目录。 - dataLogDir:配置事务日志存储的目录。 - clientPort:服务默认端口,默认2181。 - server.X=A:B:C 其中X是一个数字,表示这是第几号server,A是该server所在的IP地址,B配置该server和集群中的leader交换消息所使用的端口,C配置选举leader时所使用的端口。 2.4、创建节点的myid: ```bash #创建dataDir目录 mkdir -p /opt/zookeeper/zoodata #移动到目录 cd /opt/zookeeper/zoodata #把节点号写入myid文件(各个节点分别配置) echo 0 > myid #配置端口防火墙(各个节点分别配置) firewall-cmd --zone=public --add-port=2181/tcp --permanent firewall-cmd --reload ``` 2.5、启动ZooKeeper ```bash #移到执行目录 cd /opt/zookeeper/zookeeper-3.4.14/bin/ #启动服务 ./zkServer.sh start ``` ```bash #重启 ./zkServer.sh restart #关闭 ./zkServer.sh stop #查看状态 ./zkServer.sh staus #启动的时候,查看后台信息 ./zkServer.sh start-foreground & ``` 客户端链接 ```bash ./zkCli.sh ``` ![](/uploads/images/20230710/fa07ccfbc056e83a505a6b0d07ba849a.png) 基本操作 ```bash #创建节点 create /test test1 #获取节点数据 get /test #更新节点 set /test test2 #删除节点 delete /test #递归删除数据,将子目录的数据也删除掉 rmr /test #查看节点 ls / #查看输入过的命令 history ``` 三、安装Kafka 3.1、下载kafka kafka下载链接: https://kafka.apache.org/downloads 3.2、创建工作目录&启动 ```bash #创建目录 mkdir /opt/kafka cd /opt/kafka ``` 上传kafka的压缩包到服务器。 解压并修改名称: ```bash tar -zxvf kafka_2.11-1.0.0.tgz mv kafka_2.11-1.0.0 kafka_2.11 ``` ```bash cd kafka_2.11 #创个日志文件夹 mkdir logs #修改配置文件 cd /opt/kafka/kafka_2.11/config vim server.properties # broker.id属性在kafka集群中必须要是唯一 broker.id=0 # kafka部署的机器ip和提供服务的端口号,切勿设0.0.0.0可能报错 listeners=PLAINTEXT://192.xxx.xx.xx:9092 # kafka的消息存储文件 log.dir=/usr/local/data/kafka‐logs # kafka 连接 zookeeper 的地址 zookeeper.connect=192.xxx.xx.xx:2181 # 启动服务 , 运行的日志打印在 logs 目录里的server.log 里 # 后台启动,不会打印日志到控制台 1:bin/kafka‐server‐start.sh ‐daemon config/server.properties 2:bin/kafka-server-start.sh config/server.properties & # 启动成功后,可以进入zookeeper 查看kafka节点 bin/zk.Cli.sh ls / # 停止kafka bin/kafka‐server‐stop.sh ``` 3.3、开启远程链接 ```bash vim config/server.properties # 把下述配置注释 listeners=PLAINTEXT://:127.0.0.1:9092 # 配置下述内容 advertised.listeners=PLAINTEXT://127.0.0.1:9092 ``` 3.4、创建和查看topic ```bash //创建 ./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test1 //查看 ./bin/kafka-topics.sh --list --zookeeper localhost:2181 ``` 用Kafka的console-producer在topic test1 生产消息 新开一个终端窗口: ```bash ./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test1 ``` ![](/uploads/images/20230710/3b5543d3686561baf3e8cfa56a12be80.png) 用Kafka的console-consumer 消费topic test1的消息 新开一个shell窗口 ```bash ./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test1 --from-beginning ``` ![](/uploads/images/20230710/c2e0108bd18e1f16e0b18e68e7511b23.png) 至此,我们的Kafka安装完成。
上一篇:
Kafka ack机制
下一篇:
Producer消息缓存模型
该分类下的相关小册推荐:
Kafka核心源码解读
Kafka 原理与源码精讲
消息队列入门与进阶
Kafka面试指南
Kafka核心技术与实战