1. Kafka简介与应用场景
Apache Kafka是一种高性能的分布式消息队列系统,广泛应用于以下场景:
日志聚合:收集和汇总系统日志,便于集中管理和分析。
事件源:实时处理用户行为事件,如点击流、购买行为等。
流处理:与流处理框架(如Apache Flink、Apache Spark Streaming)结合,进行实时数据分析。
微服务通信:作为微服务架构中的消息中间件,实现服务间异步通信。
物联网(IoT):处理来自传感器的实时数据,支持大规模设备的数据传输。
Producer:Producer 即生产者,消息的产生者,是消息的入口Broker :Broker 是 kafka 一个实例,每个服务器上有一个或多个 kafka 的实例,简单的理解就是一台 kafka 服务器,kafka cluster表示集群的意思Topic:消息的主题,可以理解为消息队列,kafka的数据就保存在topic。在每个 broker 上都可以创建多个 topic 。Partition :Topic的分区,每个 topic 可以有多个分区,分区的作用是做负载,提高 kafka 的吞吐量。同一个 topic 在不同的分区的数据是不重复的,partition 的表现形式就是一个一个的文件夹!Replication:每一个分区都有多个副本,副本的作用是做备胎,主分区(Leader)会将数据同步到从分区(Follower)。当主分区(Leader)故障的时候会选择一个备胎(Follower)上位,成为 Leader。在kafka中默认副本的最大数量是10个,且副本的数量不能大于Broker的数量,follower和leader绝对是在不同的机器,同一机器对同一个分区也只可能存放一个副本Message:每一条发送的消息主体。Consumer:消费者,即消息的消费方,是消息的出口。Consumer Group:我们可以将多个消费组组成一个消费者组,在 kafka 的设计中同一个分区的数据只能被消费者组中的某一个消费者消费。同一个消费者组的消费者可以消费同一个topic的不同分区的数据,这也是为了提高kafka的吞吐量!Zookeeper:kafka 集群依赖 zookeeper 来保存集群的的元信息,来保证系统的可用性。
2. 使用Docker安装Kafka
以下是使用Docker安装Kafka的详细步骤:
2.1 拉取Zookeeper和Kafka镜像
docker pull wurstmeister/zookeeper
docker pull wurstmeister/kafka
2.2 启动Zookeeper和Kafka
# 启动Zookeeper
docker run -d --name zookeeper -p 2181:2181 -e TZ="Asia/Shanghai" --restart always wurstmeister/zookeeper
# 启动Kafka
docker run -d --name kafka -p 9092:9092 --link zookeeper \
-e KAFKA_ADVERTISED_HOST_NAME=192.168.200.130 \
-e KAFKA_BROKER_ID=0 \
-e KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181 \
-e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://192.168.200.130:9092 \
-e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 \
-e KAFKA_HEAP_OPTS="-Xmx256M -Xms256M" wurstmeister/kafka:latest
说明:
192.168.200.130 是你的Kafka容器的IP地址,需要根据你的实际环境进行调整。--name:容器名字-p:端口号KAFKA_BROKER_ID:该ID是集群的唯一标识KAFKA_ADVERTISED_LISTENERS:kafka发布到zookeeper供客户端使用的服务地址。KAFKA_ZOOKEEPER_CONNECT:zk的连接地址KAFKA_LISTENERS:允许使用PLAINTEXT侦听器
3. Spring Boot集成Kafka
在Spring Boot项目中集成Kafka,需要进行以下步骤:
3.1 引入依赖
在pom.xml中添加Kafka相关的依赖:
3.2 配置Kafka连接
在application.properties或application.yml中配置Kafka的连接信息:
spring.kafka.bootstrap-servers=192.168.200.130:9092
spring.kafka.consumer.group-id=my-group
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
这里192.168.200.130是你的Kafka容器的IP地址,9092是Kafka服务的端口。
3.3 创建生产者
创建一个Kafka消息生产者类,使用KafkaTemplate来发送消息:
public boolean sendMsg(String topics, String value) {
// 发送消息
CompletableFuture completableFuture = kafkaTemplate.send(topics, value);
completableFuture.thenAcceptAsync( o ->{
logger.info("消息发送成功");
}).exceptionally(throwable ->{
logger.error("消息发送失败");
return false;
});
return true;
}
3.4 创建消费者
创建一个Kafka消息消费者类,通过@KafkaListener注解监听指定主题的消息:
package com.example.demo;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
@Component
public class KafkaConsumer {
@KafkaListener(topics = "my_topic", groupId = "my-group")
public void receiveMessage(String message) {
System.out.println("Message received: " + message);
// 处理接收到的消息逻辑
}
}
3.5 测试发送消息
在Spring Boot的Controller中调用生产者的方法来发送消息:
package com.example.demo;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
@RestController
public class KafkaController {
@Autowired
private KafkaProducer producer;
@GetMapping("/send")
public String sendMessage(@RequestParam String message) {
producer.sendMessage("my_topic", message);
return "Message sent successfully!";
}
}
4. 启动和测试
启动Spring Boot应用:确保你的Spring Boot应用已经正确配置并启动。
访问接口发送消息:通过浏览器或Postman访问http://localhost:8080/send?message=Hello%20Kafka,发送消息到Kafka的my_topic主题。
查看消费者日志:观察消费者类的日志,确认消息是否被成功接收。
5. 注意事项
网络问题:确保你的Spring Boot应用能够访问到192.168.200.130这个IP地址。如果在不同的网络环境下,可能需要调整IP地址或使用Docker的网络配置。
防火墙和端口:确保9092端口没有被防火墙阻止。
主题创建:在发送消息之前,确保Kafka中已经创建了my_topic主题。如果主题不存在,Kafka会根据配置自动创建,但最好手动创建以避免潜在问题。
时间同步:确保Docker容器和宿主机的时间同步,避免因时间差异导致问题。
6. 手动创建Kafka主题
在发送消息之前,可以通过以下命令手动创建Kafka主题:
docker exec -it kafka bin/kafka-topics.sh --create --topic my_topic --bootstrap-server 192.168.200.130:9092 --partitions 1 --replication-factor 1
7. 总结
通过以上步骤,你可以在使用Docker安装的Kafka环境中成功实现Spring Boot应用的消息发送和接收功能。Kafka的强大功能使其在多种场景下都能发挥重要作用,希望这些内容对你有所帮助!