# kafka **Repository Path**: java_wangyin/kafka ## Basic Information - **Project Name**: kafka - **Description**: No description available - **Primary Language**: Java - **License**: Apache-2.0 - **Default Branch**: master - **Homepage**: None - **GVP Project**: No ## Statistics - **Stars**: 0 - **Forks**: 0 - **Created**: 2021-03-25 - **Last Updated**: 2021-03-31 ## Categories & Tags **Categories**: Uncategorized **Tags**: None ## README ## Kafka 学习记录 #### 使用场景 + 日志收集 + 消息系统 + 用户活动跟踪 + 运营指标 + 流式处理 #### 技术优势 + 可伸缩 + 容错性和可靠性 + 吞吐量 #### 概念 + Producer 消息发布者 + Consumer 消息消费者 + Topic 主题 + 每个主题可能分布在多个分区 + Partition 分区(生产者生产消息可以指定分区,如果不指定分区,分区器就根据键来计算分区) + 分区器:算法 + Offset 偏移量:消费者根据偏移量找到指定分区指定 + 一个分区里每个消息的偏移量是唯一的 + 消费者只能顺序读取 + broker : 一个broker里面可以存在多个主题 + broker之间会互相复制消息做冗余备份 + Leader broker + Follower broker + Zookeeper + Zookeeper负责维护和协调broker,当kafka系统中新增了broker或者某个broker 发生故障失效时,由Zookeeper通知生产者和消费者。生产者和消费者依据Zookeeper的borker状态信息与broker协调数据的发步和订阅任务。 + AR(Assigned Replicas) + 分区中所有的副本统称AR + ISR(In-Sync Replicas) + 所有与Leader部分保持一定程度的副本组成ISR #### 安装配置 + http://kafka.apache.org/documentation/#quickstart + JDK1.8 + NOTE: Your local environment must have Java 8+ installed. + Zookeeper + https://zookeeper.apache.org/releases.html + apache-zookeeper-3.7.0-bin.tar.gz + 配置文件 + apache-zookeeper-3.7.0-bin/conf/zoo_sample.cfg + ~~~shell cp zoo_sample.cfg zoo.cfg ~~~ ~~~properties # The number of milliseconds of each tick # zk服务器的心跳时间 tickTime=2000 # The number of ticks that the initial # synchronization phase can take # 投票选举新Leader的初始化时间 initLimit=10 # The number of ticks that can pass between # sending a request and getting an acknowledgement syncLimit=5 # the directory where the snapshot is stored. # do not use /tmp for storage, /tmp here is just # example sakes. # 数据目录 dataDir=/usr/local/tools/apache-zookeeper-3.7.0-bin/data #日志目录 dataLogDir=/usr/local/tools/apache-zookeeper-3.7.0-bin/logs # the port at which the clients will connect # 对外服务端口 clientPort=2181 # the maximum number of client connections. # increase this if you need to handle more clients #maxClientCnxns=60 # # Be sure to read the maintenance section of the # administrator guide before turning on autopurge. # # http://zookeeper.apache.org/doc/current/zookeeperAdmin.html#sc_maintenance # # The number of snapshots to retain in dataDir #autopurge.snapRetainCount=3 # Purge task interval in hours # Set to "0" to disable auto purge feature #autopurge.purgeInterval=1 ## Metrics Providers ~~~ + 启动Zk:/apache-zookeeper-3.7.0-bin/bin/zkServer.sh start ~~~shell [root@localhost bin]# ./zkServer.sh start ZooKeeper JMX enabled by default Using config: /usr/local/tools/apache-zookeeper-3.7.0-bin/bin/../conf/zoo.cfg Starting zookeeper ... STARTED ~~~ + Kafka安装 + http://kafka.apache.org/downloads + https://www.apache.org/dyn/closer.cgi?path=/kafka/2.7.0/kafka-2.7.0-src.tgz + https://www.apache.org/dyn/closer.cgi?path=/kafka/2.7.0/kafka_2.13-2.7.0.tgz + tar -zxvf kafka_2.13-2.7.0.tgz + 配置文件 + config/server.properties ~~~properties # The id of the broker. This must be set to a unique integer for each broker. # broker编号 如果有多个broker实例,那么编号不能重复 broker.id=0 # The address the socket server listens on. It will get the value returned from # java.net.InetAddress.getCanonicalHostName() if not configured. # FORMAT: # listeners = listener_name://host_name:port # EXAMPLE: # listeners = PLAINTEXT://your.host.name:9092 # broker 对外提供的服务入口地址 listeners=PLAINTEXT://:9092 # Hostname and port the broker will advertise to producers and consumers. If not set, # it uses the value for "listeners" if configured. Otherwise, it will use the value # returned from java.net.InetAddress.getCanonicalHostName(). advertised.listeners=PLAINTEXT://10.0.1.28:9092 ############################# Log Basics ############################# # A comma separated list of directories under which to store log files log.dirs=/usr/local/tools/kafka_2.13-2.7.0/logs ############################# Zookeeper ############################# # Zookeeper connection string (see zookeeper docs for details). # This is a comma separated host:port pairs, each corresponding to a zk # server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002". # You can also append an optional chroot string to the urls to specify the # root directory for all kafka znodes. # Zookeeper 注册中心地址 zookeeper.connect=127.0.0.1:2181 # Timeout in ms for connecting to zookeeper zookeeper.connection.timeout.ms=18000 ~~~ + 启动 ~~~shell ./bin/kafka-server-start.sh ./config/server.properties ~~~ + 命令测试 + 创建主题 ~~~shell [root@localhost kafka_2.13-2.7.0]#./bin/kafka-topics.sh --zookeeper 127.0.0.1:2181 --create --topic wangyin --partitions 2 --replication-factor 1 Created topic wangyin. --zookeeper:指定了Kafka所连接的Zookeeper服务地址 --topic : 指定了所要创建主题的名称 --partitions : 指定了分区个数 --replication-factor : 指定了副本因子 --create : 创建主题的动作指令 ~~~ + 查看主题信息 ~~~shell [root@localhost kafka_2.13-2.7.0]# ./bin/kafka-topics.sh --zookeeper 127.0.0.1:2181 --list wangyin [root@localhost kafka_2.13-2.7.0]# ./bin/kafka-topics.sh --zookeeper 127.0.0.1:2181 --describe Topic: wangyin PartitionCount: 2 ReplicationFactor: 1 Configs: Topic: wangyin Partition: 0 Leader: 0 Replicas: 0 Isr: 0 Topic: wangyin Partition: 1 Leader: 0 Replicas: 0 Isr: 0 ~~~ + 消费者接受消息 ~~~shell [root@localhost kafka_2.13-2.7.0]# ./bin/kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic wangyin Hello Kafka! ...等待消息... --bootstrap-server : 指定kafka集群地址 --topic : 指定接收那个主题的消息 ~~~ + 生产者生产消息 ~~~shell [root@localhost kafka_2.13-2.7.0]# ./bin/kafka-console-producer.sh --broker-list 127.0.0.1:9092 --topic wangyin >Hello Kafka! > --broker-list : 指定了连接的kafka集群地址 --topic : 指定了发送消息时的主题 ~~~ + 分区器 算法 + org.apache.kafka.common.utils.Utils#murmur2 + ```java Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions; ``` ## Java Kafka Client ### Producer 生产者 + 生产者 + cn.mrwangyin.kafka.producer.ProducerFastStart + 消费者 + cn.mrwangyin.kafka.consumer.ConsumerFastStart + 自定义序列化 + cn.mrwangyin.kafka.serializer.ObjectSerializer + cn.mrwangyin.kafka.serializer.ObjectDeserializer + cn.mrwangyin.kafka.serializer.SerializeUtil + 自定义分区器 + cn.mrwangyin.kafka.partitioner.DefinePartitioner #### 拦截器 + org.apache.kafka.clients.producer.ProducerInterceptor + cn.mrwangyin.kafka.interceptor.ProducerInterceptorPrefix ~~~java //使用自定义拦截器 properties.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,ProducerInterceptorPrefix.class.getName()); ~~~ #### Producer ack 参数 ~~~java //指定ACKS properties.put(ProducerConfig.ACKS_CONFIG, "-1"); ~~~ + ack=1, 默认值,简单来说就是,producer只要收到一个分区副本成功写入的通知就认为推送消息成功了。这里有一个地方需要注意,这个副本必须是leader副本。只有leader副本成功写入了,producer才会认为消息发送成功。 + ack=1的情况下,producer只要收到分区leader成功写入的通知就会认为消息发送成功了。如果leader成功写入后,还没来得及把数据同步到follower节点就挂了,这时候消息就丢失了。 + ack=0,简单来说就是,producer发送一次就不再发送了,不管是否发送成功。 + ack=-1,简单来说就是,producer只有收到分区内所有副本的成功写入的通知才认为推送消息成功了 + 安全,效率低 #### Producer retries 参数 + 默认:Integer.MAX_VALUE + ```java configs.put(RETRIES_CONFIG, userConfiguredRetries ? this.getInt(RETRIES_CONFIG) : Integer.MAX_VALUE); ``` + 重试次数,生产者收到Kafka的错误可能是临时性的,当达到retries次数时,生产者会放弃重试,并返回错误。默认每次重试间隔时间为100ms,可以通过retry.backoff.ms设置 #### Producer batch.size + 单位字节,默认16384 + 批量发送的消息大小,生产者会把将要发送到同一分区的消息,放在同一批次里,该参数指定了一个批次所占的内存大小,并不是消息个数。不过生产者并不一定会等到批次被填满后才发送,甚至只包含一个消息也会发送,就算把batch.size设置很大,也不会造成延迟,只会占用更多的内存,如果设置太小,可能会频繁发送而增加额外开销。 #### Producer max.request.size + 单位字节:默认1024*1024 ~~~java .define(MAX_REQUEST_SIZE_CONFIG, Type.INT, 1024 * 1024, atLeast(0), Importance.MEDIUM, MAX_REQUEST_SIZE_DOC) ~~~ + 该参数用于空值生产者发送请求的大小,它可以指定单个消息的最大值,也可以指单个请求所以消息的总大小。broker可以对接收消息最大值也有限制:message.max.size,所以两边最好一致! ### Consumer 消费者 #### 消费组 group id + 必填项 #### 订阅主题 + 可以订阅多个主题 ~~~java KafkaConsumer consumer = new KafkaConsumer(properties); consumer.subscribe(Collections.singletonList(TOPIC)); ~~~ + 正则表达式匹配Topic ~~~java //以topic开头的主题 consumer.subscribe(Pattern.compile("topic*")); ~~~ + 指定订阅的分区 ~~~java consumer.assign(Arrays.asList(new TopicPartition("topic01",0))); public TopicPartition(String topic, int partition) { this.partition = partition; this.topic = topic; } ~~~ #### 反序列化 ~~~java properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ObjectDeserializer.class); ~~~ #### 位移提交 ~~~java 消费者自己维护消息偏移量(offset), 消费者向Kafka 获取(poll)消息时,Kafka会将未被消费的消息交给消费者,Kafka自身并不管理该消息是否真的被消费了,消息是否被消费,是由消费者自己来维护offset的,默认情况通过消费者定期的commit提交给Kafka那些消息被消费了,这种情况就会导致,其他的服务可能重复消费该消息。 消息丢失:当一个消费者poll消息后,还未实际消费消息,就遇到了commit提交,就更新了这批消息的offset,这时该消费者宕机了,导致消息未被消费,但是在Kafka上的消息已被标记为以消费! ~~~ + 默认情况,消费者自动提交offset + enable.auto.commit=true + 默认每隔5秒提交一次offset + auto.commit.interval.ms 可以设置 + 重复消费可能需要客户端自己处理 + 关闭自动提交 ~~~java //关闭自动提交 properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); KafkaConsumer consumer = new KafkaConsumer(properties); consumer.subscribe(Collections.singletonList(TOPIC)); while (true) { //1秒钟取一次 ConsumerRecords records = consumer.poll(Duration.ofMillis(1000)); for (ConsumerRecord record : records) { System.out.println(record.value()); } //手动提交 consumer.commitSync(); } ~~~