前言
https://kafka.apache.org/downloads
ZK配置
安装zookeeper
kafka依赖zookeeper,安装包内已内置 使用内置的可以跳过该步骤
也可自己单独下载
https://zookeeper.apache.org/releases.html#download
启动内置ZK
Windows
启动ZK
1 | %KAFKA_HOME%/bin/windows/zookeeper-server-start.bat %KAFKA_HOME%/config/zookeeper.properties |
进入
1 | %KAFKA_HOME%/bin/windows/zookeeper-shell.bat localhost |
CentOS
1 | $KAFKA_HOME/bin/zookeeper-server-start.sh $KAFKA_HOME/config/zookeeper.properties |
输入命令
1 | #查看zk的根目录kafka相关节点 |
常用命令
- 显示根目录下文件:
ls /
使用 ls 命令来查看当前 ZooKeeper 中所包含的内容 - 显示根目录下文件:
ls2 /
查看当前节点数据并能看到更新次数等数据 - 创建文件,并设置初始内容:
create /zk "test"
创建一个新的 znode节点“ zk ”以及与它关联的字符串 - 获取文件内容:
get /zk
确认 znode 是否包含我们所创建的字符串 - 修改文件内容:
set /zk "zkbak"
对 zk 所关联的字符串进行设置 - 删除文件:
delete /zk
将刚才创建的 znode 删除 - 退出客户端:
quit
- 帮助命令:
help
Kafka单机环境搭建
安装kafka
1 | wget https://archive.apache.org/dist/kafka/2.8.1/kafka_2.12-2.8.1.tgz |
添加环境变量
CentOS
创建配置文件
1 | vi /etc/profile.d/kafka.sh |
加入:
1 | export KAFKA_HOME=/data/tools/bigdata/kafka_2.12-2.8.1 |
配置立即生效
1 | source /etc/profile |
查看ZK_HOME
1 | echo $KAFKA_HOME |
Win
名称 | 路径 |
---|---|
KAFKA_HOME | D:\Tools\bigdata\kafka_2.12-2.8.1 |
Path | %KAFKA_HOME%\bin |
修改配置
修改配置文件 config/server.properties
1 | #broker.id属性在kafka集群中必须要是唯一 |
服务器内访问
1 | listeners=PLAINTEXT://192.168.7.101:9092 |
内网访问
1 | listeners=PLAINTEXT://192.168.7.101:9092 |
在ZK中可以看到
路径/brokers/ids/0
中的值 和broker.id=0
对应。
查看
1 | get /brokers/ids/0 |
值
1 | {"listener_security_protocol_map":{"PLAINTEXT":"PLAINTEXT"},"endpoints":["PLAINTEXT://192.168.7.101:9092"],"jmx_port":-1,"features":{},"host":"192.168.7.101","timestamp":"1670583689047","port":9092,"version":5} |
外网访问
1 | listener.security.protocol.map=INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT |
其中110.110.110.110:9092
是公网ip:端口
advertised_listeners
监听器会注册在 zookeeper 中;
当我们对 192.168.7.101:9092
请求建立连接,kafka 服务器会通过 zookeeper 中注册的监听器,找到 INTERNAL监听器,然后通过 listeners 中找到对应的 通讯 ip 和 端口 192.168.7.101:9092
;
同理:
当我们对 110.110.110.110:9092
请求建立连接,kafka 服务器会通过 zookeeper 中注册的监听器,找到 EXTERNAL 监听器,然后通过 listeners 中找到对应的 通讯 ip 和 端口 192.168.7.101:9093
;
总结:
advertised_listeners 是对外暴露的服务端口,kafka组件之间通讯用的是 listeners。
启动Kafka
Linux
启动脚本语法:
1 | kafka-server-start.sh [-daemon] server.properties |
可以看到,server.properties的配置路径是一个强制的参数,-daemon表示以后台进程运行,否则ssh客户端退出后,就会停止服务。
注意
在启动kafka时会使用linux主机名关联的ip地址,所以需要把主机名和linux的ip映射配置到本地host里
查看主机名称1
hostname
修改主机名称:
1 | vi /etc/hostname |
查看本机对应的IP
1 | hostname -i |
设置映射
1 | vi /etc/hosts |
添加
1 | 192.168.7.101 hadoop01 |
启动kafka,运行日志在logs目录的server.log文件里
1 | #后台启动,不会打印日志到控制台 |
或者用
1 | $KAFKA_HOME/bin/kafka-server-start.sh config/server.properties & |
停止kafka
1 | $KAFKA_HOME/bin/kafka-server-stop.sh |
查看服务是否可用
1 | lsof -i:9092 |
Win
启动kafka,运行日志在logs目录的server.log文件里
1 | %KAFKA_HOME%/bin/windows/kafka-server-start.bat %KAFKA_HOME%/config/server.properties |
停止kafka
1 | %KAFKA_HOME%/bin/windows/kafka-server-stop.bat |
命令测试消息
Linux
主题
创建主题
1 | $KAFKA_HOME/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test |
列出所有主题
1 | $KAFKA_HOME/bin/kafka-topics.sh --list --zookeeper localhost:2181 |
查看topic情况
1 | $KAFKA_HOME/bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test |
消费者的消费偏移量是消费者自己维护的,查看主题的消费偏移量
1 | $KAFKA_HOME/bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group g1 |
消息
发送消息
1 | $KAFKA_HOME/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test |
消费消息,默认是消费最新的消息
1 | $KAFKA_HOME/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test |
分区
创建多个分区主题
1 | $KAFKA_HOME/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 2 --topic test |
扩容分区
1 | $KAFKA_HOME/bin/kafka-topics.sh -alter --partitions 3 --zookeeper localhost:2181 --topic test |
Win
1、创建主题
1 | %KAFKA_HOME%/bin/windows/kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test |
2、列出所有主题
1 | %KAFKA_HOME%/bin/windows/kafka-topics.bat --list --zookeeper localhost:2181 |
3、发送消息
1 | %KAFKA_HOME%/bin/windows/kafka-console-producer.bat --broker-list localhost:9092 --topic test |
4、消费消息,默认是消费最新的消息
1 | %KAFKA_HOME%/bin/windows/kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic test |
5、创建多个分区主题
1 | %KAFKA_HOME%/bin/windows/kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 2 --topic test |
扩容分区
1 | %KAFKA_HOME%/bin/windows/kafka-topics.bat -alter --partitions 3 --zookeeper localhost:2181 --topic test |
6、查看topic情况
1 | %KAFKA_HOME%/bin/windows/kafka-topics.bat --describe --zookeeper localhost:2181 --topic test |
7、消费者的消费偏移量是消费者自己维护的,查看主题的消费偏移量
1 | %KAFKA_HOME%/bin/windows/kafka-consumer-groups.bat --bootstrap-server localhost:9092 --describe --group test |
Java测试
依赖
1 | <dependency> |
消息生产者
1 | package cn.psvmc.kafka; |
消息消费者
1 | package cn.psvmc.kafka; |
Kafka集群的搭建
集群服务器
zookeeper 1台:192.168.7.100
kafka 2台: 192.168.7.101
和 192.168.7.102
修改配置文件
在192.168.7.101服务器上面
1 | vi config/server.properties |
内容如下
1 | #broker.id属性在kafka集群中必须要是唯一 |
在192.168.7.102服务器上面
1 | vi config/server.properties |
内容如下
1 | #broker.id属性在kafka集群中必须要是唯一 |
分别启动两台kafka
1 | bin/kafka-server-start.sh -daemon config/server.properties |
测试
创建一个新的topic,副本数设置为3,分区数设置为2
1 | $KAFKA_HOME/bin/kafka-topics.sh --create --zookeeper 192.168.7.100:2181 --replication-factor 2 --partitions 2 --topic z-topic |
查看topic信息
1 | $KAFKA_HOME/bin/kafka-topics.sh --describe --zookeeper 192.168.7.100:2181 --topic z-topic |
发送消息
1 | $KAFKA_HOME/bin/kafka-console-producer.sh --broker-list 192.168.7.101:9092,192.168.7.102:9092 --topic z-topic |
消费消息
1 | $KAFKA_HOME/bin/kafka-console-consumer.sh --bootstrap-server 192.168.7.101:9092,192.168.7.102:9092 --from-beginning --topic z-topic |