Kafka
|总字数:3.9k|阅读时长:14分钟|浏览量:|
基本概念


- producer:发布消息的对象(主题生产者)
- topic:kafka将消息分门别类,每一类消息称为一个主题
- consumer:订阅消息并处理发布的消息的对象成为主题消费者
- broker:已发布的消息保存在一组服务器中,成为kafka集群。集群中每个服务器都是一个代理(Broker)。消费者可以订阅一个或多个主题(topic),并从broker拉取数据,从而消费这些已发布的消息。
安装kafka
kafka对于zookeeper是强依赖,保存kafka相关的节点数据,所以安装kafka之前必须先安装zookeeper
1. 安装zookeeper
docker run -d \ --name zookeeper \ --restart always \ -p 2181:2181 \ zookeeper:3.4.14
|
2. 安装kafka
docker run -d --name kafka \ --restart always \ --env KAFKA_ADVERTISED_HOST_NAME=192.168.140.102 \ --env KAFKA_ZOOKEEPER_CONNECT=192.168.140.102:2181 \ --env KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://192.168.140.102:9092 \ --env KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 \ --env KAFKA_HEAP_OPTS="-Xmx256M -Xms256M" \ --net=host \ wurstmeister/kafka:2.12-2.3.1
|
--net=host
:直接使用容器宿主机的网络命名空间,没有独立的网络环境。使用宿主机的ip和端口,也可以使用-p 9092:9092
代替
3. 项目集成kafka
- 导入依赖
<dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> </dependency>
|
- 生产者:
public class ProducerQuickStart { public static void main(String[] args) { Properties prop = new Properties(); prop.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.140.102:9092"); prop.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); prop.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); KafkaProducer<String, String> producer = new KafkaProducer(prop);
ProducerRecord<String, String> record = new ProducerRecord<>("topic-first", "key-001", "hello kafka"); producer.send(record); producer.close(); } }
|
- 消费者:
public class ConsumerQuickStart { public static void main(String[] args) { Properties prop = new Properties(); prop.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.140.102:9092"); prop.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer"); prop.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer"); prop.put(ConsumerConfig.GROUP_ID_CONFIG, "group1");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(prop);
consumer.subscribe(Collections.singletonList("topic-first"));
while (true) { ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofMillis(1000)); for (ConsumerRecord<String, String> consumerRecord : consumerRecords) { String key = consumerRecord.key(); String value = consumerRecord.value(); System.out.println(key + ":" + value); } } } }
|
如果两个消费者在同一个消费者组
内,此时就算只有多个消费者,那么只能有一个消费者收到消息(一对一)

如果两个消费者在不同消费组
内,此时所有消费者都可以收到消息(一对多)

分区机制
kafka的分区机制是将每个主题(topic)划分成多个分区(Partition)
作用:这样就可以把数据存储到不同的机器上,并且机器上可以指定不同的分区。

每一个分区都是一个顺序的、不可变的消息队列, 并且可以持续的添加。每个分区下都维护了一个连续自增的数值,用来标记消息在什么位置【偏移量(offset)】。

在发送消息的时候可以指定分区:
ProducerRecord<String, String> record = new ProducerRecord<>("topic-first", 0, "key-001", "hello kafka");
|
分区策略

不指定分区,默认是轮询,也可以指定分区进行存储。
Kafka的高可用设计
集群
Kafka的服务器由broker服务的进程构成,一个kafka集群由多个broker组成。
如果集群中某一台机器宕机了,其他机器上的broker也能够对外提供服务。

备份机制
kafka中消息的备份又叫做副本(Replica)
领导者副本:Leader Replica
追随者副本:Follower Replica
- ISR:同步备份,leader保存数据后,会立即同步到ISR中
- 普通:异步备份

leader挂了之后,会选出新的leader,会优先从ISR中选;如果ISR的副本都不行了,就从普通副本里选
【极端情况】:所有副本都失效了
【解决方案】:
方案1:等待ISR中的第一个醒过来,选为Leader,数据可靠,但是活过来的时间不确定
方案2:选择第一个活过来的副本,不一定是ISR中的,选为Leader,这样可以最快速度恢复可用性,但不一定数据完整。
生产者
发送类型
同步发送
使用send()方法发送,会返回一个Future对象,调用get()方法进行等待就知道消息是否发送成功。
ProducerRecord<String, String> record = new ProducerRecord<>("topic-first", 0, "key-001", "hello kafka");
RecordMetadata recordMetadata = producer.send(record).get(); System.out.println(recordMetadata.offset());
|
异步发送
调用send()方法,并指定一个回调函数,服务器在返回响应时调用函数。
ProducerRecord<String, String> record = new ProducerRecord<>("topic-first", 0, "key-001", "hello kafka");
producer.send(record, new Callback() { @Override public void onCompletion(RecordMetadata recordMetadata, Exception e) { if(e != null) { System.out.println("记录一次信息到日志表中"); } System.out.println(recordMetadata.offset()); } });
|
连接参数配置信息
必须配置的连接参数
Properties prop = new Properties();
prop.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.140.102:9092");
prop.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
prop.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
|
ack消息确认机制

Properties prop = new Properties(); prop.put(ProducerConfig.ACKS_CONFIG, "all");
|
一般开发中不会设置ack的值,使用默认值。
retries消息重试机制
Properties prop = new Properties(); prop.put(ProducerConfig.RETRIES_CONFIG, 10);
|
compression消息压缩

Properties prop = new Properties(); prop.put(ProducerConfig.COMPRESSION_TYPE_CONFIG,"snappy");
|
使用压缩可以降低网络传输开销和存储开销,这个往往是kafka发送消息的瓶颈所在。
消费者
消费者组
消费者组:由一个或多个消费者组成的群体

一个发布在topic上的消息被分发到消费者组中的一个消费者
- queue模型:所有消费者都在一个组里
- 发布-订阅模型:所有消费者都在不同的组中
消息的有序性
应用场景:
- 即时消息中的一对一聊天和群聊,保证发送方消息的发送顺序与接收方的接收顺序一致。
- 充值转账两个渠道在同一个时间进行余额变更,短信通知必须要有顺序。

topic分区中的消息只能由消费者组中唯一一个消费者处理,所以消息肯定是按照先后顺序进行处理的。
但是只能保证topic的一个分区顺序处理,不能保证跨分区的消息先后处理。如果想要顺序的处理topic的所有消息,就只能提供一个分区。
提交和偏移量
kafka不会像其他消息队列一样需要等到消费者的确认,消费者可以使用kafka来追踪消息在分区的位置(偏移量),消费者会往_consumer_offeset
的特殊主题发送消息,消息里边包含了每个分区的偏移量。如果消费者发送崩溃,或又有新的消费者加入群组,就会触发再均衡
。
偏移量(Offset) 是消费者消费消息的核心机制之一,它记录了消费者在分区(Partition)中的消费进度,没有这个kafka不知道消费到哪里,也是在同一个分区下消息的消费会有顺序的原因
最开始的时候,消费者1、2、3各自负责各自的分区

此时如果消费者2发生了故障

分区3、4没有消费者管理,此时就会触发再均衡
,由消费者1和3分别接管这两个分区

【存在的问题】:
- 如果
提交的偏移量 < 客户端处理的最后一个消息的偏移量
,那么处于两个偏移量之间的消息就会被重复处理

- 如果
提交的偏移量 > 客户端的最后一个消息的偏移量
,那么处于两个偏移量之间的消息将会消失。(kafka默认情况下是拉取完数据就提交,实际上当获取到数据的时候,offset就提交了。但是业务还没有处理到offset的位置,就发生了故障。所以再均衡的时候,这部分数据就会发生丢失)

偏移量的提交方式
自动提交偏移量
当enable.auto.commit被设置为true,提交方式就是让消费者自动提交偏移量,每隔5s消费者会自动把从poll()方法接收到的最大偏移量提交上去
手动提交偏移量
当enable.auto.commit被设置成false,此时可以改成以下三种提交方式:
Properties prop = new Properties();
prop.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
|
- 提交当前偏移量(同步提交)
commitSync
while (true) { ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofMillis(1000)); for (ConsumerRecord<String, String> consumerRecord : consumerRecords) { System.out.println(consumerRecord.key() + ":" + consumerRecord.value()); System.out.println("偏移量:" + consumerRecord.offset()); try { consumer.commitSync(); } catch (CommitFailedException e) { System.out.println("记录提交失败的异常:" + e); } } }
|
阻塞操作,等待Broker确认提交结果。性能较低,但能确保提交成功或抛出异常。(如果失败了,他会一直重试直到成功)
- 异步提交
commitAsync
while (true) { ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofMillis(1000)); for (ConsumerRecord<String, String> consumerRecord : consumerRecords) { System.out.println(consumerRecord.key() + ":" + consumerRecord.value()); System.out.println("偏移量:" + consumerRecord.offset()); } consumer.commitAsync(new OffsetCommitCallback() { @Override public void onComplete(Map<TopicPartition, OffsetAndMetadata> map, Exception e) { if(e != null) { System.out.println("记录错误提交的偏移量:" + map + "异常信息为:" + e); } } }); }
|
非阻塞操作,提交后立即返回,不等待Broker确认。性能更高,但无法保证提交成功(如果服务器返回提交失败后,异步提交不会进行重试)
【常见问题】:
- 同时有多个异步提交,可能会导致偏移量offset的覆盖。
- 消费者崩溃,最后一次异步提交可能未完成,导致偏移量未更新。
- 若Broker返回提交错误(如CommitFailedException),异步提交无法直接处理(需通过回调)。
- 同步和异步组合提交
try { while (true) { ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofMillis(1000)); for (ConsumerRecord<String, String> consumerRecord : consumerRecords) { System.out.println(consumerRecord.key() + ":" + consumerRecord.value()); System.out.println("偏移量:" + consumerRecord.offset()); } consumer.commitAsync(); } } catch (CommitFailedException e) { e.printStackTrace(); System.out.println("记录错误的信息:" + e); }finally { try { consumer.commitSync(); } finally { consumer.close(); } }
|
会先进行异步提交,如果提交失败了,就记录日志,然后再进行同步的提交。
【正常流程中使用异步提交】:在消息处理循环中,优先使用commitAsync以提高吞吐量。即使某次提交失败,后续提交可能成功,避免阻塞处理。
【退出时使用同步提交】:在消费者关闭前(如finally块),使用commitSync确保最后一次提交成功。因为此时消费者即将终止,必须保证偏移量的正确性,避免重复消费。
SpringBoot集成kafka收发消息
传递普通类型的消息
- 导入依赖
<dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> </dependency>
|
- 配置kafka信息:
spring: application: name: kafka-demo kafka: bootstrap-servers: 192.168.140.102:9092 producer: retries: 10 key-serializer: org.apache.kafka.common.serialization.StringSerializer value-serializer: org.apache.kafka.common.serialization.StringSerializer consumer: group-id: ${spring.application.name}-test key-deserializer: org.apache.kafka.common.serialization.StringDeserializer value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
|
- 消息生产者
@Autowired private KafkaTemplate<String,String> kafkaTemplate;
@GetMapping("/hello") public String hello(){
kafkaTemplate.send("name-topic","xiaolin0333"); return "ok"; }
|
- 消息消费者
@Component public class HelloListener { @KafkaListener(topics = "name-topic") public void onMessage(String message){ if(!StringUtils.isEmpty(message)){ System.out.println(message); }
} }
|
传递对象类型的消息
目前springboot整合后的kafka,因为序列化器是StringSerializer,此时如果需要传递对象有两种方式:
方式1:自定义序列化器(但是对象类型众多,通用性不强)
方式2:把要传递的对象转成json字符串,接收消息后再转为对象
- 添加json序列化依赖:
<dependency> <groupId>com.alibaba</groupId> <artifactId>fastjson</artifactId> </dependency>
|
- 消息生产者
@Autowired private KafkaTemplate<String, String> kafkaTemplate;
@GetMapping("/hello") public String hello() { User user = new User("xiaolin", 18); kafkaTemplate.send("user-topic", JSON.toJSONString(user)); return "ok"; }
|
- 消息消费者
@KafkaListener(topics = "user-topic") public void onMessageUser(String userJson) { if(!StringUtils.isEmpty(userJson)) { User user = JSON.parseObject(userJson, User.class); System.out.println(user); } }
|
Kafka实战——自媒体文章上下架
Kafka和OpenFeign
当自媒体微服务
中的文章需要上架或下架时,需要通知文章微服务
【使用OpenFeign的问题】:
- 同步阻塞:OpenFeign 是基于 HTTP 的同步远程调用,自媒体微服务必须等待文章微服务的响应才能继续后续逻辑。
- 性能瓶颈:如果文章微服务响应慢或不可用,自媒体微服务会被阻塞,导致接口延迟甚至超时。
- 耦合性高:两个服务直接依赖,任何一方的故障都可能影响对方。
【改进方案】:自媒体微服务不再通过OpenFeign调用文章微服务,而是将文章上下架事件发送到 Kafka 消息队列,由文章微服务异步消费处理。从而达到系统的解耦。
【适用场景】:
- 适合 Kafka 的场景:
- 不需要实时响应的操作(如状态同步、日志记录、通知)。
- 高并发、高吞吐量的场景(如秒杀活动、批量任务)。
- 适合 OpenFeign 的场景:
- 需要实时获取结果的场景(如支付确认、查询订单状态)。
需求分析
【需求】:已发表且已下架的文章可以上架;已发表且已上架的文章可以下架。

自媒体微服务是生产者,文章微服务是消费者,自媒体微服务向kafka发送修改文章状态的任务,文章微服务从kafka里取出任务后消费。