# jtcl-kafka-starter
**Repository Path**: stonelife/jtcl-kafka-starter
## Basic Information
- **Project Name**: jtcl-kafka-starter
- **Description**: 基于spring-kafka的增强,
由于特殊业务需要实现kafka的动态更新,而进行的封装
- **Primary Language**: Java
- **License**: MulanPSL-2.0
- **Default Branch**: master
- **Homepage**: None
- **GVP Project**: No
## Statistics
- **Stars**: 5
- **Forks**: 7
- **Created**: 2021-08-22
- **Last Updated**: 2024-04-17
## Categories & Tags
**Categories**: Uncategorized
**Tags**: None
## README
# jtcl-kafka-starter
#### 介绍
基于spring-kafka的增强, 由于特殊业务需要实现kafka的动态更新,而进行的封装
#### 安装教程
1. pom依赖
```xml
jtcl.tech
jtcl-kafka-starter
0.0.1-SNAPSHOT
```
#### 使用说明
1. 修改kafka配置
1. 初始化阶段可通过实现KafkaInitListener进行配置覆盖
```java
@Component
public class KafkaInitializing implements KafkaInitListener {
@Override
public KafkaConfigDTO updateConfig(KafkaConfigDTO config) {
return config;
}
}
```
2. 可通过ConsumerBatchListener.config(KafkaConfigDTO config)进行配置覆盖
需注意,在项目启动时进行覆盖可即时生效,运行过程中动态表更,则需要注入KafkaFactory的bean,调用其init(),刷新配置,刷新时会重建监听;
2. 批量监听
实现ConsumerBatchListener接口
```java
@KafkaConsumerListener
public class ExternalInputDataConsumerListener implements ConsumerBatchListener {
public KafkaConfigDTO kafkaConfigDTO = new KafkaConfigDTO();
@Override
public KafkaConfigDTO config(KafkaConfigDTO config) {
config.setBootstrapServers(StringUtils.isNotBlank(kafkaConfigDTO.getBootstrapServers()) ?
kafkaConfigDTO.getBootstrapServers() : config.getBootstrapServers());
config.setTopics(kafkaConfigDTO.getTopics().length != 0 ?
kafkaConfigDTO.getTopics() : config.getTopics());
return config;
}
@Override
public void onMessage(List> data, Acknowledgment acknowledgment, Consumer, ?> consumer) {
System.out.println("数据来自topic:" + data.get(0).topic() + ",分区:" + data.get(0).partition());
}
}
```
#### 参与贡献