From 9b829227028684ef3d8c0e56aff8acec9f2429e5 Mon Sep 17 00:00:00 2001
From: fuxinji9527 <1992666531@qq.com>
Date: Thu, 9 May 2024 11:57:30 +0800
Subject: [PATCH 1/2] =?UTF-8?q?=E6=B3=A8=E9=87=8Akafka=E7=9B=B8=E5=85=B3?=
=?UTF-8?q?=E4=BB=A3=E7=A0=81?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
---
pom.xml | 18 +-
.../ApplicationVersionServiceImpl.java | 17 +-
.../epkgpackage/EPKGPackageServiceImpl.java | 17 +-
.../rpmpackage/RPMPackageServiceImpl.java | 17 +-
.../common/config/KafkaConsumerConfig.java | 182 ++++++------
.../easysoftware/kafka/AppPkgConsumer.java | 38 +--
.../com/easysoftware/kafka/BaseConsumer.java | 276 +++++++++---------
.../com/easysoftware/kafka/EpkgConsumer.java | 48 +--
.../java/com/easysoftware/kafka/Producer.java | 70 ++---
.../com/easysoftware/kafka/RpmConsumer.java | 64 ++--
.../easysoftware/kafka/VersionConsumer.java | 38 +--
11 files changed, 388 insertions(+), 397 deletions(-)
diff --git a/pom.xml b/pom.xml
index 509b378..232d87b 100644
--- a/pom.xml
+++ b/pom.xml
@@ -148,11 +148,11 @@
1.78.1
-
- org.apache.kafka
- kafka-clients
- 3.7.0
-
+
+
+
+
+
@@ -167,10 +167,10 @@
4.5.13
-
- org.springframework.kafka
- spring-kafka
-
+
+
+
+
commons-codec
diff --git a/src/main/java/com/easysoftware/application/applicationversion/ApplicationVersionServiceImpl.java b/src/main/java/com/easysoftware/application/applicationversion/ApplicationVersionServiceImpl.java
index 11f9a65..5d90fdd 100644
--- a/src/main/java/com/easysoftware/application/applicationversion/ApplicationVersionServiceImpl.java
+++ b/src/main/java/com/easysoftware/application/applicationversion/ApplicationVersionServiceImpl.java
@@ -6,15 +6,12 @@ import com.easysoftware.application.applicationversion.dto.InputApplicationVersi
import com.easysoftware.common.entity.MessageCode;
import com.easysoftware.common.utils.ObjectMapperUtil;
import com.easysoftware.common.utils.ResultUtil;
-import com.easysoftware.common.utils.UuidUtil;
import com.easysoftware.domain.applicationversion.ApplicationVersion;
import com.easysoftware.domain.applicationversion.gateway.ApplicationVersionGateway;
import com.easysoftware.infrastructure.applicationversion.gatewayimpl.dataobject.ApplicationVersionDO;
import com.easysoftware.infrastructure.mapper.ApplicationVersionDOMapper;
-import com.easysoftware.kafka.Producer;
import jakarta.annotation.Resource;
import org.springframework.beans.BeanUtils;
-import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
@@ -28,11 +25,11 @@ import java.util.Map;
public class ApplicationVersionServiceImpl extends ServiceImpl implements ApplicationVersionService {
- /**
- * Autowired Kafka producer for sending messages.
- */
- @Autowired
- private Producer kafkaProducer;
+// /**
+// * Autowired Kafka producer for sending messages.
+// */
+// @Autowired
+// private Producer kafkaProducer;
/**
* Topic name for the Kafka producer related to application versions.
@@ -71,8 +68,8 @@ public class ApplicationVersionServiceImpl extends ServiceImpl kafkaMsg = ObjectMapperUtil.jsonToMap(appVersion);
kafkaMsg.put("table", "ApplicationVersion");
kafkaMsg.put("unique", inputAppVersion.getName());
- kafkaProducer.sendMess(topicAppVersion + "_version",
- UuidUtil.getUUID32(), ObjectMapperUtil.writeValueAsString(kafkaMsg));
+// kafkaProducer.sendMess(topicAppVersion + "_version",
+// UuidUtil.getUUID32(), ObjectMapperUtil.writeValueAsString(kafkaMsg));
return ResultUtil.success(HttpStatus.OK);
}
diff --git a/src/main/java/com/easysoftware/application/epkgpackage/EPKGPackageServiceImpl.java b/src/main/java/com/easysoftware/application/epkgpackage/EPKGPackageServiceImpl.java
index 5448e04..27bd76b 100644
--- a/src/main/java/com/easysoftware/application/epkgpackage/EPKGPackageServiceImpl.java
+++ b/src/main/java/com/easysoftware/application/epkgpackage/EPKGPackageServiceImpl.java
@@ -6,17 +6,14 @@ import com.easysoftware.application.epkgpackage.dto.InputEPKGPackage;
import com.easysoftware.application.epkgpackage.vo.EPKGPackageDetailVo;
import com.easysoftware.common.utils.ObjectMapperUtil;
import com.easysoftware.common.utils.ResultUtil;
-import com.easysoftware.common.utils.UuidUtil;
import com.easysoftware.domain.epkgpackage.EPKGPackage;
import com.easysoftware.domain.epkgpackage.EPKGPackageUnique;
import com.easysoftware.domain.epkgpackage.gateway.EPKGPackageGateway;
import com.easysoftware.infrastructure.epkgpackage.gatewayimpl.dataobject.EPKGPackageDO;
import com.easysoftware.infrastructure.mapper.EPKGPackageDOMapper;
-import com.easysoftware.kafka.Producer;
import jakarta.annotation.Resource;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.BeanUtils;
-import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
@@ -35,11 +32,11 @@ public class EPKGPackageServiceImpl extends
@Resource
private EPKGPackageGateway ePKGPackageGateway;
- /**
- * Kafka producer for messaging.
- */
- @Autowired
- private Producer kafkaProducer;
+// /**
+// * Kafka producer for messaging.
+// */
+// @Autowired
+// private Producer kafkaProducer;
/**
* API endpoint for repository maintainers.
@@ -86,8 +83,8 @@ public class EPKGPackageServiceImpl extends
Map kafkaMsg = ObjectMapperUtil.jsonToMap(inputEPKGPackage);
kafkaMsg.put("table", "EPKGPackage");
- kafkaProducer.sendMess(topicAppVersion + "_epkg", UuidUtil.getUUID32(),
- ObjectMapperUtil.writeValueAsString(kafkaMsg));
+// kafkaProducer.sendMess(topicAppVersion + "_epkg", UuidUtil.getUUID32(),
+// ObjectMapperUtil.writeValueAsString(kafkaMsg));
return ResultUtil.success(HttpStatus.OK);
}
diff --git a/src/main/java/com/easysoftware/application/rpmpackage/RPMPackageServiceImpl.java b/src/main/java/com/easysoftware/application/rpmpackage/RPMPackageServiceImpl.java
index 21c4713..9bad749 100644
--- a/src/main/java/com/easysoftware/application/rpmpackage/RPMPackageServiceImpl.java
+++ b/src/main/java/com/easysoftware/application/rpmpackage/RPMPackageServiceImpl.java
@@ -7,17 +7,14 @@ import com.easysoftware.application.rpmpackage.vo.RPMPackageDetailVo;
import com.easysoftware.application.rpmpackage.vo.RPMPackageDomainVo;
import com.easysoftware.common.utils.ObjectMapperUtil;
import com.easysoftware.common.utils.ResultUtil;
-import com.easysoftware.common.utils.UuidUtil;
import com.easysoftware.domain.rpmpackage.RPMPackage;
import com.easysoftware.domain.rpmpackage.RPMPackageUnique;
import com.easysoftware.domain.rpmpackage.gateway.RPMPackageGateway;
import com.easysoftware.infrastructure.mapper.RPMPackageDOMapper;
import com.easysoftware.infrastructure.rpmpackage.gatewayimpl.dataobject.RPMPackageDO;
-import com.easysoftware.kafka.Producer;
import jakarta.annotation.Resource;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.BeanUtils;
-import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Primary;
import org.springframework.http.HttpStatus;
@@ -32,11 +29,11 @@ import java.util.Map;
@Primary
@Service("RPMPackageService")
public class RPMPackageServiceImpl extends ServiceImpl implements RPMPackageService {
- /**
- * Autowired Kafka producer.
- */
- @Autowired
- private Producer kafkaProducer;
+// /**
+// * Autowired Kafka producer.
+// */
+// @Autowired
+// private Producer kafkaProducer;
/**
* Resource for RPM Package Gateway.
@@ -106,8 +103,8 @@ public class RPMPackageServiceImpl extends ServiceImpl kafkaMsg = ObjectMapperUtil.jsonToMap(inputrPMPackage);
kafkaMsg.put("table", "RPMPackage");
- kafkaProducer.sendMess(topicAppVersion + "_rpm",
- UuidUtil.getUUID32(), ObjectMapperUtil.writeValueAsString(kafkaMsg));
+// kafkaProducer.sendMess(topicAppVersion + "_rpm",
+// UuidUtil.getUUID32(), ObjectMapperUtil.writeValueAsString(kafkaMsg));
return ResultUtil.success(HttpStatus.OK);
}
diff --git a/src/main/java/com/easysoftware/common/config/KafkaConsumerConfig.java b/src/main/java/com/easysoftware/common/config/KafkaConsumerConfig.java
index 802ad70..8aa73a1 100644
--- a/src/main/java/com/easysoftware/common/config/KafkaConsumerConfig.java
+++ b/src/main/java/com/easysoftware/common/config/KafkaConsumerConfig.java
@@ -1,91 +1,91 @@
-package com.easysoftware.common.config;
-
-import org.apache.kafka.clients.CommonClientConfigs;
-import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.apache.kafka.common.config.SaslConfigs;
-import org.apache.kafka.common.config.SslConfigs;
-import org.apache.kafka.common.security.auth.SecurityProtocol;
-import org.apache.kafka.common.serialization.StringDeserializer;
-import org.springframework.beans.factory.annotation.Value;
-import org.springframework.context.annotation.Bean;
-import org.springframework.context.annotation.Configuration;
-import org.springframework.kafka.annotation.EnableKafka;
-import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
-import org.springframework.kafka.core.ConsumerFactory;
-import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
-
-import java.util.HashMap;
-import java.util.Map;
-
-@Configuration
-@EnableKafka
-public class KafkaConsumerConfig {
-
- /**
- * Bootstrap servers for Kafka connection.
- */
- @Value("${bootstrap.servers}")
- private String bootstrapServers;
-
- /**
- * Consumer group ID for Kafka consumer.
- */
- @Value("${consumer.groupId}")
- private String groupId;
-
- /**
- * SASL JAAS configuration for authentication.
- */
- @Value("${spring.kafka.properties.sasl.jaas.config}")
- private String authConfig;
-
- /**
- * SASL mechanism for authentication.
- */
- @Value("${spring.kafka.properties.sasl.mechanism}")
- private String mechanism;
-
- /**
- * Location of the SSL trust store.
- */
- @Value("${spring.kafka.properties.ssl.truststore.location}")
- private String trustStoreLocation;
-
-
- /**
- * Configures a ConsumerFactory for processing Kafka messages with String key and value types.
- *
- * @return The configured ConsumerFactory.
- */
- @Bean
- public ConsumerFactory consumerFactory() {
- Map configProps = new HashMap<>();
- configProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
- configProps.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
- configProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
- configProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
-
- // add SASL_SSL config
- configProps.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, SecurityProtocol.SASL_SSL.name);
- configProps.put(SaslConfigs.SASL_MECHANISM, mechanism);
- configProps.put(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, "");
- configProps.put(SaslConfigs.SASL_JAAS_CONFIG, authConfig);
- configProps.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, trustStoreLocation);
-
- return new DefaultKafkaConsumerFactory<>(configProps);
- }
-
- /**
- * Configures a Kafka listener container factory for processing Kafka messages.
- *
- * @return The ConcurrentKafkaListenerContainerFactory for String key and value types.
- */
- @Bean
- public ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory() {
- ConcurrentKafkaListenerContainerFactory factory
- = new ConcurrentKafkaListenerContainerFactory<>();
- factory.setConsumerFactory(consumerFactory());
- factory.setBatchListener(true);
- return factory;
- }
-}
+//package com.easysoftware.common.config;
+//
+//import org.apache.kafka.clients.CommonClientConfigs;
+//import org.apache.kafka.clients.consumer.ConsumerConfig;
+//import org.apache.kafka.common.config.SaslConfigs;
+//import org.apache.kafka.common.config.SslConfigs;
+//import org.apache.kafka.common.security.auth.SecurityProtocol;
+//import org.apache.kafka.common.serialization.StringDeserializer;
+//import org.springframework.beans.factory.annotation.Value;
+//import org.springframework.context.annotation.Bean;
+//import org.springframework.context.annotation.Configuration;
+//import org.springframework.kafka.annotation.EnableKafka;
+//import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
+//import org.springframework.kafka.core.ConsumerFactory;
+//import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
+//
+//import java.util.HashMap;
+//import java.util.Map;
+//
+//@Configuration
+//@EnableKafka
+//public class KafkaConsumerConfig {
+//
+// /**
+// * Bootstrap servers for Kafka connection.
+// */
+// @Value("${bootstrap.servers}")
+// private String bootstrapServers;
+//
+// /**
+// * Consumer group ID for Kafka consumer.
+// */
+// @Value("${consumer.groupId}")
+// private String groupId;
+//
+// /**
+// * SASL JAAS configuration for authentication.
+// */
+// @Value("${spring.kafka.properties.sasl.jaas.config}")
+// private String authConfig;
+//
+// /**
+// * SASL mechanism for authentication.
+// */
+// @Value("${spring.kafka.properties.sasl.mechanism}")
+// private String mechanism;
+//
+// /**
+// * Location of the SSL trust store.
+// */
+// @Value("${spring.kafka.properties.ssl.truststore.location}")
+// private String trustStoreLocation;
+//
+//
+// /**
+// * Configures a ConsumerFactory for processing Kafka messages with String key and value types.
+// *
+// * @return The configured ConsumerFactory.
+// */
+// @Bean
+// public ConsumerFactory consumerFactory() {
+// Map configProps = new HashMap<>();
+// configProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
+// configProps.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
+// configProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
+// configProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
+//
+// // add SASL_SSL config
+// configProps.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, SecurityProtocol.SASL_SSL.name);
+// configProps.put(SaslConfigs.SASL_MECHANISM, mechanism);
+// configProps.put(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, "");
+// configProps.put(SaslConfigs.SASL_JAAS_CONFIG, authConfig);
+// configProps.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, trustStoreLocation);
+//
+// return new DefaultKafkaConsumerFactory<>(configProps);
+// }
+//
+// /**
+// * Configures a Kafka listener container factory for processing Kafka messages.
+// *
+// * @return The ConcurrentKafkaListenerContainerFactory for String key and value types.
+// */
+// @Bean
+// public ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory() {
+// ConcurrentKafkaListenerContainerFactory factory
+// = new ConcurrentKafkaListenerContainerFactory<>();
+// factory.setConsumerFactory(consumerFactory());
+// factory.setBatchListener(true);
+// return factory;
+// }
+//}
diff --git a/src/main/java/com/easysoftware/kafka/AppPkgConsumer.java b/src/main/java/com/easysoftware/kafka/AppPkgConsumer.java
index 6efbe8a..a2a5fdc 100644
--- a/src/main/java/com/easysoftware/kafka/AppPkgConsumer.java
+++ b/src/main/java/com/easysoftware/kafka/AppPkgConsumer.java
@@ -1,19 +1,19 @@
-package com.easysoftware.kafka;
-
-import org.apache.kafka.clients.consumer.ConsumerRecords;
-import org.springframework.kafka.annotation.KafkaListener;
-import org.springframework.stereotype.Service;
-
-@Service
-public class AppPkgConsumer extends BaseConsumer {
-
- /**
- * Listens for and processes ConsumerRecords of type .
- *
- * @param records The ConsumerRecords to process.
- */
- @KafkaListener(topics = "software_test_app")
- public void listen(final ConsumerRecords records) {
- dealDataToTableByBatch(records);
- }
-}
+//package com.easysoftware.kafka;
+//
+//import org.apache.kafka.clients.consumer.ConsumerRecords;
+//import org.springframework.kafka.annotation.KafkaListener;
+//import org.springframework.stereotype.Service;
+//
+//@Service
+//public class AppPkgConsumer extends BaseConsumer {
+//
+// /**
+// * Listens for and processes ConsumerRecords of type .
+// *
+// * @param records The ConsumerRecords to process.
+// */
+// @KafkaListener(topics = "software_test_app")
+// public void listen(final ConsumerRecords records) {
+// dealDataToTableByBatch(records);
+// }
+//}
diff --git a/src/main/java/com/easysoftware/kafka/BaseConsumer.java b/src/main/java/com/easysoftware/kafka/BaseConsumer.java
index 3d2dff9..7842910 100644
--- a/src/main/java/com/easysoftware/kafka/BaseConsumer.java
+++ b/src/main/java/com/easysoftware/kafka/BaseConsumer.java
@@ -1,138 +1,138 @@
-package com.easysoftware.kafka;
-
-import com.easysoftware.application.BaseIService;
-import com.easysoftware.application.ServiceMap;
-import com.easysoftware.common.utils.ObjectMapperUtil;
-import lombok.Generated;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.clients.consumer.ConsumerRecords;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.springframework.beans.factory.annotation.Autowired;
-
-import java.time.Duration;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.Map;
-
-public class BaseConsumer {
-
- /**
- * Autowired ServiceMap instance.
- */
- @Autowired
- private ServiceMap serviceMap;
-
- /**
- * Logger for BaseConsumer class.
- */
- private static final Logger LOGGER = LoggerFactory.getLogger(BaseConsumer.class);
-
- /**
- * List to hold KafkaConsumer instances for String keys and values.
- */
- private final ArrayList> kafkaConsumerList = new ArrayList<>();
-
-
- /**
- * Custom tasks method to perform Kafka to MySQL data transfer.
- */
- // @Scheduled(fixedRate = 5000)
- @Generated
- public void tasks() {
- kafkaToMysql();
- }
-
- /**
- * Method to transfer data from Kafka to MySQL by processing ConsumerRecords.
- */
- @Generated
- public void kafkaToMysql() {
- while (true) {
- for (KafkaConsumer customer : kafkaConsumerList) {
- ConsumerRecords poll = customer.poll(Duration.ofSeconds(5));
- dealDataToTableByBatch(poll);
- customer.commitAsync();
- }
- }
- }
-
- /**
- * Processes ConsumerRecords in batches and deals with the data to insert into a table.
- *
- * @param records The ConsumerRecords to process.
- */
- // The data of a topic can only be written to the same table
- public void dealDataToTableByBatch(final ConsumerRecords records) {
- ArrayList appList = new ArrayList<>();
- BaseIService baseIService = null;
- int partition = 0;
- long offset = 0;
- long startTime = System.nanoTime();
- for (ConsumerRecord record : records) {
- String value = record.value();
- try {
- Map dtoMap = ObjectMapperUtil.toMap(value);
- String table = dtoMap.get("table").toString();
- baseIService = serviceMap.getIService(table + "Service");
- appList.add(value);
- partition = record.partition();
- offset = record.offset();
- } catch (Exception e) {
- LOGGER.error(e.getMessage() + ":" + value, e);
- }
- }
- long endTime1 = System.nanoTime();
- long duration = (endTime1 - startTime) / 1000000;
- LOGGER.info("处理records用时: " + duration + " 毫秒," + "数据量:" + appList.size());
- if (!appList.isEmpty()) {
- LOGGER.info("partation: " + partition + ", offset: " + offset);
- baseIService.saveDataObjectBatch(appList);
- }
- long endTime2 = System.nanoTime();
- duration = (endTime2 - endTime1) / 1000000;
- LOGGER.info("写入数据库用时: " + duration + " 毫秒," + "数据量:" + appList.size());
- }
-
- /**
- * Processes ConsumerRecords and deals with the data to insert into multiple tables.
- *
- * @param records The ConsumerRecords to process.
- */
- // The data of a topic may be written to multiple tables
- @Generated
- public void dealDataToMultipleTables(final ConsumerRecords records) {
- Map> resMap = new HashMap<>();
- int partition = 0;
- long offset = 0;
-
- for (ConsumerRecord record : records) {
- String value = record.value();
- try {
- Map dtoMap = ObjectMapperUtil.toMap(value);
- String table = dtoMap.get("table").toString();
-
- if (!resMap.containsKey(table)) {
- resMap.put(table, new ArrayList<>());
- }
-
- ArrayList tmp = resMap.get(table);
- tmp.add(value);
- resMap.put(table, tmp);
-
- partition = record.partition();
- offset = record.offset();
- } catch (Exception e) {
- LOGGER.error(e.getMessage() + ": " + value, e);
- }
- }
- resMap.forEach((table, values) -> {
- if (!values.isEmpty()) {
- serviceMap.getIService(table + "Service").saveDataObjectBatch(values);
- }
- });
- LOGGER.info("Partition: " + partition + ", Offset: " + offset);
- }
-
-}
+//package com.easysoftware.kafka;
+//
+//import com.easysoftware.application.BaseIService;
+//import com.easysoftware.application.ServiceMap;
+//import com.easysoftware.common.utils.ObjectMapperUtil;
+//import lombok.Generated;
+//import org.apache.kafka.clients.consumer.ConsumerRecord;
+//import org.apache.kafka.clients.consumer.ConsumerRecords;
+//import org.apache.kafka.clients.consumer.KafkaConsumer;
+//import org.slf4j.Logger;
+//import org.slf4j.LoggerFactory;
+//import org.springframework.beans.factory.annotation.Autowired;
+//
+//import java.time.Duration;
+//import java.util.ArrayList;
+//import java.util.HashMap;
+//import java.util.Map;
+//
+//public class BaseConsumer {
+//
+// /**
+// * Autowired ServiceMap instance.
+// */
+// @Autowired
+// private ServiceMap serviceMap;
+//
+// /**
+// * Logger for BaseConsumer class.
+// */
+// private static final Logger LOGGER = LoggerFactory.getLogger(BaseConsumer.class);
+//
+// /**
+// * List to hold KafkaConsumer instances for String keys and values.
+// */
+// private final ArrayList> kafkaConsumerList = new ArrayList<>();
+//
+//
+// /**
+// * Custom tasks method to perform Kafka to MySQL data transfer.
+// */
+// // @Scheduled(fixedRate = 5000)
+// @Generated
+// public void tasks() {
+// kafkaToMysql();
+// }
+//
+// /**
+// * Method to transfer data from Kafka to MySQL by processing ConsumerRecords.
+// */
+// @Generated
+// public void kafkaToMysql() {
+// while (true) {
+// for (KafkaConsumer customer : kafkaConsumerList) {
+// ConsumerRecords poll = customer.poll(Duration.ofSeconds(5));
+// dealDataToTableByBatch(poll);
+// customer.commitAsync();
+// }
+// }
+// }
+//
+// /**
+// * Processes ConsumerRecords in batches and deals with the data to insert into a table.
+// *
+// * @param records The ConsumerRecords to process.
+// */
+// // The data of a topic can only be written to the same table
+// public void dealDataToTableByBatch(final ConsumerRecords records) {
+// ArrayList appList = new ArrayList<>();
+// BaseIService baseIService = null;
+// int partition = 0;
+// long offset = 0;
+// long startTime = System.nanoTime();
+// for (ConsumerRecord record : records) {
+// String value = record.value();
+// try {
+// Map dtoMap = ObjectMapperUtil.toMap(value);
+// String table = dtoMap.get("table").toString();
+// baseIService = serviceMap.getIService(table + "Service");
+// appList.add(value);
+// partition = record.partition();
+// offset = record.offset();
+// } catch (Exception e) {
+// LOGGER.error(e.getMessage() + ":" + value, e);
+// }
+// }
+// long endTime1 = System.nanoTime();
+// long duration = (endTime1 - startTime) / 1000000;
+// LOGGER.info("处理records用时: " + duration + " 毫秒," + "数据量:" + appList.size());
+// if (!appList.isEmpty()) {
+// LOGGER.info("partation: " + partition + ", offset: " + offset);
+// baseIService.saveDataObjectBatch(appList);
+// }
+// long endTime2 = System.nanoTime();
+// duration = (endTime2 - endTime1) / 1000000;
+// LOGGER.info("写入数据库用时: " + duration + " 毫秒," + "数据量:" + appList.size());
+// }
+//
+// /**
+// * Processes ConsumerRecords and deals with the data to insert into multiple tables.
+// *
+// * @param records The ConsumerRecords to process.
+// */
+// // The data of a topic may be written to multiple tables
+// @Generated
+// public void dealDataToMultipleTables(final ConsumerRecords records) {
+// Map> resMap = new HashMap<>();
+// int partition = 0;
+// long offset = 0;
+//
+// for (ConsumerRecord record : records) {
+// String value = record.value();
+// try {
+// Map dtoMap = ObjectMapperUtil.toMap(value);
+// String table = dtoMap.get("table").toString();
+//
+// if (!resMap.containsKey(table)) {
+// resMap.put(table, new ArrayList<>());
+// }
+//
+// ArrayList tmp = resMap.get(table);
+// tmp.add(value);
+// resMap.put(table, tmp);
+//
+// partition = record.partition();
+// offset = record.offset();
+// } catch (Exception e) {
+// LOGGER.error(e.getMessage() + ": " + value, e);
+// }
+// }
+// resMap.forEach((table, values) -> {
+// if (!values.isEmpty()) {
+// serviceMap.getIService(table + "Service").saveDataObjectBatch(values);
+// }
+// });
+// LOGGER.info("Partition: " + partition + ", Offset: " + offset);
+// }
+//
+//}
diff --git a/src/main/java/com/easysoftware/kafka/EpkgConsumer.java b/src/main/java/com/easysoftware/kafka/EpkgConsumer.java
index 224ceca..3d3d50d 100644
--- a/src/main/java/com/easysoftware/kafka/EpkgConsumer.java
+++ b/src/main/java/com/easysoftware/kafka/EpkgConsumer.java
@@ -1,24 +1,24 @@
-package com.easysoftware.kafka;
-
-import org.apache.kafka.clients.consumer.ConsumerRecords;
-import org.springframework.kafka.annotation.KafkaListener;
-import org.springframework.stereotype.Service;
-
-@Service
-public class EpkgConsumer extends BaseConsumer {
- // @Value("${consumer.topic.name}")
- // String topicName;
-
- // @Value("${consumer.topic.offset}")
- // String topicOffset;
-
- /**
- * Listens for and processes ConsumerRecords of type .
- *
- * @param records The ConsumerRecords to process.
- */
- @KafkaListener(topics = "software_test_epkg", concurrency = "3")
- public void listen(final ConsumerRecords records) {
- dealDataToTableByBatch(records);
- }
-}
+//package com.easysoftware.kafka;
+//
+//import org.apache.kafka.clients.consumer.ConsumerRecords;
+//import org.springframework.kafka.annotation.KafkaListener;
+//import org.springframework.stereotype.Service;
+//
+//@Service
+//public class EpkgConsumer extends BaseConsumer {
+// // @Value("${consumer.topic.name}")
+// // String topicName;
+//
+// // @Value("${consumer.topic.offset}")
+// // String topicOffset;
+//
+// /**
+// * Listens for and processes ConsumerRecords of type .
+// *
+// * @param records The ConsumerRecords to process.
+// */
+// @KafkaListener(topics = "software_test_epkg", concurrency = "3")
+// public void listen(final ConsumerRecords records) {
+// dealDataToTableByBatch(records);
+// }
+//}
diff --git a/src/main/java/com/easysoftware/kafka/Producer.java b/src/main/java/com/easysoftware/kafka/Producer.java
index a091066..c5f983c 100644
--- a/src/main/java/com/easysoftware/kafka/Producer.java
+++ b/src/main/java/com/easysoftware/kafka/Producer.java
@@ -1,35 +1,35 @@
-package com.easysoftware.kafka;
-
-import org.apache.kafka.clients.producer.KafkaProducer;
-import org.apache.kafka.clients.producer.ProducerRecord;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.kafka.core.KafkaTemplate;
-import org.springframework.stereotype.Component;
-
-@Component
-public class Producer {
-
- /**
- * Autowired KafkaTemplate for producing messages.
- */
- @Autowired
- private KafkaTemplate kafkaTemplate;
-
- /**
- * Static KafkaProducer for handling Kafka operations.
- */
- private static KafkaProducer producer;
-
- /**
- * Sends a message with the specified topic, key, and value.
- *
- * @param topic The Kafka topic to send the message to.
- * @param key The key associated with the message.
- * @param value The value of the message.
- */
- public void sendMess(final String topic, final String key, final String value) {
- ProducerRecord mess = new ProducerRecord(topic, key, value);
- kafkaTemplate.send(mess);
- }
-
-}
+//package com.easysoftware.kafka;
+//
+//import org.apache.kafka.clients.producer.KafkaProducer;
+//import org.apache.kafka.clients.producer.ProducerRecord;
+//import org.springframework.beans.factory.annotation.Autowired;
+//import org.springframework.kafka.core.KafkaTemplate;
+//import org.springframework.stereotype.Component;
+//
+//@Component
+//public class Producer {
+//
+// /**
+// * Autowired KafkaTemplate for producing messages.
+// */
+// @Autowired
+// private KafkaTemplate kafkaTemplate;
+//
+// /**
+// * Static KafkaProducer for handling Kafka operations.
+// */
+// private static KafkaProducer producer;
+//
+// /**
+// * Sends a message with the specified topic, key, and value.
+// *
+// * @param topic The Kafka topic to send the message to.
+// * @param key The key associated with the message.
+// * @param value The value of the message.
+// */
+// public void sendMess(final String topic, final String key, final String value) {
+// ProducerRecord mess = new ProducerRecord(topic, key, value);
+// kafkaTemplate.send(mess);
+// }
+//
+//}
diff --git a/src/main/java/com/easysoftware/kafka/RpmConsumer.java b/src/main/java/com/easysoftware/kafka/RpmConsumer.java
index 8bce16d..b035056 100644
--- a/src/main/java/com/easysoftware/kafka/RpmConsumer.java
+++ b/src/main/java/com/easysoftware/kafka/RpmConsumer.java
@@ -1,32 +1,32 @@
-package com.easysoftware.kafka;
-
-import org.apache.kafka.clients.consumer.ConsumerRecords;
-import org.springframework.beans.factory.annotation.Value;
-import org.springframework.kafka.annotation.KafkaListener;
-import org.springframework.stereotype.Service;
-
-@Service
-public class RpmConsumer extends BaseConsumer {
- /**
- * Value for the Kafka consumer topic name.
- */
- @Value("${consumer.topic.name}")
- private String topicName;
-
- /**
- * Value for the Kafka consumer topic offset.
- */
- @Value("${consumer.topic.offset}")
- private String topicOffset;
-
- /**
- * Kafka listener method that listens to the "software_test_rpm" topic with concurrency set to 3.
- *
- * @param records The ConsumerRecords to process.
- */
- @KafkaListener(topics = "software_test_rpm", concurrency = "3")
- public void listen(final ConsumerRecords records) {
- dealDataToTableByBatch(records);
- }
-
-}
+//package com.easysoftware.kafka;
+//
+//import org.apache.kafka.clients.consumer.ConsumerRecords;
+//import org.springframework.beans.factory.annotation.Value;
+//import org.springframework.kafka.annotation.KafkaListener;
+//import org.springframework.stereotype.Service;
+//
+//@Service
+//public class RpmConsumer extends BaseConsumer {
+// /**
+// * Value for the Kafka consumer topic name.
+// */
+// @Value("${consumer.topic.name}")
+// private String topicName;
+//
+// /**
+// * Value for the Kafka consumer topic offset.
+// */
+// @Value("${consumer.topic.offset}")
+// private String topicOffset;
+//
+// /**
+// * Kafka listener method that listens to the "software_test_rpm" topic with concurrency set to 3.
+// *
+// * @param records The ConsumerRecords to process.
+// */
+// @KafkaListener(topics = "software_test_rpm", concurrency = "3")
+// public void listen(final ConsumerRecords records) {
+// dealDataToTableByBatch(records);
+// }
+//
+//}
diff --git a/src/main/java/com/easysoftware/kafka/VersionConsumer.java b/src/main/java/com/easysoftware/kafka/VersionConsumer.java
index a02633d..62afb70 100644
--- a/src/main/java/com/easysoftware/kafka/VersionConsumer.java
+++ b/src/main/java/com/easysoftware/kafka/VersionConsumer.java
@@ -1,19 +1,19 @@
-package com.easysoftware.kafka;
-
-import org.apache.kafka.clients.consumer.ConsumerRecords;
-import org.springframework.kafka.annotation.KafkaListener;
-import org.springframework.stereotype.Service;
-
-@Service
-public class VersionConsumer extends BaseConsumer {
-
- /**
- * Listens for and processes ConsumerRecords of type .
- *
- * @param records The ConsumerRecords to process.
- */
- @KafkaListener(topics = "software_test_version")
- public void listen(final ConsumerRecords records) {
- dealDataToTableByBatch(records);
- }
-}
+//package com.easysoftware.kafka;
+//
+//import org.apache.kafka.clients.consumer.ConsumerRecords;
+//import org.springframework.kafka.annotation.KafkaListener;
+//import org.springframework.stereotype.Service;
+//
+//@Service
+//public class VersionConsumer extends BaseConsumer {
+//
+// /**
+// * Listens for and processes ConsumerRecords of type .
+// *
+// * @param records The ConsumerRecords to process.
+// */
+// @KafkaListener(topics = "software_test_version")
+// public void listen(final ConsumerRecords records) {
+// dealDataToTableByBatch(records);
+// }
+//}
--
Gitee
From 8c6bafdd855d2e7315e5645269ef06b0594682fb Mon Sep 17 00:00:00 2001
From: fuxinji9527 <1992666531@qq.com>
Date: Fri, 10 May 2024 10:03:16 +0800
Subject: [PATCH 2/2] =?UTF-8?q?=E5=88=A0=E9=99=A4kafka=E7=9B=B8=E5=85=B3?=
=?UTF-8?q?=E4=BB=A3=E7=A0=81?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
---
pom.xml | 11 --
.../ApplicationVersionServiceImpl.java | 8 -
.../epkgpackage/EPKGPackageServiceImpl.java | 8 -
.../rpmpackage/RPMPackageServiceImpl.java | 7 -
.../common/config/KafkaConsumerConfig.java | 91 ------------
.../easysoftware/kafka/AppPkgConsumer.java | 19 ---
.../com/easysoftware/kafka/BaseConsumer.java | 138 ------------------
.../com/easysoftware/kafka/EpkgConsumer.java | 24 ---
.../java/com/easysoftware/kafka/Producer.java | 35 -----
.../com/easysoftware/kafka/RpmConsumer.java | 32 ----
.../easysoftware/kafka/VersionConsumer.java | 19 ---
11 files changed, 392 deletions(-)
delete mode 100644 src/main/java/com/easysoftware/common/config/KafkaConsumerConfig.java
delete mode 100644 src/main/java/com/easysoftware/kafka/AppPkgConsumer.java
delete mode 100644 src/main/java/com/easysoftware/kafka/BaseConsumer.java
delete mode 100644 src/main/java/com/easysoftware/kafka/EpkgConsumer.java
delete mode 100644 src/main/java/com/easysoftware/kafka/Producer.java
delete mode 100644 src/main/java/com/easysoftware/kafka/RpmConsumer.java
delete mode 100644 src/main/java/com/easysoftware/kafka/VersionConsumer.java
diff --git a/pom.xml b/pom.xml
index 232d87b..3728840 100644
--- a/pom.xml
+++ b/pom.xml
@@ -147,12 +147,6 @@
bcprov-jdk18on
1.78.1
-
-
-
-
-
-
@@ -167,11 +161,6 @@
4.5.13
-
-
-
-
-
commons-codec
commons-codec
diff --git a/src/main/java/com/easysoftware/application/applicationversion/ApplicationVersionServiceImpl.java b/src/main/java/com/easysoftware/application/applicationversion/ApplicationVersionServiceImpl.java
index 5d90fdd..5e590bb 100644
--- a/src/main/java/com/easysoftware/application/applicationversion/ApplicationVersionServiceImpl.java
+++ b/src/main/java/com/easysoftware/application/applicationversion/ApplicationVersionServiceImpl.java
@@ -25,12 +25,6 @@ import java.util.Map;
public class ApplicationVersionServiceImpl extends ServiceImpl implements ApplicationVersionService {
-// /**
-// * Autowired Kafka producer for sending messages.
-// */
-// @Autowired
-// private Producer kafkaProducer;
-
/**
* Topic name for the Kafka producer related to application versions.
*/
@@ -68,8 +62,6 @@ public class ApplicationVersionServiceImpl extends ServiceImpl kafkaMsg = ObjectMapperUtil.jsonToMap(appVersion);
kafkaMsg.put("table", "ApplicationVersion");
kafkaMsg.put("unique", inputAppVersion.getName());
-// kafkaProducer.sendMess(topicAppVersion + "_version",
-// UuidUtil.getUUID32(), ObjectMapperUtil.writeValueAsString(kafkaMsg));
return ResultUtil.success(HttpStatus.OK);
}
diff --git a/src/main/java/com/easysoftware/application/epkgpackage/EPKGPackageServiceImpl.java b/src/main/java/com/easysoftware/application/epkgpackage/EPKGPackageServiceImpl.java
index 27bd76b..2a363a9 100644
--- a/src/main/java/com/easysoftware/application/epkgpackage/EPKGPackageServiceImpl.java
+++ b/src/main/java/com/easysoftware/application/epkgpackage/EPKGPackageServiceImpl.java
@@ -32,12 +32,6 @@ public class EPKGPackageServiceImpl extends
@Resource
private EPKGPackageGateway ePKGPackageGateway;
-// /**
-// * Kafka producer for messaging.
-// */
-// @Autowired
-// private Producer kafkaProducer;
-
/**
* API endpoint for repository maintainers.
*/
@@ -83,8 +77,6 @@ public class EPKGPackageServiceImpl extends
Map kafkaMsg = ObjectMapperUtil.jsonToMap(inputEPKGPackage);
kafkaMsg.put("table", "EPKGPackage");
-// kafkaProducer.sendMess(topicAppVersion + "_epkg", UuidUtil.getUUID32(),
-// ObjectMapperUtil.writeValueAsString(kafkaMsg));
return ResultUtil.success(HttpStatus.OK);
}
diff --git a/src/main/java/com/easysoftware/application/rpmpackage/RPMPackageServiceImpl.java b/src/main/java/com/easysoftware/application/rpmpackage/RPMPackageServiceImpl.java
index 9bad749..66d5588 100644
--- a/src/main/java/com/easysoftware/application/rpmpackage/RPMPackageServiceImpl.java
+++ b/src/main/java/com/easysoftware/application/rpmpackage/RPMPackageServiceImpl.java
@@ -29,11 +29,6 @@ import java.util.Map;
@Primary
@Service("RPMPackageService")
public class RPMPackageServiceImpl extends ServiceImpl implements RPMPackageService {
-// /**
-// * Autowired Kafka producer.
-// */
-// @Autowired
-// private Producer kafkaProducer;
/**
* Resource for RPM Package Gateway.
@@ -103,8 +98,6 @@ public class RPMPackageServiceImpl extends ServiceImpl kafkaMsg = ObjectMapperUtil.jsonToMap(inputrPMPackage);
kafkaMsg.put("table", "RPMPackage");
-// kafkaProducer.sendMess(topicAppVersion + "_rpm",
-// UuidUtil.getUUID32(), ObjectMapperUtil.writeValueAsString(kafkaMsg));
return ResultUtil.success(HttpStatus.OK);
}
diff --git a/src/main/java/com/easysoftware/common/config/KafkaConsumerConfig.java b/src/main/java/com/easysoftware/common/config/KafkaConsumerConfig.java
deleted file mode 100644
index 8aa73a1..0000000
--- a/src/main/java/com/easysoftware/common/config/KafkaConsumerConfig.java
+++ /dev/null
@@ -1,91 +0,0 @@
-//package com.easysoftware.common.config;
-//
-//import org.apache.kafka.clients.CommonClientConfigs;
-//import org.apache.kafka.clients.consumer.ConsumerConfig;
-//import org.apache.kafka.common.config.SaslConfigs;
-//import org.apache.kafka.common.config.SslConfigs;
-//import org.apache.kafka.common.security.auth.SecurityProtocol;
-//import org.apache.kafka.common.serialization.StringDeserializer;
-//import org.springframework.beans.factory.annotation.Value;
-//import org.springframework.context.annotation.Bean;
-//import org.springframework.context.annotation.Configuration;
-//import org.springframework.kafka.annotation.EnableKafka;
-//import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
-//import org.springframework.kafka.core.ConsumerFactory;
-//import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
-//
-//import java.util.HashMap;
-//import java.util.Map;
-//
-//@Configuration
-//@EnableKafka
-//public class KafkaConsumerConfig {
-//
-// /**
-// * Bootstrap servers for Kafka connection.
-// */
-// @Value("${bootstrap.servers}")
-// private String bootstrapServers;
-//
-// /**
-// * Consumer group ID for Kafka consumer.
-// */
-// @Value("${consumer.groupId}")
-// private String groupId;
-//
-// /**
-// * SASL JAAS configuration for authentication.
-// */
-// @Value("${spring.kafka.properties.sasl.jaas.config}")
-// private String authConfig;
-//
-// /**
-// * SASL mechanism for authentication.
-// */
-// @Value("${spring.kafka.properties.sasl.mechanism}")
-// private String mechanism;
-//
-// /**
-// * Location of the SSL trust store.
-// */
-// @Value("${spring.kafka.properties.ssl.truststore.location}")
-// private String trustStoreLocation;
-//
-//
-// /**
-// * Configures a ConsumerFactory for processing Kafka messages with String key and value types.
-// *
-// * @return The configured ConsumerFactory.
-// */
-// @Bean
-// public ConsumerFactory consumerFactory() {
-// Map configProps = new HashMap<>();
-// configProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
-// configProps.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
-// configProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
-// configProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
-//
-// // add SASL_SSL config
-// configProps.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, SecurityProtocol.SASL_SSL.name);
-// configProps.put(SaslConfigs.SASL_MECHANISM, mechanism);
-// configProps.put(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, "");
-// configProps.put(SaslConfigs.SASL_JAAS_CONFIG, authConfig);
-// configProps.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, trustStoreLocation);
-//
-// return new DefaultKafkaConsumerFactory<>(configProps);
-// }
-//
-// /**
-// * Configures a Kafka listener container factory for processing Kafka messages.
-// *
-// * @return The ConcurrentKafkaListenerContainerFactory for String key and value types.
-// */
-// @Bean
-// public ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory() {
-// ConcurrentKafkaListenerContainerFactory factory
-// = new ConcurrentKafkaListenerContainerFactory<>();
-// factory.setConsumerFactory(consumerFactory());
-// factory.setBatchListener(true);
-// return factory;
-// }
-//}
diff --git a/src/main/java/com/easysoftware/kafka/AppPkgConsumer.java b/src/main/java/com/easysoftware/kafka/AppPkgConsumer.java
deleted file mode 100644
index a2a5fdc..0000000
--- a/src/main/java/com/easysoftware/kafka/AppPkgConsumer.java
+++ /dev/null
@@ -1,19 +0,0 @@
-//package com.easysoftware.kafka;
-//
-//import org.apache.kafka.clients.consumer.ConsumerRecords;
-//import org.springframework.kafka.annotation.KafkaListener;
-//import org.springframework.stereotype.Service;
-//
-//@Service
-//public class AppPkgConsumer extends BaseConsumer {
-//
-// /**
-// * Listens for and processes ConsumerRecords of type .
-// *
-// * @param records The ConsumerRecords to process.
-// */
-// @KafkaListener(topics = "software_test_app")
-// public void listen(final ConsumerRecords records) {
-// dealDataToTableByBatch(records);
-// }
-//}
diff --git a/src/main/java/com/easysoftware/kafka/BaseConsumer.java b/src/main/java/com/easysoftware/kafka/BaseConsumer.java
deleted file mode 100644
index 7842910..0000000
--- a/src/main/java/com/easysoftware/kafka/BaseConsumer.java
+++ /dev/null
@@ -1,138 +0,0 @@
-//package com.easysoftware.kafka;
-//
-//import com.easysoftware.application.BaseIService;
-//import com.easysoftware.application.ServiceMap;
-//import com.easysoftware.common.utils.ObjectMapperUtil;
-//import lombok.Generated;
-//import org.apache.kafka.clients.consumer.ConsumerRecord;
-//import org.apache.kafka.clients.consumer.ConsumerRecords;
-//import org.apache.kafka.clients.consumer.KafkaConsumer;
-//import org.slf4j.Logger;
-//import org.slf4j.LoggerFactory;
-//import org.springframework.beans.factory.annotation.Autowired;
-//
-//import java.time.Duration;
-//import java.util.ArrayList;
-//import java.util.HashMap;
-//import java.util.Map;
-//
-//public class BaseConsumer {
-//
-// /**
-// * Autowired ServiceMap instance.
-// */
-// @Autowired
-// private ServiceMap serviceMap;
-//
-// /**
-// * Logger for BaseConsumer class.
-// */
-// private static final Logger LOGGER = LoggerFactory.getLogger(BaseConsumer.class);
-//
-// /**
-// * List to hold KafkaConsumer instances for String keys and values.
-// */
-// private final ArrayList> kafkaConsumerList = new ArrayList<>();
-//
-//
-// /**
-// * Custom tasks method to perform Kafka to MySQL data transfer.
-// */
-// // @Scheduled(fixedRate = 5000)
-// @Generated
-// public void tasks() {
-// kafkaToMysql();
-// }
-//
-// /**
-// * Method to transfer data from Kafka to MySQL by processing ConsumerRecords.
-// */
-// @Generated
-// public void kafkaToMysql() {
-// while (true) {
-// for (KafkaConsumer customer : kafkaConsumerList) {
-// ConsumerRecords poll = customer.poll(Duration.ofSeconds(5));
-// dealDataToTableByBatch(poll);
-// customer.commitAsync();
-// }
-// }
-// }
-//
-// /**
-// * Processes ConsumerRecords in batches and deals with the data to insert into a table.
-// *
-// * @param records The ConsumerRecords to process.
-// */
-// // The data of a topic can only be written to the same table
-// public void dealDataToTableByBatch(final ConsumerRecords records) {
-// ArrayList appList = new ArrayList<>();
-// BaseIService baseIService = null;
-// int partition = 0;
-// long offset = 0;
-// long startTime = System.nanoTime();
-// for (ConsumerRecord record : records) {
-// String value = record.value();
-// try {
-// Map dtoMap = ObjectMapperUtil.toMap(value);
-// String table = dtoMap.get("table").toString();
-// baseIService = serviceMap.getIService(table + "Service");
-// appList.add(value);
-// partition = record.partition();
-// offset = record.offset();
-// } catch (Exception e) {
-// LOGGER.error(e.getMessage() + ":" + value, e);
-// }
-// }
-// long endTime1 = System.nanoTime();
-// long duration = (endTime1 - startTime) / 1000000;
-// LOGGER.info("处理records用时: " + duration + " 毫秒," + "数据量:" + appList.size());
-// if (!appList.isEmpty()) {
-// LOGGER.info("partation: " + partition + ", offset: " + offset);
-// baseIService.saveDataObjectBatch(appList);
-// }
-// long endTime2 = System.nanoTime();
-// duration = (endTime2 - endTime1) / 1000000;
-// LOGGER.info("写入数据库用时: " + duration + " 毫秒," + "数据量:" + appList.size());
-// }
-//
-// /**
-// * Processes ConsumerRecords and deals with the data to insert into multiple tables.
-// *
-// * @param records The ConsumerRecords to process.
-// */
-// // The data of a topic may be written to multiple tables
-// @Generated
-// public void dealDataToMultipleTables(final ConsumerRecords records) {
-// Map> resMap = new HashMap<>();
-// int partition = 0;
-// long offset = 0;
-//
-// for (ConsumerRecord record : records) {
-// String value = record.value();
-// try {
-// Map dtoMap = ObjectMapperUtil.toMap(value);
-// String table = dtoMap.get("table").toString();
-//
-// if (!resMap.containsKey(table)) {
-// resMap.put(table, new ArrayList<>());
-// }
-//
-// ArrayList tmp = resMap.get(table);
-// tmp.add(value);
-// resMap.put(table, tmp);
-//
-// partition = record.partition();
-// offset = record.offset();
-// } catch (Exception e) {
-// LOGGER.error(e.getMessage() + ": " + value, e);
-// }
-// }
-// resMap.forEach((table, values) -> {
-// if (!values.isEmpty()) {
-// serviceMap.getIService(table + "Service").saveDataObjectBatch(values);
-// }
-// });
-// LOGGER.info("Partition: " + partition + ", Offset: " + offset);
-// }
-//
-//}
diff --git a/src/main/java/com/easysoftware/kafka/EpkgConsumer.java b/src/main/java/com/easysoftware/kafka/EpkgConsumer.java
deleted file mode 100644
index 3d3d50d..0000000
--- a/src/main/java/com/easysoftware/kafka/EpkgConsumer.java
+++ /dev/null
@@ -1,24 +0,0 @@
-//package com.easysoftware.kafka;
-//
-//import org.apache.kafka.clients.consumer.ConsumerRecords;
-//import org.springframework.kafka.annotation.KafkaListener;
-//import org.springframework.stereotype.Service;
-//
-//@Service
-//public class EpkgConsumer extends BaseConsumer {
-// // @Value("${consumer.topic.name}")
-// // String topicName;
-//
-// // @Value("${consumer.topic.offset}")
-// // String topicOffset;
-//
-// /**
-// * Listens for and processes ConsumerRecords of type .
-// *
-// * @param records The ConsumerRecords to process.
-// */
-// @KafkaListener(topics = "software_test_epkg", concurrency = "3")
-// public void listen(final ConsumerRecords records) {
-// dealDataToTableByBatch(records);
-// }
-//}
diff --git a/src/main/java/com/easysoftware/kafka/Producer.java b/src/main/java/com/easysoftware/kafka/Producer.java
deleted file mode 100644
index c5f983c..0000000
--- a/src/main/java/com/easysoftware/kafka/Producer.java
+++ /dev/null
@@ -1,35 +0,0 @@
-//package com.easysoftware.kafka;
-//
-//import org.apache.kafka.clients.producer.KafkaProducer;
-//import org.apache.kafka.clients.producer.ProducerRecord;
-//import org.springframework.beans.factory.annotation.Autowired;
-//import org.springframework.kafka.core.KafkaTemplate;
-//import org.springframework.stereotype.Component;
-//
-//@Component
-//public class Producer {
-//
-// /**
-// * Autowired KafkaTemplate for producing messages.
-// */
-// @Autowired
-// private KafkaTemplate kafkaTemplate;
-//
-// /**
-// * Static KafkaProducer for handling Kafka operations.
-// */
-// private static KafkaProducer producer;
-//
-// /**
-// * Sends a message with the specified topic, key, and value.
-// *
-// * @param topic The Kafka topic to send the message to.
-// * @param key The key associated with the message.
-// * @param value The value of the message.
-// */
-// public void sendMess(final String topic, final String key, final String value) {
-// ProducerRecord mess = new ProducerRecord(topic, key, value);
-// kafkaTemplate.send(mess);
-// }
-//
-//}
diff --git a/src/main/java/com/easysoftware/kafka/RpmConsumer.java b/src/main/java/com/easysoftware/kafka/RpmConsumer.java
deleted file mode 100644
index b035056..0000000
--- a/src/main/java/com/easysoftware/kafka/RpmConsumer.java
+++ /dev/null
@@ -1,32 +0,0 @@
-//package com.easysoftware.kafka;
-//
-//import org.apache.kafka.clients.consumer.ConsumerRecords;
-//import org.springframework.beans.factory.annotation.Value;
-//import org.springframework.kafka.annotation.KafkaListener;
-//import org.springframework.stereotype.Service;
-//
-//@Service
-//public class RpmConsumer extends BaseConsumer {
-// /**
-// * Value for the Kafka consumer topic name.
-// */
-// @Value("${consumer.topic.name}")
-// private String topicName;
-//
-// /**
-// * Value for the Kafka consumer topic offset.
-// */
-// @Value("${consumer.topic.offset}")
-// private String topicOffset;
-//
-// /**
-// * Kafka listener method that listens to the "software_test_rpm" topic with concurrency set to 3.
-// *
-// * @param records The ConsumerRecords to process.
-// */
-// @KafkaListener(topics = "software_test_rpm", concurrency = "3")
-// public void listen(final ConsumerRecords records) {
-// dealDataToTableByBatch(records);
-// }
-//
-//}
diff --git a/src/main/java/com/easysoftware/kafka/VersionConsumer.java b/src/main/java/com/easysoftware/kafka/VersionConsumer.java
deleted file mode 100644
index 62afb70..0000000
--- a/src/main/java/com/easysoftware/kafka/VersionConsumer.java
+++ /dev/null
@@ -1,19 +0,0 @@
-//package com.easysoftware.kafka;
-//
-//import org.apache.kafka.clients.consumer.ConsumerRecords;
-//import org.springframework.kafka.annotation.KafkaListener;
-//import org.springframework.stereotype.Service;
-//
-//@Service
-//public class VersionConsumer extends BaseConsumer {
-//
-// /**
-// * Listens for and processes ConsumerRecords of type .
-// *
-// * @param records The ConsumerRecords to process.
-// */
-// @KafkaListener(topics = "software_test_version")
-// public void listen(final ConsumerRecords records) {
-// dealDataToTableByBatch(records);
-// }
-//}
--
Gitee