大数据环境搭建-Kafka及Zookeeper

前言

https://kafka.apache.org/downloads

ZK配置

安装zookeeper

kafka依赖zookeeper,安装包内已内置 使用内置的可以跳过该步骤

也可自己单独下载

https://zookeeper.apache.org/releases.html#download

启动内置ZK

Windows

启动ZK

1
%KAFKA_HOME%/bin/windows/zookeeper-server-start.bat %KAFKA_HOME%/config/zookeeper.properties

进入

1
%KAFKA_HOME%/bin/windows/zookeeper-shell.bat localhost

CentOS

1
$KAFKA_HOME/bin/zookeeper-server-start.sh $KAFKA_HOME/config/zookeeper.properties

输入命令

1
2
3
4
5
#查看zk的根目录kafka相关节点
ls /
#查看kafka节点
ls /brokers
ls /brokers/topics

常用命令

  1. 显示根目录下文件: ls / 使用 ls 命令来查看当前 ZooKeeper 中所包含的内容
  2. 显示根目录下文件: ls2 / 查看当前节点数据并能看到更新次数等数据
  3. 创建文件,并设置初始内容:create /zk "test" 创建一个新的 znode节点“ zk ”以及与它关联的字符串
  4. 获取文件内容: get /zk 确认 znode 是否包含我们所创建的字符串
  5. 修改文件内容: set /zk "zkbak" 对 zk 所关联的字符串进行设置
  6. 删除文件: delete /zk 将刚才创建的 znode 删除
  7. 退出客户端: quit
  8. 帮助命令:help

Kafka单机环境搭建

安装kafka

1
2
3
wget https://archive.apache.org/dist/kafka/2.8.1/kafka_2.12-2.8.1.tgz
tar -xzf kafka_2.12-2.8.1.tgz -C /data/tools/bigdata/
cd /data/tools/bigdata/kafka_2.12-2.8.1

添加环境变量

CentOS

创建配置文件

1
vi /etc/profile.d/kafka.sh

加入:

1
2
export KAFKA_HOME=/data/tools/bigdata/kafka_2.12-2.8.1 
export PATH=$KAFKA_HOME/bin:$PATH

配置立即生效

1
source /etc/profile

查看ZK_HOME

1
echo $KAFKA_HOME

Win

名称 路径
KAFKA_HOME D:\Tools\bigdata\kafka_2.12-2.8.1
Path %KAFKA_HOME%\bin

修改配置

修改配置文件 config/server.properties

1
2
3
4
5
6
7
8
#broker.id属性在kafka集群中必须要是唯一
broker.id=0
#kafka部署的机器ip和提供服务的端口号
listeners=PLAINTEXT://:9092
#kafka的消息存储文件
log.dir=/kafka/kafka-logs
#kafka连接zookeeper的地址
zookeeper.connect=localhost:2181

服务器内访问

1
listeners=PLAINTEXT://192.168.7.101:9092

内网访问

1
2
listeners=PLAINTEXT://192.168.7.101:9092
advertised.listeners=PLAINTEXT://192.168.7.101:9092

在ZK中可以看到

路径/brokers/ids/0中的值 和broker.id=0对应。

查看

1
get /brokers/ids/0

1
{"listener_security_protocol_map":{"PLAINTEXT":"PLAINTEXT"},"endpoints":["PLAINTEXT://192.168.7.101:9092"],"jmx_port":-1,"features":{},"host":"192.168.7.101","timestamp":"1670583689047","port":9092,"version":5}

外网访问

1
2
3
4
listener.security.protocol.map=INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT
listeners=INTERNAL://192.168.7.101:9092,EXTERNAL://192.168.7.101:9093
advertised.listeners=INTERNAL://192.168.7.101:9092,EXTERNAL://110.110.110.110:9092
inter.broker.listener.name=INTERNAL

其中110.110.110.110:9092公网ip:端口

advertised_listeners 监听器会注册在 zookeeper 中;

当我们对 192.168.7.101:9092 请求建立连接,kafka 服务器会通过 zookeeper 中注册的监听器,找到 INTERNAL监听器,然后通过 listeners 中找到对应的 通讯 ip 和 端口 192.168.7.101:9092

同理:

当我们对 110.110.110.110:9092 请求建立连接,kafka 服务器会通过 zookeeper 中注册的监听器,找到 EXTERNAL 监听器,然后通过 listeners 中找到对应的 通讯 ip 和 端口 192.168.7.101:9093

总结:

advertised_listeners 是对外暴露的服务端口,kafka组件之间通讯用的是 listeners。

启动Kafka

Linux

启动脚本语法:

1
kafka-server-start.sh [-daemon] server.properties

可以看到,server.properties的配置路径是一个强制的参数,-daemon表示以后台进程运行,否则ssh客户端退出后,就会停止服务。
注意

在启动kafka时会使用linux主机名关联的ip地址,所以需要把主机名和linux的ip映射配置到本地host里

查看主机名称

1
hostname

修改主机名称:

1
vi /etc/hostname

查看本机对应的IP

1
hostname -i

设置映射

1
vi /etc/hosts

添加

1
192.168.7.101 hadoop01

启动kafka,运行日志在logs目录的server.log文件里

1
2
#后台启动,不会打印日志到控制台
$KAFKA_HOME/bin/kafka-server-start.sh -daemon $KAFKA_HOME/config/server.properties

或者用

1
$KAFKA_HOME/bin/kafka-server-start.sh config/server.properties &

停止kafka

1
$KAFKA_HOME/bin/kafka-server-stop.sh

查看服务是否可用

1
2
3
lsof -i:9092

netstat -tunlp|grep :9092

Win

启动kafka,运行日志在logs目录的server.log文件里

1
%KAFKA_HOME%/bin/windows/kafka-server-start.bat %KAFKA_HOME%/config/server.properties

停止kafka

1
%KAFKA_HOME%/bin/windows/kafka-server-stop.bat

命令测试消息

Linux

主题

创建主题

1
$KAFKA_HOME/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

列出所有主题

1
$KAFKA_HOME/bin/kafka-topics.sh --list --zookeeper localhost:2181

查看topic情况

1
$KAFKA_HOME/bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test

消费者的消费偏移量是消费者自己维护的,查看主题的消费偏移量

1
$KAFKA_HOME/bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group g1

消息

发送消息

1
2
$KAFKA_HOME/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
$KAFKA_HOME/bin/kafka-console-producer.sh --broker-list 192.168.7.101:9092 --topic test

消费消息,默认是消费最新的消息

1
2
$KAFKA_HOME/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test
$KAFKA_HOME/bin/kafka-console-consumer.sh --bootstrap-server 192.168.7.101:9092 --topic test

分区

创建多个分区主题

1
$KAFKA_HOME/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 2 --topic test

扩容分区

1
$KAFKA_HOME/bin/kafka-topics.sh -alter --partitions 3 --zookeeper localhost:2181 --topic test

Win

1、创建主题

1
%KAFKA_HOME%/bin/windows/kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

2、列出所有主题

1
%KAFKA_HOME%/bin/windows/kafka-topics.bat --list --zookeeper localhost:2181

3、发送消息

1
%KAFKA_HOME%/bin/windows/kafka-console-producer.bat --broker-list localhost:9092 --topic test

4、消费消息,默认是消费最新的消息

1
%KAFKA_HOME%/bin/windows/kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic test

5、创建多个分区主题

1
%KAFKA_HOME%/bin/windows/kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 2 --topic test

扩容分区

1
%KAFKA_HOME%/bin/windows/kafka-topics.bat -alter --partitions 3 --zookeeper localhost:2181 --topic test

6、查看topic情况

1
%KAFKA_HOME%/bin/windows/kafka-topics.bat --describe --zookeeper localhost:2181 --topic test

7、消费者的消费偏移量是消费者自己维护的,查看主题的消费偏移量

1
%KAFKA_HOME%/bin/windows/kafka-consumer-groups.bat --bootstrap-server localhost:9092 --describe --group test

Java测试

依赖

1
2
3
4
5
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.0.0</version>
</dependency>

消息生产者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
package cn.psvmc.kafka;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

public class ZKfProducer {
public static void main(String[] args) {
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.7.101:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class);
props.put(ProducerConfig.ACKS_CONFIG,"-1");
Producer<String, String> producer = new KafkaProducer<>(props);
int num = 0;
while (num < 100) {
ProducerRecord<String, String> record = new ProducerRecord<>("test", "numtest","Num:" + num);
producer.send(record);
System.out.println("发送:" + num);
num += 1;
try {
Thread.sleep(1000);
} catch (InterruptedException ignored) {
}
}
producer.close();
}
}

消息消费者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
package cn.psvmc.kafka;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.util.Collections;
import java.util.Properties;

public class ZKfConsumer {
public static void main(String[] args) {
Properties prop = new Properties();
prop.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.7.101:9092");
prop.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
prop.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
prop.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000");
prop.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
prop.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
//earliest 从最早的开始(不记录提交点)
//latest 从最新的开始(记录提交点)
//none 报错
prop.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
prop.put(ConsumerConfig.GROUP_ID_CONFIG, "G1");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(prop);
//消费者订阅
consumer.subscribe(Collections.singleton("test"));
//一个消费者组G1里只有一个消费者
while (true){
ConsumerRecords<String, String> poll = consumer.poll(100);
for (ConsumerRecord<String,String> record: poll) {
System.out.println(record.offset() + "\t" + record.key() + "\t" + record.value());
}
}
}
}

Kafka集群的搭建

集群服务器

zookeeper 1台:192.168.7.100
kafka 2台: 192.168.7.101192.168.7.102

修改配置文件

在192.168.7.101服务器上面

1
vi config/server.properties

内容如下

1
2
3
4
5
6
7
#broker.id属性在kafka集群中必须要是唯一
broker.id=0
#kafka部署的机器ip和提供服务的端口号
listeners=PLAINTEXT://192.168.7.101:9092
log.dir=/usr/local/data/kafka-logs-1
#kafka连接zookeeper的地址,要把多个kafka实例组成集群,对应连接的zookeeper必须相同
zookeeper.connect=192.168.7.100:2181

在192.168.7.102服务器上面

1
vi config/server.properties

内容如下

1
2
3
4
5
6
7
#broker.id属性在kafka集群中必须要是唯一
broker.id=1
#kafka部署的机器ip和提供服务的端口号
listeners=PLAINTEXT://192.168.7.102:9092
log.dir=/usr/local/data/kafka-logs-1
#kafka连接zookeeper的地址,要把多个kafka实例组成集群,对应连接的zookeeper必须相同
zookeeper.connect=192.168.7.100:2181

分别启动两台kafka

1
2
3
bin/kafka-server-start.sh -daemon config/server.properties

bin/kafka-server-start.sh -daemon config/server.properties

测试

创建一个新的topic,副本数设置为3,分区数设置为2

1
$KAFKA_HOME/bin/kafka-topics.sh --create --zookeeper 192.168.7.100:2181 --replication-factor 2 --partitions 2 --topic z-topic

查看topic信息

1
$KAFKA_HOME/bin/kafka-topics.sh --describe --zookeeper 192.168.7.100:2181 --topic z-topic

发送消息

1
$KAFKA_HOME/bin/kafka-console-producer.sh --broker-list 192.168.7.101:9092,192.168.7.102:9092 --topic z-topic

消费消息

1
$KAFKA_HOME/bin/kafka-console-consumer.sh --bootstrap-server 192.168.7.101:9092,192.168.7.102:9092 --from-beginning --topic z-topic