kafka安装文档
- Notes
- October 24, 2023
目录
- 关闭防火墙
systemctl stop firewalld
- 需安装Zookeeper组件具体要求同Zookeeper任务要求,并与Kafka环境适配,启动Zookeeper并截图保存结果:
- 在master,slave1,slave2分别启动zookeeper
zkServer.sh start
- 查询3台机器的zookeeper启动状态
zkServer.sh status
- zookeeper 配置文件
/opt/software/zookeeper/conf/zoo.cfg
,数据文件夹设置为/opt/software/zookeeper/zkdata
,zoo.cfg
需要和 kafka 的zookeeper.properties
适配
vi /opt/module/zookeeper/conf/zoo.cfg
需要查看的配置内容:
dataDir=/opt/module/zookeeper/zkdata
clientPort=2181
server.1=master:2888:3888
server.2=slave1:2888:3888
server.3=slave2:2888:3888
- 解压kafka安装包到
/opt/module
路径,并修改解压后文件夹名为 kafka ,截图并保存结果:
tar -xvzf /opt/software/kafka1.0.0.tar.gz -C /opt/module
mv /opt/module/kafka_2.11-1.0.0 /opt/module/kafka
- 设置kafka环境变量, 并使环境变量只对当前root用户生效,截图并保存结果:
vi /root/.bash_profile
添加以下内容:
export KAFKA_HOME=/opt/module/kafka
export PATH=$PATH:$KAFKA_HOME/bin
刷新生效:
source /root/.bash_profile
- 修改kafka相应文件, 截图并保存结果:
- 在 master 上修改文件zookeeper.properties
vi /opt/module/kafka/config/zookeeper.properties
配置文件内容:
clientPort=2181
dataDir=/opt/module/kafka/data
server.1=master:2888:3888
server.2=slave1:2888:3888
server.3=slave2:2888:3888
在kafka安装文件夹中建立 data 文件夹,与 dataDir=/opt/module/kafka/data
对应
mkdir /opt/module/kafka/data
vi /opt/module/kafka/data/myid
- 在 master 上修改
server.properties
vi /opt/module/kafka/config/server.properties
修改以下内容:
broker.id=0
listeners=PLAINTEXT://192.168.152.240:9092
advertised.listeners=PLAINTEXT://192.168.152.240:9092
log.dirs=/usr/local/src/kafka/data
zookeeper.connect=192.168.152.240:2181,192.168.152.241:2181,192.168.152.242:2181
文件分发:
scp -r /opt/module/kafka slave1:/opt/module
scp -r /opt/module/kafka slave2:/opt/module
scp /root/.bash_profile slave1:/root
scp /root/.bash_profile slave2:/root
在其他节点刷新变量文件生效:
source /root/.bash_profile
- 在 slave1 上修改
myid
文件为 2
cat>/opt/module/kafka/data/myid<<EOF
2
EOF
修改 server.properties
文件为以下内容:
broker.id=1
listeners=PLAINTEXT://192.168.152.241:9092
advertised.listeners=PLAINTEXT://192.168.152.241:9092
log.dirs=/usr/local/src/kafka/data zookeeper.connect=192.168.152.240:2181,192.168.152.241:2181,192.168.152.242:2181
在 slave2 上修改 myid
文件为 3
cat>/opt/module/kafka/data/myid<<EOF
3
EOF
修改 server.properties
文件为以下内容:
broker.id=1
listeners=PLAINTEXT://192.168.152.242:9092
advertised.listeners=PLAINTEXT://192.168.152.242:9092
log.dirs=/usr/local/src/kafka/data zookeeper.connect=192.168.152.240:2181,192.168.152.241:2181,192.168.152.242:2181
- 启动 kafka 并保存命令输出结果, 截图并保存结果:
分别在master, slave1, slave2 上执行
kafka-server-start.sh -daemon /opt/module/kafka/config/server.properties
jps
输出以下内容为正常启动:
2198 QuorumPeerMain
6732 Jps
3758 Kafka
- 创建指定topic, 截图并保存结果:
kafka-topics.sh --zookeeper master01:2181,slave01:2181,slave02:2181 --create --partitions 3 --replication-factor 3 --topic test
输出以下内容为正常启动:
Created topic "test-topic".
- 查看所有topic信息, 并截图保存结果:
kafka-topics.sh --list --zookeeper master01:2181
输出:
test
kafka-topics.sh -zookeeper master01:2181,slave01:2181,slave02:2181 --describe --topic test
输出:
Topic:test-topic PartitionCount:3 ReplicationFactor:3 Configs:
Topic: test-topic Partition: 0 Leader: 1 Replicas: 1,2,0 Isr: 1,0
Topic: test-topic Partition: 1 Leader: 2 Replicas: 2,0,1 Isr: 2,0,1
Topic: test-topic Partition: 2 Leader: 0 Replicas: 0,1,2 Isr: 0,1
- 启动指定生产者(producer), 并截图保存结果:
注意: 控制台发送者的broker-list的ip需要与server.properties中listeners处的一致, 控制台消费者就能接收到消息
kafka-console-producer.sh --broker-list 192.168.152.240:9092 --topic test
- 启动消费者(sonsumer), 并截图保存结果:
再开启一个master连接, 输入:
kafka-console-consumer.sh --bootstrap-server 192.168.152.240:9092 --topic test
注: kafka旧版本连接服务器的参数为 --zookeeper
, 新版为 --bootstrap-server
输出:
Using the ConsoleConsumer with old consumer is deprecated and will be removed in a future major release. Consider using the new consumer by passing [bootstrap-server] instead of [zookeeper]
- 测试生产者(producer), 并截图保存结果:
kafka-console-producer.sh --broker-list 192.168.152.240:9092 --topic test
[root@master01 ~]# kafka-console-producer.sh --broker-list 192.168.152.240:9092 --topic test
>111
>222
>333
>444
>555
>666
>777
>888
- 测试消费者(consumer), 并截图保存结果
消费者上显示:
Using the ConsoleConsumer with old consumer is deprecated and will be removed in a future major release. Consider using the new consumer by passing [bootstrap-server] instead of [zookeeper].
111
222
333
444
555
666
777
888
基于命令行方式使用kafka
1. 三台机器上启动zookeeper
zkServer.sh start
2. 三台机器上启动kafka服务
kafka-server-start.sh -daemon /usr/local/src/kafka/config/server.properties
3. 查看进程
jps
4. 创建主题
kafka-topics.sh --create \
--topic itcasttopic \
--partitions 3 \
--replication-factor 2 \
--zookeeper master01:2181,slave01:2181,slave02:2181
5. 创建生产者,来生产消息。在master01上输入
kafka-console-producer.sh \
--broker-list master01:9092,slave01:9092,slave02:9092 \
--topic itcasttopic
6. 创建消费者,来消费消息。在slave01上输入
kafka-console-consumer.sh \
--from-beginning \
--topic itcasttopic
测试消费者(consumer)(kafka2.11-2.0.0命令有所不同)
kafka-console-consumer.sh \
--from-beginning \
--topic test \
--bootstrap-server 192.168.152.245:9092