前言
https://kafka.apache.org/downloads
ZK配置
安装zookeeper
kafka依赖zookeeper,安装包内已内置 使用内置的可以跳过该步骤
也可自己单独下载
https://zookeeper.apache.org/releases.html#download
启动内置ZK
Windows
启动ZK
1  | %KAFKA_HOME%/bin/windows/zookeeper-server-start.bat %KAFKA_HOME%/config/zookeeper.properties  | 
进入
1  | %KAFKA_HOME%/bin/windows/zookeeper-shell.bat localhost  | 
CentOS
1  | $KAFKA_HOME/bin/zookeeper-server-start.sh $KAFKA_HOME/config/zookeeper.properties  | 
输入命令
1  | #查看zk的根目录kafka相关节点  | 
常用命令
- 显示根目录下文件: 
ls /使用 ls 命令来查看当前 ZooKeeper 中所包含的内容 - 显示根目录下文件: 
ls2 /查看当前节点数据并能看到更新次数等数据 - 创建文件,并设置初始内容:
create /zk "test"创建一个新的 znode节点“ zk ”以及与它关联的字符串 - 获取文件内容: 
get /zk确认 znode 是否包含我们所创建的字符串 - 修改文件内容: 
set /zk "zkbak"对 zk 所关联的字符串进行设置 - 删除文件: 
delete /zk将刚才创建的 znode 删除 - 退出客户端: 
quit - 帮助命令:
help 
Kafka单机环境搭建
安装kafka
1  | wget https://archive.apache.org/dist/kafka/2.8.1/kafka_2.12-2.8.1.tgz  | 
添加环境变量
CentOS
创建配置文件
1  | vi /etc/profile.d/kafka.sh  | 
加入:
1  | export KAFKA_HOME=/data/tools/bigdata/kafka_2.12-2.8.1  | 
配置立即生效
1  | source /etc/profile  | 
查看ZK_HOME
1  | echo $KAFKA_HOME  | 
Win
| 名称 | 路径 | 
|---|---|
| KAFKA_HOME | D:\Tools\bigdata\kafka_2.12-2.8.1 | 
| Path | %KAFKA_HOME%\bin | 
修改配置
修改配置文件 config/server.properties
1  | #broker.id属性在kafka集群中必须要是唯一  | 
服务器内访问
1  | listeners=PLAINTEXT://192.168.7.101:9092  | 
内网访问
1  | listeners=PLAINTEXT://192.168.7.101:9092  | 
在ZK中可以看到
路径/brokers/ids/0中的值 和broker.id=0对应。
查看
1  | get /brokers/ids/0  | 
值
1  | {"listener_security_protocol_map":{"PLAINTEXT":"PLAINTEXT"},"endpoints":["PLAINTEXT://192.168.7.101:9092"],"jmx_port":-1,"features":{},"host":"192.168.7.101","timestamp":"1670583689047","port":9092,"version":5}  | 
外网访问
1  | listener.security.protocol.map=INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT  | 
其中110.110.110.110:9092是公网ip:端口
advertised_listeners 监听器会注册在 zookeeper 中;
当我们对 192.168.7.101:9092 请求建立连接,kafka 服务器会通过 zookeeper 中注册的监听器,找到 INTERNAL监听器,然后通过 listeners 中找到对应的 通讯 ip 和 端口 192.168.7.101:9092;
同理:
当我们对 110.110.110.110:9092 请求建立连接,kafka 服务器会通过 zookeeper 中注册的监听器,找到 EXTERNAL 监听器,然后通过 listeners 中找到对应的 通讯 ip 和 端口 192.168.7.101:9093;
总结:
advertised_listeners 是对外暴露的服务端口,kafka组件之间通讯用的是 listeners。
启动Kafka
Linux
启动脚本语法:
1  | kafka-server-start.sh [-daemon] server.properties  | 
可以看到,server.properties的配置路径是一个强制的参数,-daemon表示以后台进程运行,否则ssh客户端退出后,就会停止服务。
注意
在启动kafka时会使用linux主机名关联的ip地址,所以需要把主机名和linux的ip映射配置到本地host里
查看主机名称1
hostname
修改主机名称:
1  | vi /etc/hostname  | 
查看本机对应的IP
1  | hostname -i  | 
设置映射
1  | vi /etc/hosts  | 
添加
1  | 192.168.7.101 hadoop01  | 
启动kafka,运行日志在logs目录的server.log文件里
1  | #后台启动,不会打印日志到控制台  | 
或者用
1  | $KAFKA_HOME/bin/kafka-server-start.sh config/server.properties &  | 
停止kafka
1  | $KAFKA_HOME/bin/kafka-server-stop.sh  | 
查看服务是否可用
1  | lsof -i:9092  | 
Win
启动kafka,运行日志在logs目录的server.log文件里
1  | %KAFKA_HOME%/bin/windows/kafka-server-start.bat %KAFKA_HOME%/config/server.properties  | 
停止kafka
1  | %KAFKA_HOME%/bin/windows/kafka-server-stop.bat  | 
命令测试消息
Linux
主题
创建主题
1  | $KAFKA_HOME/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test  | 
列出所有主题
1  | $KAFKA_HOME/bin/kafka-topics.sh --list --zookeeper localhost:2181  | 
查看topic情况
1  | $KAFKA_HOME/bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test  | 
消费者的消费偏移量是消费者自己维护的,查看主题的消费偏移量
1  | $KAFKA_HOME/bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group g1  | 
消息
发送消息
1  | $KAFKA_HOME/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test  | 
消费消息,默认是消费最新的消息
1  | $KAFKA_HOME/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test  | 
分区
创建多个分区主题
1  | $KAFKA_HOME/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 2 --topic test  | 
扩容分区
1  | $KAFKA_HOME/bin/kafka-topics.sh -alter --partitions 3 --zookeeper localhost:2181 --topic test  | 
Win
1、创建主题
1  | %KAFKA_HOME%/bin/windows/kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test  | 
2、列出所有主题
1  | %KAFKA_HOME%/bin/windows/kafka-topics.bat --list --zookeeper localhost:2181  | 
3、发送消息
1  | %KAFKA_HOME%/bin/windows/kafka-console-producer.bat --broker-list localhost:9092 --topic test  | 
4、消费消息,默认是消费最新的消息
1  | %KAFKA_HOME%/bin/windows/kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic test  | 
5、创建多个分区主题
1  | %KAFKA_HOME%/bin/windows/kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 2 --topic test  | 
扩容分区
1  | %KAFKA_HOME%/bin/windows/kafka-topics.bat -alter --partitions 3 --zookeeper localhost:2181 --topic test  | 
6、查看topic情况
1  | %KAFKA_HOME%/bin/windows/kafka-topics.bat --describe --zookeeper localhost:2181 --topic test  | 
7、消费者的消费偏移量是消费者自己维护的,查看主题的消费偏移量
1  | %KAFKA_HOME%/bin/windows/kafka-consumer-groups.bat --bootstrap-server localhost:9092 --describe --group test  | 
Java测试
依赖
1  | <dependency>  | 
消息生产者
1  | package cn.psvmc.kafka;  | 
消息消费者
1  | package cn.psvmc.kafka;  | 
Kafka集群的搭建
集群服务器
zookeeper 1台:192.168.7.100
kafka 2台: 192.168.7.101 和 192.168.7.102
修改配置文件
在192.168.7.101服务器上面
1  | vi config/server.properties  | 
内容如下
1  | #broker.id属性在kafka集群中必须要是唯一  | 
在192.168.7.102服务器上面
1  | vi config/server.properties  | 
内容如下
1  | #broker.id属性在kafka集群中必须要是唯一  | 
分别启动两台kafka
1  | bin/kafka-server-start.sh -daemon config/server.properties  | 
测试
创建一个新的topic,副本数设置为3,分区数设置为2
1  | $KAFKA_HOME/bin/kafka-topics.sh --create --zookeeper 192.168.7.100:2181 --replication-factor 2 --partitions 2 --topic z-topic  | 
查看topic信息
1  | $KAFKA_HOME/bin/kafka-topics.sh --describe --zookeeper 192.168.7.100:2181 --topic z-topic  | 
发送消息
1  | $KAFKA_HOME/bin/kafka-console-producer.sh --broker-list 192.168.7.101:9092,192.168.7.102:9092 --topic z-topic  | 
消费消息
1  | $KAFKA_HOME/bin/kafka-console-consumer.sh --bootstrap-server 192.168.7.101:9092,192.168.7.102:9092 --from-beginning --topic z-topic  |