基本概念

052e404382634514803ed61659236e47.png
620a47301cb3463ebbaef396a16cb20e.png

  • producer:发布消息的对象(主题生产者)
  • topic:kafka将消息分门别类,每一类消息称为一个主题
  • consumer:订阅消息并处理发布的消息的对象成为主题消费者
  • broker:已发布的消息保存在一组服务器中,成为kafka集群。集群中每个服务器都是一个代理(Broker)。消费者可以订阅一个或多个主题(topic),并从broker拉取数据,从而消费这些已发布的消息。

安装kafka

kafka对于zookeeper是强依赖,保存kafka相关的节点数据,所以安装kafka之前必须先安装zookeeper

1. 安装zookeeper

docker run -d \
--name zookeeper \
--restart always \
-p 2181:2181 \
zookeeper:3.4.14

2. 安装kafka

docker run -d --name kafka \
--restart always \
--env KAFKA_ADVERTISED_HOST_NAME=192.168.140.102 \
--env KAFKA_ZOOKEEPER_CONNECT=192.168.140.102:2181 \
--env KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://192.168.140.102:9092 \
--env KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 \
--env KAFKA_HEAP_OPTS="-Xmx256M -Xms256M" \
--net=host \
wurstmeister/kafka:2.12-2.3.1

--net=host:直接使用容器宿主机的网络命名空间,没有独立的网络环境。使用宿主机的ip和端口,也可以使用-p 9092:9092代替

3. 项目集成kafka

  1. 导入依赖
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
</dependency>
  1. 生产者:
public class ProducerQuickStart {
public static void main(String[] args) {
// 1. kafka的连接配置信息
Properties prop = new Properties();
// kafka的连接地址
prop.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.140.102:9092");
// key的序列化
prop.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
// value的序列化
prop.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");

// 2. 创建kafka生产者对象
KafkaProducer<String, String> producer = new KafkaProducer(prop);

// 3. 发送消息
/**
* 第一个参数:topic
* 第二个参数:消息的key
* 第三个参数:消息的value
*/
ProducerRecord<String, String> record = new ProducerRecord<>("topic-first", "key-001", "hello kafka");
producer.send(record);

// 4. 关闭消息通道,必须要关闭,否则消息发送不成功
producer.close();
}
}
  1. 消费者:
public class ConsumerQuickStart {
public static void main(String[] args) {
// 1. kafka的连接配置信息
Properties prop = new Properties();
// kafka的连接地址
prop.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.140.102:9092");
// key的反序列化
prop.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
// value的反序列化
prop.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
// 设置消费者组
prop.put(ConsumerConfig.GROUP_ID_CONFIG, "group1");

// 2. 创建kafka消费者对象
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(prop);

// 3. 订阅主题
consumer.subscribe(Collections.singletonList("topic-first"));

// 4. 拉取消息
while (true) { // 让当前线程一直处于监听的状态
ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofMillis(1000)); // 每秒钟拉取一次
for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
String key = consumerRecord.key();
String value = consumerRecord.value();
System.out.println(key + ":" + value);
}
}
}
}

如果两个消费者在同一个消费者组内,此时就算只有多个消费者,那么只能有一个消费者收到消息(一对一)
3e7edd71c57a42f0ac05d969eea0d276.png
如果两个消费者在不同消费组内,此时所有消费者都可以收到消息(一对多)
242a212fdf394431aed386d613686efb.png

分区机制

kafka的分区机制是将每个主题(topic)划分成多个分区(Partition)
作用:这样就可以把数据存储到不同的机器上,并且机器上可以指定不同的分区。
7606f76bda97483797be6e6e33721eea.png
每一个分区都是一个顺序的、不可变的消息队列, 并且可以持续的添加。每个分区下都维护了一个连续自增的数值,用来标记消息在什么位置【偏移量(offset)】。
1b07401b8c9e47c5b88209aa9a07286d.png
在发送消息的时候可以指定分区:

ProducerRecord<String, String> record = new ProducerRecord<>("topic-first", 0, "key-001", "hello kafka"); // 分区0

分区策略

32b71eecb1c14494892df71f61c37f39.png
不指定分区,默认是轮询,也可以指定分区进行存储。

Kafka的高可用设计

集群

Kafka的服务器由broker服务的进程构成,一个kafka集群由多个broker组成。
如果集群中某一台机器宕机了,其他机器上的broker也能够对外提供服务。
6bf86645577146469b64d4f2086f19c3.png

备份机制

kafka中消息的备份又叫做副本(Replica)
领导者副本:Leader Replica
追随者副本:Follower Replica

  • ISR:同步备份,leader保存数据后,会立即同步到ISR中
  • 普通:异步备份
    129513be9e444e9696111ea4cf0e6612.png

leader挂了之后,会选出新的leader,会优先从ISR中选;如果ISR的副本都不行了,就从普通副本里选
【极端情况】:所有副本都失效了
【解决方案】:
方案1:等待ISR中的第一个醒过来,选为Leader,数据可靠,但是活过来的时间不确定
方案2:选择第一个活过来的副本,不一定是ISR中的,选为Leader,这样可以最快速度恢复可用性,但不一定数据完整。

生产者

发送类型

同步发送

使用send()方法发送,会返回一个Future对象,调用get()方法进行等待就知道消息是否发送成功。

ProducerRecord<String, String> record = new ProducerRecord<>("topic-first", 0, "key-001", "hello kafka");
// 同步发送消息
RecordMetadata recordMetadata = producer.send(record).get();
System.out.println(recordMetadata.offset());

异步发送

调用send()方法,并指定一个回调函数,服务器在返回响应时调用函数。

ProducerRecord<String, String> record = new ProducerRecord<>("topic-first", 0, "key-001", "hello kafka");
// 异步发送消息
producer.send(record, new Callback() {
@Override
public void onCompletion(RecordMetadata recordMetadata, Exception e) {
if(e != null) {
System.out.println("记录一次信息到日志表中");
}
System.out.println(recordMetadata.offset());
}
});

连接参数配置信息

必须配置的连接参数

Properties prop = new Properties();
// kafka的连接地址
prop.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.140.102:9092");
// key的序列化
prop.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
// value的序列化
prop.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");

ack消息确认机制

3a850408829847c88bbb843fa6b35fd3.png

Properties prop = new Properties();
prop.put(ProducerConfig.ACKS_CONFIG, "all"); // 确认机制

一般开发中不会设置ack的值,使用默认值。

retries消息重试机制

Properties prop = new Properties();
prop.put(ProducerConfig.RETRIES_CONFIG, 10); // 重试机制:重试次数10次

compression消息压缩

53a42f69a9d541eeb259bb7505980ab1.png

Properties prop = new Properties();
prop.put(ProducerConfig.COMPRESSION_TYPE_CONFIG,"snappy"); // 数据压缩

使用压缩可以降低网络传输开销和存储开销,这个往往是kafka发送消息的瓶颈所在。

消费者

消费者组

消费者组:由一个或多个消费者组成的群体
ec96f780d3d64df698eff4da84432b23.png
一个发布在topic上的消息被分发到消费者组中的一个消费者

  • queue模型:所有消费者都在一个组里
  • 发布-订阅模型:所有消费者都在不同的组中

消息的有序性

应用场景:

  1. 即时消息中的一对一聊天和群聊,保证发送方消息的发送顺序与接收方的接收顺序一致。
  2. 充值转账两个渠道在同一个时间进行余额变更,短信通知必须要有顺序。
    1cd07554d2c34611b93953e2fadd7da0.png
    topic分区中的消息只能由消费者组中唯一一个消费者处理,所以消息肯定是按照先后顺序进行处理的。
    但是只能保证topic的一个分区顺序处理,不能保证跨分区的消息先后处理。如果想要顺序的处理topic的所有消息,就只能提供一个分区。

提交和偏移量

kafka不会像其他消息队列一样需要等到消费者的确认,消费者可以使用kafka来追踪消息在分区的位置(偏移量),消费者会往_consumer_offeset的特殊主题发送消息,消息里边包含了每个分区的偏移量。如果消费者发送崩溃,或又有新的消费者加入群组,就会触发再均衡
偏移量(Offset) 是消费者消费消息的核心机制之一,它记录了消费者在分区(Partition)中的消费进度,没有这个kafka不知道消费到哪里,也是在同一个分区下消息的消费会有顺序的原因

最开始的时候,消费者1、2、3各自负责各自的分区
3000c74db99d42ce9ac1b01b316671b9.png
此时如果消费者2发生了故障
a4841da9007d4411829cc9c4b87a5099.png
分区3、4没有消费者管理,此时就会触发再均衡,由消费者1和3分别接管这两个分区
4ada15f23cdf4098b5faf176432dd6cb.png
【存在的问题】:

  1. 如果提交的偏移量 < 客户端处理的最后一个消息的偏移量,那么处于两个偏移量之间的消息就会被重复处理
    4c3b717bad1a408d9c528a8abab645ce.png
  2. 如果提交的偏移量 > 客户端的最后一个消息的偏移量,那么处于两个偏移量之间的消息将会消失。(kafka默认情况下是拉取完数据就提交,实际上当获取到数据的时候,offset就提交了。但是业务还没有处理到offset的位置,就发生了故障。所以再均衡的时候,这部分数据就会发生丢失)
    a2ddb87729964e80a981f828d78a60e3.png

偏移量的提交方式

自动提交偏移量

当enable.auto.commit被设置为true,提交方式就是让消费者自动提交偏移量,每隔5s消费者会自动把从poll()方法接收到的最大偏移量提交上去

手动提交偏移量

当enable.auto.commit被设置成false,此时可以改成以下三种提交方式:

Properties prop = new Properties();
// ...其他配置
// 开启手动提交偏移量的方式
prop.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
  1. 提交当前偏移量(同步提交)commitSync
// 拉取消息
while (true) { // 让当前线程一直处于监听的状态
ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofMillis(1000)); // 每秒钟拉取一次
for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
System.out.println(consumerRecord.key() + ":" + consumerRecord.value());
System.out.println("偏移量:" + consumerRecord.offset());
try {
// 同步提交偏移量
consumer.commitSync();
} catch (CommitFailedException e) {
System.out.println("记录提交失败的异常:" + e);
}
}
}

阻塞操作,等待Broker确认提交结果。性能较低,但能确保提交成功或抛出异常。(如果失败了,他会一直重试直到成功)

  1. 异步提交commitAsync
// 拉取消息
while (true) { // 让当前线程一直处于监听的状态
ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofMillis(1000)); // 每秒钟拉取一次
for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
System.out.println(consumerRecord.key() + ":" + consumerRecord.value());
System.out.println("偏移量:" + consumerRecord.offset());
}
// 异步提交偏移量(在for循环外提交)
consumer.commitAsync(new OffsetCommitCallback() {
@Override
public void onComplete(Map<TopicPartition, OffsetAndMetadata> map, Exception e) {
if(e != null) {
System.out.println("记录错误提交的偏移量:" + map + "异常信息为:" + e);
}
}
});
}

非阻塞操作,提交后立即返回,不等待Broker确认。性能更高,但无法保证提交成功(如果服务器返回提交失败后,异步提交不会进行重试)
【常见问题】:

  1. 同时有多个异步提交,可能会导致偏移量offset的覆盖。
  2. 消费者崩溃,最后一次异步提交可能未完成,导致偏移量未更新。
  3. 若Broker返回提交错误(如CommitFailedException),异步提交无法直接处理(需通过回调)。
  1. 同步和异步组合提交
// 拉取消息
try {
while (true) { // 让当前线程一直处于监听的状态
ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofMillis(1000)); // 每秒钟拉取一次
for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
// 处理消息...
System.out.println(consumerRecord.key() + ":" + consumerRecord.value());
System.out.println("偏移量:" + consumerRecord.offset());
}
// 异步提交(正常流程、在for循环外提交)
consumer.commitAsync();
}
} catch (CommitFailedException e) {
// 处理提交失败异常(如消费者组已失效)
e.printStackTrace();
System.out.println("记录错误的信息:" + e);
}finally {
try {
consumer.commitSync(); // 同步提交(确保最终提交成功)
} finally {
consumer.close(); // 关闭消费者
}
}

会先进行异步提交,如果提交失败了,就记录日志,然后再进行同步的提交。
【正常流程中使用异步提交】:在消息处理循环中,优先使用commitAsync以提高吞吐量。即使某次提交失败,后续提交可能成功,避免阻塞处理。
【退出时使用同步提交】:在消费者关闭前(如finally块),使用commitSync确保最后一次提交成功。因为此时消费者即将终止,必须保证偏移量的正确性,避免重复消费。

SpringBoot集成kafka收发消息

传递普通类型的消息

  1. 导入依赖
<!-- kafkfa -->
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
  1. 配置kafka信息:
spring:
application:
name: kafka-demo
kafka:
bootstrap-servers: 192.168.140.102:9092
producer:
# 重试次数
retries: 10
# key、value的序列化器
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
consumer:
# 消费组
group-id: ${spring.application.name}-test
# key、value的反序列化器
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
  1. 消息生产者
@Autowired
private KafkaTemplate<String,String> kafkaTemplate;

@GetMapping("/hello")
public String hello(){
/**
* 第一个参数:topic
* 第二个参数:消息内容
*/
kafkaTemplate.send("name-topic","xiaolin0333");
return "ok";
}
  1. 消息消费者
@Component
public class HelloListener {
@KafkaListener(topics = "name-topic") // 和消息生产者的topic对应
public void onMessage(String message){ // message表示监听到的消息
if(!StringUtils.isEmpty(message)){
System.out.println(message);
}

}
}

传递对象类型的消息

目前springboot整合后的kafka,因为序列化器是StringSerializer,此时如果需要传递对象有两种方式:
方式1:自定义序列化器(但是对象类型众多,通用性不强)
方式2:把要传递的对象转成json字符串,接收消息后再转为对象

  1. 添加json序列化依赖:
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
</dependency>
  1. 消息生产者
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;

@GetMapping("/hello")
public String hello() {
User user = new User("xiaolin", 18);
kafkaTemplate.send("user-topic", JSON.toJSONString(user));
return "ok";
}
  1. 消息消费者
@KafkaListener(topics = "user-topic")
public void onMessageUser(String userJson) {
if(!StringUtils.isEmpty(userJson)) {
User user = JSON.parseObject(userJson, User.class);
System.out.println(user);
}
}

Kafka实战——自媒体文章上下架

Kafka和OpenFeign

自媒体微服务中的文章需要上架或下架时,需要通知文章微服务

【使用OpenFeign的问题】:

  • 同步阻塞:OpenFeign 是基于 HTTP 的同步远程调用,自媒体微服务必须等待文章微服务的响应才能继续后续逻辑。
  • 性能瓶颈:如果文章微服务响应慢或不可用,自媒体微服务会被阻塞,导致接口延迟甚至超时。
  • 耦合性高:两个服务直接依赖,任何一方的故障都可能影响对方。

【改进方案】:自媒体微服务不再通过OpenFeign调用文章微服务,而是将文章上下架事件发送到 Kafka 消息队列,由文章微服务异步消费处理。从而达到系统的解耦。

【适用场景】:

  1. 适合 Kafka 的场景:
    • 不需要实时响应的操作(如状态同步、日志记录、通知)。
    • 高并发、高吞吐量的场景(如秒杀活动、批量任务)。
  2. 适合 OpenFeign 的场景:
    • 需要实时获取结果的场景(如支付确认、查询订单状态)。

需求分析

【需求】:已发表且已下架的文章可以上架;已发表且已上架的文章可以下架。
217a675a5ed74c1db7bb8e414cffa7ab.png
自媒体微服务是生产者,文章微服务是消费者,自媒体微服务向kafka发送修改文章状态的任务,文章微服务从kafka里取出任务后消费。