# kafka-demo **Repository Path**: chen_qiang_dev/kafka-demo ## Basic Information - **Project Name**: kafka-demo - **Description**: kafka练习 - **Primary Language**: Unknown - **License**: Not specified - **Default Branch**: master - **Homepage**: None - **GVP Project**: No ## Statistics - **Stars**: 0 - **Forks**: 0 - **Created**: 2020-05-13 - **Last Updated**: 2020-12-19 ## Categories & Tags **Categories**: Uncategorized **Tags**: None ## README # 第一章 什么是kafka 1. 是由Scala语言编写 2. 需要依赖ZooKeeper 3. 是一个分布式的发布-订阅系统 4. 支持多分区、多副本、水平扩展、流处理 5. 将消息持久化到磁盘中 ![官网架构](http://kafka.apache.org/25/images/kafka-apis.png) ## 1. 特性 - 高吞吐量低延迟 - 可扩展性 - 持久性 - 容错性 - 高并发 ## 2. 使用场景 - 日志收集 - 消息系统 - 用户活动跟踪 - 运营指标 - 流式处理 ## 3. 技术优势 - 可伸缩性 - 容错性和可靠性 - 吞吐量 # 第二章 概念 ![架构图](https://ss0.bdstatic.com/70cFvHSh_Q1YnxGkpoWK1HF6hhy/it/u=3976122854,3162418718&fm=26&gp=0.jpg) ## 1. producer - 生产者 - 将数据发布到kafka的topic中 ## 2. consumer - 消费者 - 从topic中读取数据 - 可以消费多个topic中的数据 ## 3. topic - 主题 - 在kafka中使用topic划分数据的所属类 ## 4. partition - 分区 - topic中的数据划分为一个或多个partition - 每个topic至少有一个partition - 每个partition中的数据使用多个segment文件存储 - partition中的数据是有序的 - 多个partition间的数据不能保证顺序性 ## 5. offset - 位置下标 - 指明当前消费的消息的位置 ## 6. replication - 副本 - replication是一个分区的备份 - 不会被消费者消费 ## 7. broker - 节点 - kafka服务器集群中的节点 ## 8. leader - 主 - 当有多个partition时,只有一个leader - leader负责数据的读写 ## 9. follower - 备 - 备份leader的数据 - leader失效,则从follower中选举一个新的leader - leader会将卡顿或挂掉的follower删除,创建一个新的follower ## 10. zookeeper - 负责维护和协调broker - 当kafka集群中有broker失效,由zk负责同志producer和consumer - producer和consumer依据zk中broker的状态协调数据的发布和订阅 ## 11. assigned replicas - 分区中所有副本统称为AR ## 12. in sync replicas - 所有与leader部分保持一个程度的副本组成ISR ## 13. out of sync replicas - 与leader副本同步滞后过多的副本 ## 14. high watermark - 高水位 - 标识了一个特定的offset - 消费者只能拉取到这个offset之前的消息 ## 15. log end offset - 日志末端位置 - 记录了该副本底层日志中下一条消息的位移值 # 第三章 安装与配置 - 安装JDK - 安装ZooKeeper - 安装KafKa # 第四章 生产者 - 参见producer分支代码 ## 1. 序列化器 消息要到网络上进行传输,必须进行序列化,序列化器的作用就是如此 kafka提供了默认的字符串序列化器 - org.apache.kafka.common.serialization.StringSerializer - org.apache.kafka.common.serialization.IntegerSerializer - org.apache.kafka.common.serialization.BytesSerializer ## 2. 分区器 kafka本身有自己的分区策略,如果未指定,就会使用默认的分区策略 kafka根据传递消息的key来进行分区的分配 默认分区参考:org.apache.kafka.clients.producer.internals.DefaultPartitioner ```java public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) { List partitions = cluster.partitionsForTopic(topic); int numPartitions = partitions.size(); if (keyBytes == null) { int nextValue = nextValue(topic); List availablePartitions = cluster.availablePartitionsForTopic(topic); if (availablePartitions.size() > 0) { int part = Utils.toPositive(nextValue) % availablePartitions.size(); return availablePartitions.get(part).partition(); } else { return Utils.toPositive(nextValue) % numPartitions; } } else { return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions; } } ``` ## 3. 拦截器 拦截器是一个比较新的功能,主要用于实现客户端的定制化逻辑 生产者拦截器可以在消息发送前对消息做一些处理 - 按照某种规则过滤不符合要求的消息 - 修改消息内容 - 统计类的需求 参见:org.apache.kafka.clients.producer.ProducerInterceptor ## 4. 其他相关参数 ### 1. acks 该参数用来指定分区中必须有多少个副本收到这条消息,之后生产者才会认为这条消息是写入成功的 acks是生产者客户端中非常重要的一个参数,设计到消息的可靠性和吞吐量之间的权衡 - 等于0时,生产者在成功写入消息前不会等待任何来自服务器的响应。如果出现问题生产者是感知不到的,消息就丢失了。不过因为不需要等待服务器响应,所以它的吞吐量是最高的 - 等于1时,只要集群的leader节点收到消息,生产者就会收到一个来自服务器的成功响应。如果消息没有到达leader节点,生产者会受到一个错误响应。为了避免消息丢失,生产者会重新发送消息。但是这样有可能导致数据丢失,如果收到写成功通知,此时leader节点还没有同步到follower节点,而leader节点崩溃了,就会导致数据的丢失 - 等于-1时,只有当所有节点都收到消息时,生产者才会收到一个来自服务器的成功响应。这种模式是最安全的,但吞吐量相对较低 acks参数为字符串,而不是整数 ```java Properties properties = new Properties(); properties.put(ProducerConfig.ACKS_CONFIG, "0"); ``` ### 2. retries 消息发送失败后的重试次数 默认情况下,生产者每次重试之间等待100ms,可以通过设置`retry.backoff.ms`参数来修改时间间隔 ### 3. batch.size 当多个消息被发送发送到同一个分区时,生产者会把他们放进同一个批次。该参数指定了一个批次可以使用的内存大小,按照字节数计算而不是消息个数。批次填满、半满、甚至只包含一条消息都可能被直接发送,所以就算把它设置的很大,也不会造成延时,只是会多占用些内存而已,如果设置太小,生产者会因为频繁发送消息而增加一些额外开销 ### 4. max.request.size 该参数用于控制生产者发送的请求的大小,它可以指定能发送的单个消息的最大值,也可以指定单个请求中所有消息的总大小。broker对可接受的消息最大值也有限制`message.max.size`,所以两边匹配是最好的,避免消息过大被broker拒绝 ## 5. 总结 详细代码演示可以参考项目的producer分支 # 第五章 消费者 参见consumer分支 ## 1. 消费者和消费组 消费者是消费组中的一部分,当多个消费者形成一个消费组来消费主题时,每个消费者都会收到不同分区的消息 只需要写入一次消息,支持任意多应用来读取消息,每个应用都可以读到全量的消息,为了使得每个消费者都读到全量的消息,应用需要有不同的消费组 ## 2. 消息接收 参见consumer分支 ## 3. 位移提交 对于kafka中的分区而言,它的每条消息都有唯一的offset,用来表示消息在分区中的位置 当我们调用poll方法时,该方法返回我们没有消费的消息 当消息从broker发送给消费者时,broker并不跟踪这些消息是否被消费者接收到,kafka让消费者自身来管理消费的位移,并向消费者提供更新位移的接口,这种更新位移的方式成为提交 - 重复消费 - 消息丢失 - 自动提交 - 同步提交 手动提交的缺点是,当发起提交时应用会阻塞 参见代码:com.huhu.demo.commit.ConsumerWithCommit - 异步提交 异步提交的缺点是,如果服务器返回提交失败,异步提交不会进行重试,异步提交不实现重试的原因是防止位移覆盖 ## 4. 指定位移消费 目前为止,对于开发者来说无法精确掌握其消费的起始位置,seek方法正好提供了这个功能,可以用来做消息的回溯 ## 5. 再均衡监听器 再均衡是指分区的所属从一个消费者转移到另一个消费者的行为,它为消费者组的高可用性和伸缩性提供了保障,使得我们即方便又安全的对消费者组内的成员进行添加和删除,不过再均衡期间消费者是无法拉取消息的 参见代码:com.huhu.demo.rebalance.CommitSyncInRebalance