介绍
Kafka是一种开源的分布式流处理平台,最初由LinkedIn开发并于2011年开源。它被设计用于高吞吐量、低延迟的数据传输,以处理实时数据流。kafka的设计目标是提供一种可持久化的、高效的、容错的发布/订阅消息系统。
kafka的核心概念是消息日志。它将消息topic(主题)的形式进行组织、每个topic可以有多个分区(partition),每个分区有可以有多个副本(replica)。生产者(producer)将消息发布到指定的topic中,而消费者(consumer)可以订阅一个或多个topic,并从分区中读取消息。
kafka具有以下特性:
- 高吞吐量:kafka能够处理大规模的消息流,并具有很高的写入和读取吞吐量。它通过批量处理和顺序写入磁盘等优化策略来实现高吞吐量的消息传递。
- 可扩展性:kafka的分区和副本机制使得它能够水平扩展,以适应不断增长的数据流量。通过增加分区和副本,可以提高存储容量和处理能力。
- 持久性:kafka将消息持久化到磁盘上,即使消费者离线一段时间,也能够保留消息。这使得kafka可以作为可靠的数据存储系统使用。
- 容错性:kafka通过将消息复制到多个副本中来提供容错能力。当某个副本发生故障时,可以从其他副本中继续读取消息,确保数据的可靠性和可用性。
- 多语言支持:kafka提供了多种编程语言的客户端API,包括Java、Python、Go等,使得开发者可以使用自己熟悉的语言与kafka进行交互。
- 实时处理:kafka支持实时流处理,可以与其他流处理框架(如Apache Storm,Apache Flink)集成,用于实时数据分析和处理。它可以作为一个可靠的数据管道,将实时数据流传递给留处理引擎进行实时计算和分析。
- 可靠性保证:kafka提供了多种机制来确保消息的可靠传递,包括消息复制、持久化存储、消息确认机制等。这些机制可以保证消息不会丢失,并且能够在消费者和生产者之间提供端到端的可靠性保证。
总之,kafka是一个高性能、可扩展、持久化、容错的分布式流处理平台,具有广泛的应用场景和丰富的特性,使得它成为处理大规模实时数据流的理想选择。
安装前准备
环境
- centos 7.x
- java 1.8.0_201
- Hadoop 3.3.4
- Zookeeper 3.7.1
- HBase 2.4.17
规划
| IP地址 |
主机名 |
kafka |
| 172.16.77.202 |
hadoop-master-202 |
broker,controller |
| 172.16.77.203 |
hadoop-node-203 |
broker,controller |
| 172.16.77.204 |
hadoop-node-204 |
broker,controller |
安装kafka
下载kafka
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
|
#下载kafka
wget https://archive.apache.org/dist/kafka/3.4.0/kafka_2.13-3.4.0.tgz
#解压至安装目录
tar -zxvf kafka_2.13-3.4.0.tgz -C /home/hadoop/software/
#安装包下主要目录和文件:
bin:包含了kafka的可执行文件,如启动脚本、管理工具等。你可以在这里找到用于启动kafka服务器和执行
其他管理任务的脚本文件。
config:包含了kafka的配置文件,用于配置kafka服务器的各种参数和选项。你可以在这里修改配置文件以
满足你的需求,如设置监听端口、存储路径、日志配置等。
libs:包含了kafka的依赖库文件、包括kafka的核心库以及其所依赖的第三方库。这些库文件被用于编译和
运行kafka服务器和客户端。
stie-docs:包含了kafka的官方文档,包括用户指南、API文档等。你可以在这里找到详细的文档来了解kafka
的使用和配置。
#将kafka安装目录分配至hadoop用户
chown -R hadoop:hadoop /home/hadoop/software/
|
配置环境变量
此处按自己的需求来,没有必须在/etc/profile添加 正常在~/.bashrc下添加就行,这里都加上了
1
2
3
4
5
6
7
8
9
10
11
12
13
14
|
vim /etc/profile
......
#kafka
export KAFKA_HOME=/home/hadoop/software/kafka_2.13-3.4.0
export PATH=$KAFKA_HOME/bin:$PATH
##加载环境环境,测试是否安装成功
source /etc/profile
echo $KAFKA_HOME
#出现以下内容,说明正常
/home/hadoop/software/kafka_2.13-3.4.0
|
1
2
3
4
5
6
7
8
9
10
|
su - hadoop
vim ~/.bashrc
......
#kafka
export KAFKA_HOME=/home/hadoop/software/kafka_2.13-3.4.0
export PATH=$KAFKA_HOME/bin:$PATH
##加载环境环境
source ~/.bashrc
|
配置kafka
配置kraft/server.perties
这是使用fraft模式
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
|
cd $KAFKA_HOME/config
vim kraft/server.properties
......
############################# Server Basics #############################
# 这个服务器的角色,设置这个让我们进入 KRaft 模式
#broker为消费代理节点,controller为控制器节点
#通过将process.roles配置设置为broker,controller,该Kafka服务器将同时扮演broker和controller两个角色
process.roles=broker,controller
#节点ID,自己设置每个节点的值要不同
node.id=1
# Controller节点配置,控制器节点的连接字符串,用于管理状态的节点(替换Zookeeper作用)
controller.quorum.voters=1@172.16.77.202:9093,2@172.16.77.203:9093,3@172.16.77.204:9093
############################# Socket Server Settings #############################
# 服务监听地址端口
# 组合节点(即那些具有`process.role=broker,controller'的节点)必须至少在这里列出控制器监听器
# listeners 配置项用于设置监听的地址
#PLAINTEXT 是监听器的名称,用于普通的数据传输。 broker节点
#CONTROLLER 是监听器的名称,用于控制器节点之间的通信。 controller节点
listeners=PLAINTEXT://:9092,CONTROLLER://:9093
# 用于设置 Kafka Broker 之间通信的监听器的名称。
inter.broker.listener.name=PLAINTEXT
# 使用IP端口,每个节点填写自己节点的IP
# 用于设置 Kafka Broker 对外发布的地址列表,这些地址会被告知给 Kafka 客户端,
# 客户端通过这些地址连接到 Kafka Broker 并进行数据读写。
# 如果没有设置,它使用listener的值。
advertised.listeners=PLAINTEXT://172.16.77.202:9092
# 控制器使用的监听器名称的逗号分隔列表。
# 如果在listener.security.protocol.map中未设置显式映射,则默认使用 PLAINTEXT 协议。
# 如果在 KRaft 模式下运行,则必须使用此选项。
controller.listener.names=CONTROLLER
# 将侦听器名称映射到安全协议,默认情况下它们是相同的
listener.security.protocol.map=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL
# 服务器用于接收网络请求和向网络发送响应的线程数 默认为3
num.network.threads=3
# 服务器用于处理请求的线程数,其中可能包括磁盘 I/O 默认为8
num.io.threads=8
# 套接字服务器使用的发送缓冲区 (SO_SNDBUF) 默认值为102400 100KB
socket.send.buffer.bytes=102400
# 套接字服务器使用的接收缓冲区 (SO_RCVBUF) 默认值为102400 100KB
socket.receive.buffer.bytes=102400
# 套接字服务器接受请求的最大大小(防止 OOM) 默认值为104857600 100MB 这里设置为500MB
socket.request.max.bytes=524288000
############################# Log Basics #############################
# 用于存储日志文件位置,用逗号分隔目录列表
log.dirs=/home/hadoop/software/kafka_2.13-3.4.0/kafka-logs
#每个主题的默认日志分区数。更多的分区可以实现更大的并行消费,但也会导致broker之间的文件更多。默认为1
num.partitions=1
#在启动时进行日志恢复和关闭时进行刷写操作时,每个数据目录使用的线程数配置项。
#建议在使用 RAID 阵列的安装中增加此值。 默认为1
num.recovery.threads.per.data.dir=1
############################# Internal Topic Settings #############################
# 组元数据内部主题"__consumer_offsets "和"__transaction_state "的复制因子
# 对于开发测试以外的任何情况,建议使用大于 1 的值来确保可用性,如 3。
# 配置用于保存消费者位移的特殊主题(__consumer_offsets)的副本数量。默认为1
offsets.topic.replication.factor=1
# 用于设置事务状态(__transaction_state)主题的复制因子。
transaction.state.log.replication.factor=1
#事务状态日志的最小副本同步因子。
transaction.state.log.min.isr=1
############################# Log Flush Policy #############################
#默认情况下,消息会立即写入文件系统,但只有在需要时才会通过 fsync() 同步到操作系统缓存。
#以下配置项控制数据刷写到磁盘的方式。在这里需要考虑以下几个重要的权衡:
#数据持久性(Durability):如果不使用复制机制,未刷写的数据可能会丢失。
#延迟(Latency):非常大的刷写间隔可能会导致延迟峰值,因为在刷写时需要刷写大量数据。
#吞吐量(Throughput):刷写通常是最昂贵的操作,较小的刷写间隔可能会导致过多的寻址操作。
#下面的设置允许配置刷写策略,可以根据时间间隔或每个 N 条消息来刷写数据(或两者兼而有之)。这可以在全局范围内进行配置,并可以在每个主题上进行覆盖设置。
#在强制将数据刷新到磁盘之前接受的消息数量。
#log.flush.interval.messages=10000
#用于设置消息在日志中存放的最长时间,超过该时间后将强制执行数据刷新操作。
#单位毫秒
#log.flush.interval.ms=1000
############################# Log Retention Policy #############################
#以下配置控制日志分段的处理。可以根据一定的时间或已累积的大小设置策略,来删除日志分段。
#只要满足其中的任一条件,就会删除一个日志分段。删除始终从日志的末尾开始。
#设置日志文件的最小保存时间(以小时为单位)。当日志文件的年龄达到或超过该配置值时,它将成为符合删除条件的候选文件。
#默认值是一周
log.retention.hours=168
#用于设置基于日志文件大小的保留策略。该配置项定义了日志文件的最小保存大小(以字节为单位)。
#当日志文件的总大小超过或等于该配置值时,Kafka 将开始删除最旧的日志段,直到剩余的日志段的总大小
#低于 log.retention.bytes。 默认值是1073741824 1GB
# 如果要将 log.retention.bytes 参数设置为 10 GB,则应将其值设置为 10737418240。
#log.retention.bytes=1073741824
#用于设置日志段文件的最大大小(以字节为单位)。当一个日志段文件达到该大小时,
#Kafka 将创建一个新的日志段文件来继续写入消息。
# 默认值为1073741824 1GB
log.segment.bytes=1073741824
#设置检查日志段是否符合保留策略并可以删除的时间间隔(以毫秒为单位)。默认值300000(5分钟)
log.retention.check.interval.ms=300000
|
分发目录
将kafka安装目录拷贝到各个节点上
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
|
#将hbase安装目录分配至hadoop用户
chown -R hadoop:hadoop /home/hadoop/software/
#切换到hadoop用户
su - hadoop
#查看路径
pwd
/home/hadoop
#传输kafka
scp -r software/kafka_2.13-3.4.0/ hadoop@172.16.77.203:/home/hadoop/software/
scp -r software/kafka_2.13-3.4.0/ hadoop@172.16.77.204:/home/hadoop/software/
#传输bashrc
scp ~/.bashrc hadoop@172.16.77.203:~/.bashrc
scp ~/.bashrc hadoop@172.16.77.204:~/.bashrc
######注意#####
#分发目录后,需要在203、204上修改kraft/server.properties
##203下
vim kraft/server.properties
......
# 节点ID,自己设置每个节点的值要不同
node.id=2
# 使用IP端口,每个节点填写自己节点的IP
advertised.listeners=PLAINTEXT://172.16.77.203:9092
#204下
......
# 节点ID,自己设置每个节点的值要不同
node.id=3
# 使用IP端口,每个节点填写自己节点的IP
advertised.listeners=PLAINTEXT://172.16.77.204:9092
|
初始化集群
在其中一台服务器上生成一个uuid
1
2
3
4
5
|
cd $KAFKA_HOME
bin/kafka-storage.sh random-uuid
8Hv_zdcXQt-W8Bxvn2sIxw
|
使用上面生成的uuid格式化kafka存储目录,所有节点上都需要执行
1
2
3
|
cd $KAFKA_HOME
bin/kafka-storage.sh format -t 8Hv_zdcXQt-W8Bxvn2sIxw -c config/kraft/server.properties
|
启动kafka
三台都需要启动
1
2
3
4
5
6
7
8
9
10
11
12
|
cd $KAFKA_HOME
#启动后服务会占用9092和9093端口
bin/kafka-server-start.sh -daemon config/kraft/server.properties
#jps查看
jps
......
7640 Kafka
#关闭
bin/kafka-server-stop.sh
|
测试
1
2
3
4
5
6
7
8
9
10
11
12
13
14
|
cd $KAFKA_HOME
#创建主题
#创建一个名为test的主题,该主题包含三个分区,每个分区有3个副本
bin/kafka-topics.sh --bootstrap-server 172.16.77.202:9092 --create --topic test --partitions 3 --replication-factor 3
#查看主题
bin/kafka-topics.sh --bootstrap-server 172.16.77.202:9092 --describe test
Topic: test TopicId: f6-o3s5GQPyHSe-YPABumw PartitionCount: 3 ReplicationFactor: 3 Configs: segment.bytes=1073741824
Topic: test Partition: 0 Leader: 1 Replicas: 1,2,3 Isr: 1,2,3
Topic: test Partition: 1 Leader: 2 Replicas: 2,3,1 Isr: 2,3,1
Topic: test Partition: 2 Leader: 3 Replicas: 3,1,2 Isr: 3,1,2
|