Kafka集群搭建

  • A+
所属分类:Kafka

Kafka集群介绍

Kafka是一个分布式、可扩展、高吞吐量的消息队列系统,广泛应用于日志收集、实时监控、数据管道、流处理等领域。Kafka集群是由多个Kafka服务器组成的集合,它们协同工作以提供高可用性、可扩展性和容错性。

组件介绍
Kafka Brokers:独立服务器,存储处理消息,维护分区副本,处理读写请求;
Zookeeper(旧版):分布式协调服务,管理集群元数据,Kafka 2.4.0前必需;
KRaft模式(新式):Kafka 2.4.0引入,无需Zookeeper,自行管理集群元数据;
Topics:消息类别或feeds,生产者发布,消费者订阅,分类存储消息;
Partitions:主题分区,有序不可变消息序列,分布存储,实现负载均衡和容错;
Replication:数据复制机制,分区多副本,分布不同Broker,保证数据可靠性;
Kafka Connect:数据传输工具,连接Kafka与其他系统,如数据库、日志收集系统;
Kafka Streams:实时流处理库,构建Kafka上流处理应用,实现复杂数据处理;
Producers:消息发送者,应用程序或系统,向Kafka集群发布消息;
Consumers:消息接收者,应用程序或系统,从Kafka集群订阅并处理消息;
Consumer Groups:消费者组,共同消费主题消息,Kafka确保消息只发送给组内一个消费者。

集群特点
高可用性:通过多个Broker和Replication机制,Kafka集群可以在某个Broker失败时继续提供服务;
可扩展性:可以轻松添加更多的Broker到集群中,以处理更多的消息和更高的吞吐量;
容错性:Partition和Replication机制确保了在硬件故障或网络问题发生时,数据不会丢失;
高性能:Kafka设计为高吞吐量的消息系统,可以处理数千个消息/秒的传输速率;
分布式:Kafka集群是分布式的,可以在多个数据中心或云区域中部署。

应用场景
日志收集:集中存储和处理来自多个源的大量日志数据;
实时监控:实时监控和分析系统指标和事件;
数据管道:在系统之间可靠地传输数据;
流处理:构建实时数据流处理应用程序。

主机IP组件
kafka-node01192.168.22.14JDK,Zookeeper,Kafka
kafka-node02192.168.22.15JDK,Zookeeper,Kafka
kafka-node03192.168.22.16JDK,Zookeeper,Kafka

所有节点配置hosts vim /etc/hosts

192.168.22.14 kafka-node01
192.168.22.15 kafka-node02
192.168.22.16 kafka-node03

所有节点安装JDK

wget https://repo.huaweicloud.com/java/jdk/8u202-b08/jdk-8u202-linux-x64.tar.gz
tar xvf jdk-8u202-linux-x64.tar.gz -C /usr/local/
ln -s /usr/local/jdk1.8.0_202/ /usr/local/jdk
vim /etc/profile.d/jdk.sh
export JAVA_HOME=/usr/local/jdk
export JRE_HOME=/usr/local/jdk/jre
export PATH=$PATH:$JAVA_HOME/bin:$JRE_HOME/bin
export CLASSPATH=$JAVA_HOME/lib/tools.jar:$JAVA_HOME/lib/dt.jar
source /etc/profile

搭建Zookeeper集群

所有节点安装Zookeeper

wget https://mirrors.huaweicloud.com/apache/zookeeper/zookeeper-3.5.7/apache-zookeeper-3.5.7-bin.tar.gz
tar zxvf apache-zookeeper-3.5.7-bin.tar.gz
mv apache-zookeeper-3.5.7-bin /usr/local/zookeeper-3.5.7
ln -s /usr/local/zookeeper-3.5.7/ /usr/local/zookeeper
cp /usr/local/zookeeper/conf/zoo_sample.cfg /usr/local/zookeeper/conf/zoo.cfg

修改配置文件zoo.cfg

vim /usr/local/zookeeper/conf/zoo.cfg
dataDir=/usr/local/zookeeper/data
dataLogDir=/usr/local/zookeeper/logs
server.1=kafka-node01:2888:3888
server.2=kafka-node02:2888:3888
server.3=kafka-node03:2888:3888

创建数据目录和日志目录

mkdir -p /usr/local/zookeeper/data/
mkdir -p /usr/local/zookeeper/logs/

在每个节点创建myid文件

echo 1 > /usr/local/zookeeper/data/myid        //kafka-node01执行
echo 2 > /usr/local/zookeeper/data/myid        //kafka-node02执行
echo 3 > /usr/local/zookeeper/data/myid        //kafka-node03执行

Zookeeper开机自启文件

vim /usr/lib/systemd/system/zookeeper.service
[Unit]
Description=Zookeeper Service
After=network.target

[Service]
Type=forking
Environment="JAVA_HOME=/usr/local/jdk"
Environment="ZOOKEEPER_HOME=/usr/local/zookeeper"
Environment="ZOO_LOG_DIR=/usr/local/zookeeper/logs"
Environment="ZOO_CONF_DIR=/usr/local/zookeeper/conf"
ExecStart=/usr/local/zookeeper/bin/zkServer.sh start
ExecStop=/usr/local/zookeeper/bin/zkServer.sh stop
ExecReload=/usr/local/zookeeper/bin/zkServer.sh restart
PIDFile=/usr/local/zookeeper/data/zookeeper_server.pid
Restart=on-failure

[Install]
WantedBy=multi-user.target

开机自启并启动Zookeeper

systemctl daemon-reload
systemctl start zookeeper.service
systemctl enable zookeeper.service

Zookeeper配置环境变量

vim /etc/profile.d/zookeeper.sh
export ZK_HOME=/usr/local/zookeeper
export PATH=$PATH:$ZK_HOME/bin
source /etc/profile

查看zookeeper服务状态,在每个节点执行zkServer.sh status,leader(领导者)follower(追随者)

Kafka集群搭建
Kafka集群搭建
Kafka集群搭建

搭建Kafka集群

所有节点安装Kafka

wget https://mirrors.huaweicloud.com/apache/kafka/3.8.0/kafka_2.12-3.8.0.tgz
tar zxvf kafka_2.12-3.8.0.tgz -C /usr/local/
ln -s /usr/local/kafka_2.12-3.8.0/ /usr/local/kafka
cp -a /usr/local/kafka/config/server.properties /usr/local/kafka/config/server.properties-bak

修改配置文件server.properties

vim /usr/local/kafka/config/server.properties
broker.id=1        //每个Kafka节点的唯一标识,另外两个节点需要修改
log.dirs=/usr/local/kafka/logs        //Kafka日志存储路径
zookeeper.connect=192.168.22.14:2181,192.168.22.15:2181,192.168.22.16:2181        //ZooKeeper集群地址
listeners=PLAINTEXT://0.0.0.0:9092        //Kafka服务监听端口
advertised.listeners=PLAINTEXT://kafka-node01:9092        //客户端连接时使用的地址和端口,另外两个节点改为自己的主机名
num.partitions=3        //默认分区数
default.replication.factor=3        //默认副本数,需手动添加

#日志保留策略
log.retention.hours=168        //日志保留小时数
log.retention.bytes=1073741824        //日志保留字节数
log.segment.bytes=1073741824        //日志段大小
log.retention.check.interval.ms=300000        //检查日志保留策略的时间间隔

#性能调优
num.network.threads=3        //处理网络IO的线程数
num.io.threads=8        //处理网络请求的线程数
num.replica.fetchers=1        //副本拉取线程数,需手动添加
socket.send.buffer.bytes=102400        //发送缓冲区大小
socket.receive.buffer.bytes=102400        //接收缓冲区大小
socket.request.max.bytes=104857600        //请求最大字节数

Kafka开机自启文件

vim /usr/lib/systemd/system/kafka.service
[Unit]
Description=Apache Kafka Server
Documentation=http://kafka.apache.org/documentation.html
Requires=zookeeper.service
After=zookeeper.service

[Service]
Type=simple
Environment=JAVA_HOME=/usr/local/jdk
Environment=KAFKA_HEAP_OPTS=-Xmx1G -Xms1G
ExecStart=/usr/local/kafka/bin/kafka-server-start.sh /usr/local/kafka/config/server.properties
ExecStop=/usr/local/kafka/bin/kafka-server-stop.sh /usr/local/kafka/config/server.properties
Restart=on-failure
RestartSec=30
User=root
Group=root

[Install]
WantedBy=multi-user.target

开机自启并启动Kafka

systemctl daemon-reload
systemctl enable --now kafka.service

Kafka配置环境变量

vim /etc/profile.d/kafka.sh
export KAFKA_HOME=/usr/local/kafka
export PATH=$PATH:$KAFKA_HOME/bin
source /etc/profile

验证集群

kafka-topics.sh --bootstrap-server 192.168.22.14:9092,192.168.22.15:9092,192.168.22.16:9092 --create --topic name --partitions 3 --replication-factor 2        //创建一个名为name的主题

--bootstrap-server 192.168.22.15:9092:指定Kafka集群的启动服务器地址;
--create:指示脚本执行创建主题的操作;
--topic name:指定要创建的主题名称;
--partitions 3:指定主题的分区数;
--replication-factor 2:指定每个分区的副本数。

kafka-topics.sh --bootstrap-server 192.168.22.14:9092 --describe --topic name        //描述Kafka集群中的主题的详细信息

--bootstrap-server 192.168.22.14:9092:指定单个Kafka节点地址,客户端通过它发现整个集群;
--describe:获取主题的详细信息;
--topic name:指定要查看的主题名称。

Kafka集群搭建

启动一个生产者,发送消息到指定主题

kafka-console-producer.sh --broker-list localhost:9092 --topic name        //在kafka-node01执行
Kafka集群搭建

启动一个消费者,从指定主题读取消息

kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic name --group my-consumer-group        //在kafka-node02执行
Kafka集群搭建
xxx