- A+
所属分类:Kafka
Apache Kafka是一个开源流处理平台,由LinkedIn公司开发,并于2011年成为Apache软件基金会的一部分。Kafka被设计为高吞吐量、可扩展、可持久化的实时数据流处理系统。
Kafka的主要特性
高吞吐量:能够处理大量数据;
可扩展性:可以水平扩展;
持久性:数据可以持久化到磁盘;
容错性:即使节点失败,也能保证数据不丢失。Kafka组件
Producer:生产者,发布消息到Kafka主题;
Consumer:消费者,订阅主题并处理消息;
Broker:Kafka服务器,存储数据;
Zookeeper:协调Kafka集群。优势
高性能: 通过分区和并行处理实现高吞吐量;
灵活的数据存储: 支持多种数据格式和自定义序列化;
强大的社区支持: 广泛的用户和开发者社区。Kafka的主要应用场景简化为:实时数据流处理,日志聚合,消息队列,微服务通信,物联网数据处理。
JDK安装
wget https://repo.huaweicloud.com/java/jdk/8u202-b08/jdk-8u202-linux-x64.tar.gz
tar xvf jdk-8u202-linux-x64.tar.gz -C /usr/local/
ln -s /usr/local/jdk1.8.0_202/ /usr/local/jdk
vim /etc/profile.d/jdk.sh
export JAVA_HOME=/usr/local/jdk
export JRE_HOME=/usr/local/jdk/jre
export PATH=$PATH:$JAVA_HOME/bin:$JRE_HOME/bin
export CLASSPATH=$JAVA_HOME/lib/tools.jar:$JAVA_HOME/lib/dt.jar
source /etc/profile //重新加载环境变量
java -version
# java version "1.8.0_202"
# Java(TM) SE Runtime Environment (build 1.8.0_202-b08)
# Java HotSpot(TM) 64-Bit Server VM (build 25.202-b08, mixed mode)
Zookeeper安装
wget https://mirrors.huaweicloud.com/apache/zookeeper/zookeeper-3.5.7/apache-zookeeper-3.5.7-bin.tar.gz tar zxvf apache-zookeeper-3.5.7-bin.tar.gz mv apache-zookeeper-3.5.7-bin /usr/local/zookeeper-3.5.7 ln -s /usr/local/zookeeper-3.5.7/ /usr/local/zookeeper cp /usr/local/zookeeper/conf/zoo_sample.cfg /usr/local/zookeeper/conf/zoo.cfg
修改配置文件zoo.cfg
vim /usr/local/zookeeper/conf/zoo.cfg //需要修改的内容
dataDir=/usr/local/zookeeper/data //指定ZooKeeper存放数据的目录
dataLogDir=/usr/local/zookeeper/logs //存放日志目录
clientPort=2181 #客户端连接ZooKeeper服务器的端口,默认为2181
mkdir -p /usr/local/zookeeper/data //创建数据目录
mkdir -p /usr/local/zookeeper/logs //创建日志目录
Zookeeper开机自启文件
vim /usr/lib/systemd/system/zookeeper.service
[Unit]
Description=ZooKeeper Service
After=network.target
[Service]
Type=forking
Environment=JAVA_HOME=/usr/local/jdk
ExecStart=/usr/local/zookeeper/bin/zkServer.sh start
ExecReload=/usr/local/zookeeper/bin/zkServer.sh restart
ExecStop=/usr/local/zookeeper/bin/zkServer.sh stop
Restart=always
[Install]
WantedBy=default.target
重新加载 systemd
管理器的主配置文件并启动
systemctl daemon-reload
systemctl enable --now zookeeper.service
Zookeeper配置环境变量
vim /etc/profile.d/zookeeper.sh
export ZK_HOME=/usr/local/zookeeper
export PATH=$PATH:$ZK_HOME/bin
source /etc/profile //重新加载环境变量

Kafka安装
wget https://mirrors.huaweicloud.com/apache/kafka/3.8.0/kafka_2.12-3.8.0.tgz
tar zxvf kafka_2.12-3.8.0.tgz -C /usr/local/
ln -s /usr/local/kafka_2.12-3.8.0/ /usr/local/kafka
cp -a /usr/local/kafka/config/server.properties /usr/local/kafka/config/server.properties-bak
//备份配置文件,防止改错
修改配置文件server.properties
vim /usr/local/kafka/config/server.properties //需要修改的内容
# The id of the broker. This must be set to a unique integer for each broker.
broker.id=0 //broker的唯一标识符
# The port the socket server listens on
listeners=PLAINTEXT://:9092 //broker监听的协议、地址和端口
# A comma-separated list of directories under which to store log files
log.dirs=/usr/local/kafka/logs //Kafka存储消息日志文件的目录
Kafka开机自启文件
vim /usr/lib/systemd/system/kafka.service
[Unit]
Description=Apache Kafka Server
Documentation=http://kafka.apache.org/documentation.html
Requires=network.target zookeeper.service
After=network.target zookeeper.service
[Service]
Type=simple
Environment="JAVA_HOME=/usr/local/jdk"
ExecStart=/usr/local/kafka/bin/kafka-server-start.sh /usr/local/kafka/config/server.properties
ExecStop=/usr/local/kafka/bin/kafka-server-stop.sh
User=root
Group=root
Restart=on-failure
[Install]
WantedBy=multi-user.target
重新加载 systemd
管理器的主配置文件并启动
systemctl daemon-reload
systemctl enable --now kafka.service
Kafka配置环境变量
vim /etc/profile.d/kafka.sh
export KAFKA_HOME=/usr/local/kafka
export PATH=$PATH:$KAFKA_HOME/bin
source /etc/profile //重新加载环境变量

Kafka常用命令
kafka-topics.sh --create --topic name --bootstrap-server localhost:9092 --partitions 1 --replication-factor 1 //创建主题
kafka-topics.sh --list --bootstrap-server localhost:9092 //列出所有主题
kafka-topics.sh --describe --topic name --bootstrap-server localhost:9092 //描述主题
kafka-topics.sh --delete --topic name --bootstrap-server localhost:9092 //删除主题谨慎操作
Kafka主题管理命令及常用选项
- 创建主题
--create:启动创建主题模式。
--topic <主题名称>:指定要创建的主题名称。
--partitions <数量>:设置主题的分区数。
--replication-factor <数量>:设置每个分区的副本数。
--bootstrap-server <地址:端口>:指定Kafka服务器的连接地址和端口。- 列出所有主题
--list:列出所有主题。
--bootstrap-server <地址:端口>:指定Kafka服务器的连接地址和端口。- 描述主题
--describe:获取主题的详细信息。
--topic <主题名称>:指定要描述的具体主题(可选,不指定则描述所有主题)。
--bootstrap-server <地址:端口>:指定Kafka服务器的连接地址和端口。- 删除主题
--delete:启动删除主题模式。
--topic <主题名称>:指定要删除的主题名称。
--bootstrap-server <地址:端口>:指定Kafka服务器的连接地址和端口。
注意:--bootstrap-server 是一个通用选项,用于所有操作以指定Kafka集群的连接点。每个命令都需要这个选项来知道如何连接到Kafka。
kafka-console-producer.sh --broker-list localhost:9092 --topic name // 启动一个生产者,发送消息到指定主题
>first message //第一条消息
>second message //第二条消息
Ctrl +c //退出
kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic name --group my-consumer-group //启动一个消费者,从指定主题读取消息
Ctrl +c //退出
kafka-configs.sh --bootstrap-server localhost:9092 --describe --topic name //查看 topic 配置
kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list //列出所有消费者组
kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group my-consumer-group //描述消费者组
常用选项
- 启动一个生产者
--bootstrap-server : 指定 Kafka broker 的地址和端口。
--topic : 指定要发送消息的主题。- 启动一个消费者
--bootstrap-server : 指定 Kafka broker 的地址和端口。
--topic : 指定要从哪个主题读取消息。
--group : 指定消费者所属的消费者组 ID。- 查看 topic 配置
--bootstrap-server : 指定 Kafka broker 的地址和端口。
--topic : 指定要查看配置的主题。
--describe: 描述主题的详细信息。- 列出所有消费者组
--bootstrap-server : 指定 Kafka broker 的地址和端口。
--list: 列出所有消费者组。- 描述消费者组
--bootstrap-server : 指定 Kafka broker 的地址和端口。
--describe: 描述消费者组的详细信息。
--group : 指定要描述的消费者组 ID。
shell安装Kafka
#!/bin/sh
wget https://repo.huaweicloud.com/java/jdk/8u202-b08/jdk-8u202-linux-x64.tar.gz
/usr/bin/tar xvf jdk-8u202-linux-x64.tar.gz -C /usr/local/
/usr/bin/ln -s /usr/local/jdk1.8.0_202/ /usr/local/jdk
/usr/bin/cat > /etc/profile.d/jdk.sh << "EOF"
export JAVA_HOME=/usr/local/jdk
export JRE_HOME=/usr/local/jdk/jre
export PATH=$PATH:$JAVA_HOME/bin:$JRE_HOME/bin
export CLASSPATH=$JAVA_HOME/lib/tools.jar:$JAVA_HOME/lib/dt.jar
EOF
source /etc/profile
wget https://mirrors.huaweicloud.com/apache/zookeeper/zookeeper-3.5.7/apache-zookeeper-3.5.7-bin.tar.gz
/usr/bin/tar zxvf apache-zookeeper-3.5.7-bin.tar.gz
/usr/bin/mv apache-zookeeper-3.5.7-bin /usr/local/zookeeper-3.5.7
/usr/bin/ln -s /usr/local/zookeeper-3.5.7/ /usr/local/zookeeper
/usr/bin/cp /usr/local/zookeeper/conf/zoo_sample.cfg /usr/local/zookeeper/conf/zoo.cfg
/usr/bin/sed -i 's|dataDir=/tmp/zookeeper|dataDir=/data/zookeeper|g' /usr/local/zookeeper/conf/zoo.cfg
/usr/bin/cat > /usr/lib/systemd/system/zookeeper.service << EOF
[Unit]
Description=ZooKeeper Service
After=network.target
[Service]
Type=forking
Environment=JAVA_HOME=/usr/local/jdk
ExecStart=/usr/local/zookeeper/bin/zkServer.sh start
ExecReload=/usr/local/zookeeper/bin/zkServer.sh restart
ExecStop=/usr/local/zookeeper/bin/zkServer.sh stop
Restart=always
[Install]
WantedBy=default.target
EOF
/usr/bin/systemctl daemon-reload
/usr/bin/systemctl enable --now zookeeper.service
/usr/bin/cat > /etc/profile.d/zookeeper.sh << "EOF"
export ZK_HOME=/usr/local/zookeeper
export PATH=$PATH:$ZK_HOME/bin
EOF
source /etc/profile
wget https://mirrors.huaweicloud.com/apache/kafka/3.8.0/kafka_2.12-3.8.0.tgz
/usr/bin/tar zxvf kafka_2.12-3.8.0.tgz -C /usr/local/
/usr/bin/ln -s /usr/local/kafka_2.12-3.8.0/ /usr/local/kafka
/usr/bin/cp -a /usr/local/kafka/config/server.properties /usr/local/kafka/config/server.properties-bak
/usr/bin/sed -i 's/^#listeners/listeners/' /usr/local/kafka/config/server.properties
/usr/bin/sed -i 's/log.dirs=\/tmp\/kafka-logs/log.dirs=\/usr\/local\/kafka\/logs\/kafka-logs/' /usr/local/kafka/config/server.properties
/usr/bin/cat > /usr/lib/systemd/system/kafka.service << EOF
[Unit]
Description=Apache Kafka Server
Documentation=http://kafka.apache.org/documentation.html
Requires=network.target zookeeper.service
After=network.target zookeeper.service
[Service]
Type=simple
Environment="JAVA_HOME=/usr/local/jdk"
ExecStart=/usr/local/kafka/bin/kafka-server-start.sh /usr/local/kafka/config/server.properties
ExecStop=/usr/local/kafka/bin/kafka-server-stop.sh
User=root
Group=root
Restart=on-failure
[Install]
WantedBy=multi-user.target
EOF
/usr/bin/systemctl daemon-reload
/usr/bin/systemctl enable --now kafka.service
/usr/bin/cat > /etc/profile.d/kafka.sh << "EOF"
export KAFKA_HOME=/usr/local/kafka
export PATH=$PATH:$KAFKA_HOME/bin
EOF
source /etc/profile