# Kafka-Demo **Repository Path**: stefanie-zy/kafka-demo ## Basic Information - **Project Name**: Kafka-Demo - **Description**: No description available - **Primary Language**: Java - **License**: Not specified - **Default Branch**: stefanie-zy - **Homepage**: None - **GVP Project**: No ## Statistics - **Stars**: 0 - **Forks**: 0 - **Created**: 2023-11-30 - **Last Updated**: 2024-01-03 ## Categories & Tags **Categories**: Uncategorized **Tags**: Kafka ## README ## Kafka消息中间件 > 常见的消息中间件:ActiveMQ、Kafka、RabbitMQ、RocketMQ、ZeroMQ > > - Kafka:全球性能最好的中间件 > - RabbitMQ:内部可玩性较强,功能强大 > - RocketMQ:阿里开源,性能和Kafka相似 Kafka功能: 1. 日志收集 2. 消息系统 3. 用户活跃度跟踪 4. 运营指标 #### 1、流派 > ​ broker:消息中转。 ##### 1、有Broker > - 重Topict:过度依赖topic进行消息中转。 > - 轻Topic:Topic只是消息中转的一个类型。 | 重Topic | 轻Topic | | :-----------------------: | :------: | | Kafka、RocketMQ、ActiveMQ | RabbitMQ | ##### 2、无Broker #### 2、Kafka基础 ##### 1、消息的生产和消费 - 生产者将消息发送给broker,broker会将信息保存在本地日志文件当中。 - 消息的保存是有序的,是通过offset偏移量描述有序性。 - 消费者消费消息也是通过offset偏移量来描述当前要消费的消息的位置。 ##### 2、单播、多播 ###### 1、单播 - 如果多个消费者在同一个消费组里边,那只有一个消费者会收订阅到消息。 - 同一个消费组中只能有一个消费者消费消息。 ###### 2、多播 - 如果多个消息组同时订阅一个消息,那么各个消息组都会受到这条消息。 - 基于1的情况,每个消息组只会有一个消费者收到消息。 ##### 3、消息组 - current-offset:最后被消费的消息偏移量 - Log-end-offset:消息总量 - Lag:积压了多少条消息 #### 3、主题和分区 > ​ 为了解决Topic消息过多导致的问题,Kafka提出分区的概念来解决。 ##### 1、分区 - 将一个Topic中的消息分区来存储: - 分区存储,解决统一存储文件过大的问题 - 提供读写的吞吐量,读写可以同时在多个分区中进行 ##### 2、日志保存内容 - 00000.log:文件中保存的是消息 - _consumer_offset-49:kafka内部创建`__consumer_offset`主题包含50个分区 #### 4、同步发送和异步发送 ##### 1、同步 > ​ 生产者发送消息给kafka/broker,kafka/broker会给生产者发送ack,如果生产者没有收到ack,则生产者会出现阻塞,阻塞3s的时间,如果还没有收到则会进行重试,重试次数为3次。 ###### ack的参数配置 - ack=0——不需要kafka-broker收到消息,会立刻返回消息给生产者。***最容易丢失消息,效率最高。*** - ack=1——多副本之间都接收到消息,且消息已经写入到log中,才会返回消息给生产者。***性能和安全性最好。*** - ack=-1/all——默认进行副本同步,,同步完成之后会返回给生产者。但是同步的副本个数取决于`min.insyn.replicas`的值(>=2)。***最安全但是性能最差。*** ##### **2、异步** > ​ 生产者发送消息给kafka/broker,kafka/broker不会给生产者发送ack,会使用回调函数查看相关信息。***可能存在消息丢失的可能*** #### 5、消息缓冲区 ```java // 缓冲区配置 properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG,33554432); // 线程拉取数据大小配置 properties.put(ProducerConfig.BATCH_SIZE_CONFIG,16384); // 如果线程拉取不到16384[16k]的数据,间隔10ms也会将拉取到的数据发送 properties.put(ProducerConfig.LINGER_MS_CONFIG,10); ``` #### 6、消费者 > ​ 消费者消费完成数据之后需要向kafka/broker提交消费记录,用于更新kafka队列的offset数值。提交topic=consumer_offsets。 - 自动提交:消息在完全poll的情况下,消费者突然挂了,可能会导致部分消息未消费,下次一读取会直接从上次的offset位置去读取数据,从而导致数据丢失。 - 手动提交 - 手动同步提交 - 手动异步提交 #### 7、controller、rebalance、hw ##### 1、controller > ​ 每一个broker启动时都会向zk创建一个临时序号节点,获得序号最小的那个broker将会作为集成当中的controller,负责的事情如下: > > - 当集群当中有一个leader挂了,需要在集群当中重新选举leader,选举规则是从isr集合中最左边获得。 > - 集群中增加/减少broker,controller会同步信息给其他broker。 > - 集群中增加/较少分区,controller会同步信息给其他broker。 ##### 2、rebalance > ​ 消费者消费分区策略。 > > 1. 前提:消费者未指定具体的分区。 > > 2. 触发:消费组里的消费着或者分区发生变化,会触发rebalance策略。 > > 3. 策略: > 1. 触发rebalance策略之前: > - range:根据公式计算消费者消费具体的那些策略。 > - 轮询:消费者轮流消费分区信息。 > - sticky:粘合策略,需要rebalance,会在之前分配的基础上进行调整,不会改变之前的分配情况。如果这个策略没有开,那么全部都需要进行重新分配。建议开启这个策略,否则需要消耗大量的资源来调整。 ##### 3、HW和LEO > ​ HW俗称高水位,[已完成的同步位置],consumer只能消费到HW所在的位置,每一个replica都有一个HW,leader和follower各自负责更新各自的HW状态,对于leader新写入的消息,consumer不能立即消费,必须等所有的broker消息都同步完成之后(同步完成之后更新HW),才开始消费刚刚新写入的消息。这样保证在leader失效的情况,消费者仍然可以在新选举的leader中消费。 > > ​ LEO消息位置,[log-end-offset]。 #### 8、kafka优化 ##### 1、如何防止消息丢失 > - 生产者: > - 使用同步发送。 > - ack=1:保证leader收到消息才返回。 > - ack=-1/all:`min.insyn.replicas=分区个数`。 > - 消费者:将自动提交改为手动提交,保证每条消息都能被消费者消费。 ##### 2、如何防止消息重复消费 > - 触发情况: > - 生产者生成消息。 > - 消费者收到消息并进行业务处理。 > - 消费者回复生产者的过程当中遇到网络波动导致生产者未收到相关消息。 > - 生产者未收到消息会重复发送。 > > - 在业务服务下游进行去重处理。 > > - 保证数据的非幂等性。***[幂等性:多次请求的结果相同]*** > - 插入数据的时候使用联合主键的方式。**[联合主键:任意一个属性重复都会导致数据无法插入**] > - *使用分布式锁*:Redission.lock(order_id)。保证只有一个order_id。 ##### 3、如何做到顺序消费 > - 生产者: > - 在发送消息的时候设置`ack!=0`,关闭重试机制,使用同步发送,确保一条消息消费完成之后,发送其他消息,确保发送的顺序性。 > - 消费者: > - 消息发送到一个分区当中,只能有一个消费组来消费消息。 > - 备注: > - 顺序消费会牺牲很多服务性能。使用不多,涉及到顺序消费一般使用RocketMQ。 ##### 4、解决消息积压问题 > - 消息积压会导致的后果: > - 服务器磁盘占满 > - 方案: > - 消费者启动多个线程消费消息,加速消息的业务处理能力。 > - 消费者开启多个消费者,分别部署在不同的服务器上。 ##### 5、延迟队列 > - 备注:Kafka一般不做延迟队列。涉及到延迟队列一般使用RabbitMQ。 #### 9、安装教程 - [kafka安装](https://gitee.com/stefanie-zy/stefanie-docker/blob/stefanie-zy/07-Mq/02-Kafka/01-ReadMe.md) - [zookeeper安装](https://gitee.com/stefanie-zy/stefanie-docker/blob/stefanie-zy/20-Zookeeper/01-ReadMe.md) ##### 10、消费逻辑 > ​ 配置客户端参数->创建响应的消费者实例->订阅主题->拉取消息并消费->提交消费位移->关闭消费者实例