# jtcl-kafka-starter **Repository Path**: stonelife/jtcl-kafka-starter ## Basic Information - **Project Name**: jtcl-kafka-starter - **Description**: 基于spring-kafka的增强, 由于特殊业务需要实现kafka的动态更新,而进行的封装 - **Primary Language**: Java - **License**: MulanPSL-2.0 - **Default Branch**: master - **Homepage**: None - **GVP Project**: No ## Statistics - **Stars**: 5 - **Forks**: 7 - **Created**: 2021-08-22 - **Last Updated**: 2024-04-17 ## Categories & Tags **Categories**: Uncategorized **Tags**: None ## README # jtcl-kafka-starter #### 介绍 基于spring-kafka的增强, 由于特殊业务需要实现kafka的动态更新,而进行的封装 #### 安装教程 1. pom依赖 ```xml jtcl.tech jtcl-kafka-starter 0.0.1-SNAPSHOT ``` #### 使用说明 1. 修改kafka配置 1. 初始化阶段可通过实现KafkaInitListener进行配置覆盖 ```java @Component public class KafkaInitializing implements KafkaInitListener { @Override public KafkaConfigDTO updateConfig(KafkaConfigDTO config) { return config; } } ``` 2. 可通过ConsumerBatchListener.config(KafkaConfigDTO config)进行配置覆盖 需注意,在项目启动时进行覆盖可即时生效,运行过程中动态表更,则需要注入KafkaFactory的bean,调用其init(),刷新配置,刷新时会重建监听; 2. 批量监听 实现ConsumerBatchListener接口 ```java @KafkaConsumerListener public class ExternalInputDataConsumerListener implements ConsumerBatchListener { public KafkaConfigDTO kafkaConfigDTO = new KafkaConfigDTO(); @Override public KafkaConfigDTO config(KafkaConfigDTO config) { config.setBootstrapServers(StringUtils.isNotBlank(kafkaConfigDTO.getBootstrapServers()) ? kafkaConfigDTO.getBootstrapServers() : config.getBootstrapServers()); config.setTopics(kafkaConfigDTO.getTopics().length != 0 ? kafkaConfigDTO.getTopics() : config.getTopics()); return config; } @Override public void onMessage(List> data, Acknowledgment acknowledgment, Consumer consumer) { System.out.println("数据来自topic:" + data.get(0).topic() + ",分区:" + data.get(0).partition()); } } ``` #### 参与贡献