Kafka
2026/1/1大约 4 分钟
Kafka基本概念和功能
Kafka是一个开源的分布式事件流平台,它提供了高吞吐量、低延迟的消息传递机制,可以处理海量的实时数据。下面是Kafka的基本概念和功能:
- 发布/订阅消息系统:Kafka提供了发布/订阅消息系统功能,可以让生产者将消息发布到主题中,让消费者从主题中订阅消息。
- 分布式存储:Kafka提供了分布式存储功能,可以将消息持久化到磁盘中,并支持数据的副本复制,确保数据的可靠性。
- 高吞吐量:Kafka提供了高吞吐量功能,可以处理每秒数百万条消息,适用于大规模的数据处理场景。
- 低延迟:Kafka提供了低延迟功能,可以在毫秒级别内传递消息,适用于实时数据处理场景。
- 流处理:Kafka提供了流处理功能,可以使用Kafka Streams进行实时数据处理和转换。
为什么要用Kafka?
使用Kafka有以下几个优点:
- 高吞吐量:Kafka可以处理每秒数百万条消息,适用于大规模的数据处理场景。
- 低延迟:Kafka可以在毫秒级别内传递消息,适用于实时数据处理场景。
- 可靠性:Kafka支持数据的副本复制和持久化,可以确保数据的可靠性。
- 可扩展性:Kafka支持水平扩展,可以通过增加Broker来扩展Kafka集群的容量。
- 生态系统丰富:Kafka有着丰富的生态系统,可以与Spark、Flink、Storm等流处理框架集成。
总之,使用Kafka可以帮助开发者构建高吞吐量、低延迟的实时数据处理系统,提高应用程序的可用性、灵活性和可维护性。
Kafka应用场景?
Kafka主要应用于日志收集、指标监控、流式处理、事件溯源、消息系统和数据集成等场景,可以帮助开发者构建高吞吐量、低延迟的实时数据处理系统。
Kafka核心组件
Kafka的核心组件包括:
- Producer:生产者,用于发送消息到Kafka。
- Consumer:消费者,用于从Kafka接收消息。
- Broker:Kafka服务器,用于存储消息和处理客户端请求。
- Topic:主题,用于分类存储消息。
- Partition:分区,是Topic的物理分区,一个Topic可以有多个Partition。
- Consumer Group:消费者组,一组消费者共同消费一个Topic的消息。
Kafka核心架构图
+------------------+ +------------------+ +------------------+
| Producer | | Producer | | Producer |
| | | | | |
+------------------+ +------------------+ +------------------+
| | |
| | |
| | |
+-------------------------------------------------------------+
| Kafka Cluster |
| +--------------+ +--------------+ +--------------+ |
| | Broker 1 | | Broker 2 | | Broker 3 | |
| | Topic A-P0 | | Topic A-P1 | | Topic A-P2 | |
| | Topic B-P0 | | Topic B-P1 | | Topic B-P2 | |
| +--------------+ +--------------+ +--------------+ |
+-------------------------------------------------------------+
| | |
| | |
| | |
+------------------+ +------------------+ +------------------+
| Consumer | | Consumer | | Consumer |
| Group 1 | | Group 2 | | Group 3 |
+------------------+ +------------------+ +------------------+在上面的架构图中,分别展示了生产者将消息发送到Kafka集群,Kafka集群将消息存储在不同的Broker和Partition中,然后消费者组从Kafka集群中消费消息。
Kafka安装与测试
Windows安装
1、下载Kafka
从Kafka官网下载最新版本的Kafka(推荐使用3.5.0及以上版本)。
2、解压文件
将下载的压缩包解压到本地目录,例如 D:\kafka。
3、启动Kafka(KRaft模式,无需Zookeeper)
3.1 生成集群ID
cd D:\kafka\bin\windows
kafka-storage.bat random-uuid3.2 格式化存储
kafka-storage.bat format -t <生成的集群ID> -c ..\..\config\kraft\server.properties3.3 启动Kafka服务器
kafka-server-start.bat ..\..\config\kraft\server.propertiesLinux安装
1、下载Kafka
wget https://downloads.apache.org/kafka/3.7.0/kafka_2.13-3.7.0.tgz2、解压文件
tar -xzf kafka_2.13-3.7.0.tgz
cd kafka_2.13-3.7.03、启动Kafka(KRaft模式,无需Zookeeper)
3.1 生成集群ID
bin/kafka-storage.sh random-uuid3.2 格式化存储
bin/kafka-storage.sh format -t <生成的集群ID> -c config/kraft/server.properties3.3 启动Kafka服务器
bin/kafka-server-start.sh config/kraft/server.propertiesDocker安装
1、传统模式(带Zookeeper)
docker pull bitnami/kafka:3.5.0
docker pull bitnami/zookeeper:3.8.1
# 启动Zookeeper
docker run -d \
--name zookeeper \
-p 2181:2181 \
-e ALLOW_ANONYMOUS_LOGIN=yes \
--restart=always \
bitnami/zookeeper:3.8.1
# 启动Kafka
docker run -d \
--name kafka \
-p 9092:9092 \
-e KAFKA_BROKER_ID=1 \
-e KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181 \
-e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://localhost:9092 \
-e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 \
-e ALLOW_PLAINTEXT_LISTENER=yes \
--link zookeeper:zookeeper \
--restart=always \
bitnami/kafka:3.5.02、最新版本(KRaft模式,无需Zookeeper)
docker pull bitnami/kafka:3.7.0
# 启动Kafka(KRaft模式)
docker run -d \
--name kafka \
-p 9092:9092 \
-e KAFKA_CFG_NODE_ID=0 \
-e KAFKA_CFG_PROCESS_ROLES=controller,broker \
-e KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=0@kafka:9093 \
-e KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093 \
-e KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://localhost:9092 \
-e KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER \
-e KAFKA_CFG_INTER_BROKER_LISTENER_NAME=PLAINTEXT \
--restart=always \
bitnami/kafka:3.7.0Spring Boot 整合 Kafka
1、添加依赖
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>2、配置Kafka
spring:
kafka:
bootstrap-servers: localhost:9092
consumer:
group-id: my-group
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
producer:
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer3、发送消息
@Service
public class KafkaProducer {
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
public void sendMessage(String topic, String message) {
kafkaTemplate.send(topic, message);
}
}4、接收消息
@Service
public class KafkaConsumer {
@KafkaListener(topics = "my-topic", groupId = "my-group")
public void receiveMessage(String message) {
System.out.println("收到消息:" + message);
}
}