大数据环境搭建-Kafka及Zookeeper

前言

https://kafka.apache.org/downloads

单机环境搭建

安装zookeeper

kafka依赖zookeeper,安装包内已内置 使用内置的可以跳过该步骤

也可自己单独下载

https://zookeeper.apache.org/releases.html#download

安装kafka

1
2
3
wget https://archive.apache.org/dist/kafka/2.8.1/kafka_2.12-2.8.1.tgz
tar -xzf kafka_2.12-2.8.1.tgz
cd kafka_2.12-2.8.1

添加环境变量

名称 路径
KAFKA_HOME D:\Tools\bigdata\kafka_2.12-2.8.1
Path %KAFKA_HOME%\bin

启动ZK

启动ZK

1
%KAFKA_HOME%/bin/windows/zookeeper-server-start.bat %KAFKA_HOME%/config/zookeeper.properties

进入

1
%KAFKA_HOME%/bin/windows/zookeeper-shell.bat localhost

输入命令

1
2
3
4
5
#查看zk的根目录kafka相关节点
ls /
#查看kafka节点
ls /brokers
ls /brokers/topics

常用命令

  1. 显示根目录下文件: ls / 使用 ls 命令来查看当前 ZooKeeper 中所包含的内容
  2. 显示根目录下文件: ls2 / 查看当前节点数据并能看到更新次数等数据
  3. 创建文件,并设置初始内容:create /zk "test" 创建一个新的 znode节点“ zk ”以及与它关联的字符串
  4. 获取文件内容: get /zk 确认 znode 是否包含我们所创建的字符串
  5. 修改文件内容: set /zk "zkbak" 对 zk 所关联的字符串进行设置
  6. 删除文件: delete /zk 将刚才创建的 znode 删除
  7. 退出客户端: quit
  8. 帮助命令:help

修改配置

修改配置文件 config/server.properties

1
2
3
4
5
6
7
8
#broker.id属性在kafka集群中必须要是唯一
broker.id=0
#kafka部署的机器ip和提供服务的端口号
listeners=PLAINTEXT://localhost:9092
#kafka的消息存储文件
log.dir=/kafka/kafka-logs
#kafka连接zookeeper的地址
zookeeper.connect=localhost:2181

启动Kafka

Linux

启动脚本语法:

1
kafka-server-start.sh [-daemon] server.properties

可以看到,server.properties的配置路径是一个强制的参数,-daemon表示以后台进程运行,否则ssh客户端退出后,就会停止服务。
(注意,在启动kafka时会使用linux主机名关联的ip地址,所以需要把主机名和linux的ip映射配置到本地host里,用vim /etc/hosts)

查看主机名称
hostname 查看主机名
hostname -i:查看本机对应的IP
修改主机名称:

1
vi /etc/hostname

启动kafka,运行日志在logs目录的server.log文件里

1
bin/kafka-server-start.sh -daemon config/server.properties  #后台启动,不会打印日志到控制台

或者用

1
bin/kafka-server-start.sh config/server.properties &

停止kafka

1
bin/kafka-server-stop.sh

Win

启动kafka,运行日志在logs目录的server.log文件里

1
%KAFKA_HOME%/bin/windows/kafka-server-start.bat %KAFKA_HOME%/config/server.properties

停止kafka

1
%KAFKA_HOME%/bin/windows/kafka-server-stop.bat

命令测试消息

Linux

1、创建主题

1
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

2、列出所有主题

1
bin/kafka-topics.sh --list --zookeeper localhost:2181

3、发送消息

1
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test

4、消费消息,默认是消费最新的消息

1
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test

5、创建多个分区主题

1
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 2 --topic test1

扩容分区

1
bin/kafka-topics.sh -alter --partitions 3 --zookeeper localhost:2181 --topic test1

6、查看topic情况

1
bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test1

7、消费者的消费偏移量是消费者自己维护的,查看主题的消费偏移量

1
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group test

Win

1、创建主题

1
%KAFKA_HOME%/bin/windows/kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

2、列出所有主题

1
%KAFKA_HOME%/bin/windows/kafka-topics.bat --list --zookeeper localhost:2181

3、发送消息

1
%KAFKA_HOME%/bin/windows/kafka-console-producer.bat --broker-list localhost:9092 --topic test

4、消费消息,默认是消费最新的消息

1
%KAFKA_HOME%/bin/windows/kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic test

5、创建多个分区主题

1
%KAFKA_HOME%/bin/windows/kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 2 --topic test1

扩容分区

1
%KAFKA_HOME%/bin/windows/kafka-topics.bat -alter --partitions 3 --zookeeper localhost:2181 --topic test1

6、查看topic情况

1
%KAFKA_HOME%/bin/windows/kafka-topics.bat --describe --zookeeper localhost:2181 --topic test

7、消费者的消费偏移量是消费者自己维护的,查看主题的消费偏移量

1
%KAFKA_HOME%/bin/windows/kafka-consumer-groups.bat --bootstrap-server localhost:9092 --describe --group test

集群的搭建

集群服务器

zookeeper 1台:192.168.10.10
kafka 2台: 192.168.10.11192.168.10.12

修改配置文件

在192.168.10.11服务器上面

1
vi config/server.properties

内容如下

1
2
3
4
5
6
7
#broker.id属性在kafka集群中必须要是唯一
broker.id=0
#kafka部署的机器ip和提供服务的端口号
listeners=PLAINTEXT://192.168.10.11:9093
log.dir=/usr/local/data/kafka-logs-1
#kafka连接zookeeper的地址,要把多个kafka实例组成集群,对应连接的zookeeper必须相同
zookeeper.connect=192.168.10.10:2181

在192.168.10.12服务器上面

1
vi config/server.properties

内容如下

1
2
3
4
5
6
7
#broker.id属性在kafka集群中必须要是唯一
broker.id=1
#kafka部署的机器ip和提供服务的端口号
listeners=PLAINTEXT://192.168.10.12:9093
log.dir=/usr/local/data/kafka-logs-1
#kafka连接zookeeper的地址,要把多个kafka实例组成集群,对应连接的zookeeper必须相同
zookeeper.connect=192.168.10.10:2181

分别启动两台kafka

1
2
3
bin/kafka-server-start.sh -daemon config/server.properties

bin/kafka-server-start.sh -daemon config/server.properties

测试

创建一个新的topic,副本数设置为3,分区数设置为2

1
bin/kafka-topics.sh --create --zookeeper 192.168.10.10:2181 --replication-factor 2 --partitions 2 --topic my-topic

查看topic信息

1
bin/kafka-topics.sh --describe --zookeeper 192.168.10.10:2181 --topic my-topic

向my-topic主题发送消息

1
bin/kafka-console-producer.sh --broker-list 192.168.10.11:9092,192.168.10.12:9092 --topic my-topic

消费消息

1
bin/kafka-console-consumer.sh --bootstrap-server 192.168.10.11:9092,192.168.10.12:9092 --from-beginning --topic my-topic