diff --git a/pom.xml b/pom.xml
index 509b378c3864d212461adfc8c682a64cc3a7e36d..37288401f1a88ed6e05a0f287f8883b0cd5511d5 100644
--- a/pom.xml
+++ b/pom.xml
@@ -147,12 +147,6 @@
bcprov-jdk18on
1.78.1
-
-
- org.apache.kafka
- kafka-clients
- 3.7.0
-
@@ -167,11 +161,6 @@
4.5.13
-
- org.springframework.kafka
- spring-kafka
-
-
commons-codec
commons-codec
diff --git a/src/main/java/com/easysoftware/application/applicationversion/ApplicationVersionServiceImpl.java b/src/main/java/com/easysoftware/application/applicationversion/ApplicationVersionServiceImpl.java
index 11f9a6566ac61bf3c31aacff53caba06dc81b11c..5e590bb257d9197ff1340a4391b04e80fa2b21dd 100644
--- a/src/main/java/com/easysoftware/application/applicationversion/ApplicationVersionServiceImpl.java
+++ b/src/main/java/com/easysoftware/application/applicationversion/ApplicationVersionServiceImpl.java
@@ -6,15 +6,12 @@ import com.easysoftware.application.applicationversion.dto.InputApplicationVersi
import com.easysoftware.common.entity.MessageCode;
import com.easysoftware.common.utils.ObjectMapperUtil;
import com.easysoftware.common.utils.ResultUtil;
-import com.easysoftware.common.utils.UuidUtil;
import com.easysoftware.domain.applicationversion.ApplicationVersion;
import com.easysoftware.domain.applicationversion.gateway.ApplicationVersionGateway;
import com.easysoftware.infrastructure.applicationversion.gatewayimpl.dataobject.ApplicationVersionDO;
import com.easysoftware.infrastructure.mapper.ApplicationVersionDOMapper;
-import com.easysoftware.kafka.Producer;
import jakarta.annotation.Resource;
import org.springframework.beans.BeanUtils;
-import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
@@ -28,12 +25,6 @@ import java.util.Map;
public class ApplicationVersionServiceImpl extends ServiceImpl implements ApplicationVersionService {
- /**
- * Autowired Kafka producer for sending messages.
- */
- @Autowired
- private Producer kafkaProducer;
-
/**
* Topic name for the Kafka producer related to application versions.
*/
@@ -71,8 +62,6 @@ public class ApplicationVersionServiceImpl extends ServiceImpl kafkaMsg = ObjectMapperUtil.jsonToMap(appVersion);
kafkaMsg.put("table", "ApplicationVersion");
kafkaMsg.put("unique", inputAppVersion.getName());
- kafkaProducer.sendMess(topicAppVersion + "_version",
- UuidUtil.getUUID32(), ObjectMapperUtil.writeValueAsString(kafkaMsg));
return ResultUtil.success(HttpStatus.OK);
}
diff --git a/src/main/java/com/easysoftware/application/epkgpackage/EPKGPackageServiceImpl.java b/src/main/java/com/easysoftware/application/epkgpackage/EPKGPackageServiceImpl.java
index 5448e04fb1d599c251ee8a5cef1c3b328746c136..2a363a96c4dbfeff0aac1b6193452dc6194a7af5 100644
--- a/src/main/java/com/easysoftware/application/epkgpackage/EPKGPackageServiceImpl.java
+++ b/src/main/java/com/easysoftware/application/epkgpackage/EPKGPackageServiceImpl.java
@@ -6,17 +6,14 @@ import com.easysoftware.application.epkgpackage.dto.InputEPKGPackage;
import com.easysoftware.application.epkgpackage.vo.EPKGPackageDetailVo;
import com.easysoftware.common.utils.ObjectMapperUtil;
import com.easysoftware.common.utils.ResultUtil;
-import com.easysoftware.common.utils.UuidUtil;
import com.easysoftware.domain.epkgpackage.EPKGPackage;
import com.easysoftware.domain.epkgpackage.EPKGPackageUnique;
import com.easysoftware.domain.epkgpackage.gateway.EPKGPackageGateway;
import com.easysoftware.infrastructure.epkgpackage.gatewayimpl.dataobject.EPKGPackageDO;
import com.easysoftware.infrastructure.mapper.EPKGPackageDOMapper;
-import com.easysoftware.kafka.Producer;
import jakarta.annotation.Resource;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.BeanUtils;
-import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
@@ -35,12 +32,6 @@ public class EPKGPackageServiceImpl extends
@Resource
private EPKGPackageGateway ePKGPackageGateway;
- /**
- * Kafka producer for messaging.
- */
- @Autowired
- private Producer kafkaProducer;
-
/**
* API endpoint for repository maintainers.
*/
@@ -86,8 +77,6 @@ public class EPKGPackageServiceImpl extends
Map kafkaMsg = ObjectMapperUtil.jsonToMap(inputEPKGPackage);
kafkaMsg.put("table", "EPKGPackage");
- kafkaProducer.sendMess(topicAppVersion + "_epkg", UuidUtil.getUUID32(),
- ObjectMapperUtil.writeValueAsString(kafkaMsg));
return ResultUtil.success(HttpStatus.OK);
}
diff --git a/src/main/java/com/easysoftware/application/rpmpackage/RPMPackageServiceImpl.java b/src/main/java/com/easysoftware/application/rpmpackage/RPMPackageServiceImpl.java
index 21c471339b94914c1f797e15db7f4751091f82bf..66d558819ea2318bc13cc301749327c9d70c7a8d 100644
--- a/src/main/java/com/easysoftware/application/rpmpackage/RPMPackageServiceImpl.java
+++ b/src/main/java/com/easysoftware/application/rpmpackage/RPMPackageServiceImpl.java
@@ -7,17 +7,14 @@ import com.easysoftware.application.rpmpackage.vo.RPMPackageDetailVo;
import com.easysoftware.application.rpmpackage.vo.RPMPackageDomainVo;
import com.easysoftware.common.utils.ObjectMapperUtil;
import com.easysoftware.common.utils.ResultUtil;
-import com.easysoftware.common.utils.UuidUtil;
import com.easysoftware.domain.rpmpackage.RPMPackage;
import com.easysoftware.domain.rpmpackage.RPMPackageUnique;
import com.easysoftware.domain.rpmpackage.gateway.RPMPackageGateway;
import com.easysoftware.infrastructure.mapper.RPMPackageDOMapper;
import com.easysoftware.infrastructure.rpmpackage.gatewayimpl.dataobject.RPMPackageDO;
-import com.easysoftware.kafka.Producer;
import jakarta.annotation.Resource;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.BeanUtils;
-import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Primary;
import org.springframework.http.HttpStatus;
@@ -32,11 +29,6 @@ import java.util.Map;
@Primary
@Service("RPMPackageService")
public class RPMPackageServiceImpl extends ServiceImpl implements RPMPackageService {
- /**
- * Autowired Kafka producer.
- */
- @Autowired
- private Producer kafkaProducer;
/**
* Resource for RPM Package Gateway.
@@ -106,8 +98,6 @@ public class RPMPackageServiceImpl extends ServiceImpl kafkaMsg = ObjectMapperUtil.jsonToMap(inputrPMPackage);
kafkaMsg.put("table", "RPMPackage");
- kafkaProducer.sendMess(topicAppVersion + "_rpm",
- UuidUtil.getUUID32(), ObjectMapperUtil.writeValueAsString(kafkaMsg));
return ResultUtil.success(HttpStatus.OK);
}
diff --git a/src/main/java/com/easysoftware/common/config/KafkaConsumerConfig.java b/src/main/java/com/easysoftware/common/config/KafkaConsumerConfig.java
deleted file mode 100644
index 802ad701fc21f32c1237c8314fbb0ab3ed26d260..0000000000000000000000000000000000000000
--- a/src/main/java/com/easysoftware/common/config/KafkaConsumerConfig.java
+++ /dev/null
@@ -1,91 +0,0 @@
-package com.easysoftware.common.config;
-
-import org.apache.kafka.clients.CommonClientConfigs;
-import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.apache.kafka.common.config.SaslConfigs;
-import org.apache.kafka.common.config.SslConfigs;
-import org.apache.kafka.common.security.auth.SecurityProtocol;
-import org.apache.kafka.common.serialization.StringDeserializer;
-import org.springframework.beans.factory.annotation.Value;
-import org.springframework.context.annotation.Bean;
-import org.springframework.context.annotation.Configuration;
-import org.springframework.kafka.annotation.EnableKafka;
-import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
-import org.springframework.kafka.core.ConsumerFactory;
-import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
-
-import java.util.HashMap;
-import java.util.Map;
-
-@Configuration
-@EnableKafka
-public class KafkaConsumerConfig {
-
- /**
- * Bootstrap servers for Kafka connection.
- */
- @Value("${bootstrap.servers}")
- private String bootstrapServers;
-
- /**
- * Consumer group ID for Kafka consumer.
- */
- @Value("${consumer.groupId}")
- private String groupId;
-
- /**
- * SASL JAAS configuration for authentication.
- */
- @Value("${spring.kafka.properties.sasl.jaas.config}")
- private String authConfig;
-
- /**
- * SASL mechanism for authentication.
- */
- @Value("${spring.kafka.properties.sasl.mechanism}")
- private String mechanism;
-
- /**
- * Location of the SSL trust store.
- */
- @Value("${spring.kafka.properties.ssl.truststore.location}")
- private String trustStoreLocation;
-
-
- /**
- * Configures a ConsumerFactory for processing Kafka messages with String key and value types.
- *
- * @return The configured ConsumerFactory.
- */
- @Bean
- public ConsumerFactory consumerFactory() {
- Map configProps = new HashMap<>();
- configProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
- configProps.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
- configProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
- configProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
-
- // add SASL_SSL config
- configProps.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, SecurityProtocol.SASL_SSL.name);
- configProps.put(SaslConfigs.SASL_MECHANISM, mechanism);
- configProps.put(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, "");
- configProps.put(SaslConfigs.SASL_JAAS_CONFIG, authConfig);
- configProps.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, trustStoreLocation);
-
- return new DefaultKafkaConsumerFactory<>(configProps);
- }
-
- /**
- * Configures a Kafka listener container factory for processing Kafka messages.
- *
- * @return The ConcurrentKafkaListenerContainerFactory for String key and value types.
- */
- @Bean
- public ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory() {
- ConcurrentKafkaListenerContainerFactory factory
- = new ConcurrentKafkaListenerContainerFactory<>();
- factory.setConsumerFactory(consumerFactory());
- factory.setBatchListener(true);
- return factory;
- }
-}
diff --git a/src/main/java/com/easysoftware/kafka/AppPkgConsumer.java b/src/main/java/com/easysoftware/kafka/AppPkgConsumer.java
deleted file mode 100644
index 6efbe8aba16874fb6ca37107d605664a9eaff7ce..0000000000000000000000000000000000000000
--- a/src/main/java/com/easysoftware/kafka/AppPkgConsumer.java
+++ /dev/null
@@ -1,19 +0,0 @@
-package com.easysoftware.kafka;
-
-import org.apache.kafka.clients.consumer.ConsumerRecords;
-import org.springframework.kafka.annotation.KafkaListener;
-import org.springframework.stereotype.Service;
-
-@Service
-public class AppPkgConsumer extends BaseConsumer {
-
- /**
- * Listens for and processes ConsumerRecords of type .
- *
- * @param records The ConsumerRecords to process.
- */
- @KafkaListener(topics = "software_test_app")
- public void listen(final ConsumerRecords records) {
- dealDataToTableByBatch(records);
- }
-}
diff --git a/src/main/java/com/easysoftware/kafka/BaseConsumer.java b/src/main/java/com/easysoftware/kafka/BaseConsumer.java
deleted file mode 100644
index 3d2dff9c0686e6645e5fd17137a9077e98de0c3f..0000000000000000000000000000000000000000
--- a/src/main/java/com/easysoftware/kafka/BaseConsumer.java
+++ /dev/null
@@ -1,138 +0,0 @@
-package com.easysoftware.kafka;
-
-import com.easysoftware.application.BaseIService;
-import com.easysoftware.application.ServiceMap;
-import com.easysoftware.common.utils.ObjectMapperUtil;
-import lombok.Generated;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.clients.consumer.ConsumerRecords;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.springframework.beans.factory.annotation.Autowired;
-
-import java.time.Duration;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.Map;
-
-public class BaseConsumer {
-
- /**
- * Autowired ServiceMap instance.
- */
- @Autowired
- private ServiceMap serviceMap;
-
- /**
- * Logger for BaseConsumer class.
- */
- private static final Logger LOGGER = LoggerFactory.getLogger(BaseConsumer.class);
-
- /**
- * List to hold KafkaConsumer instances for String keys and values.
- */
- private final ArrayList> kafkaConsumerList = new ArrayList<>();
-
-
- /**
- * Custom tasks method to perform Kafka to MySQL data transfer.
- */
- // @Scheduled(fixedRate = 5000)
- @Generated
- public void tasks() {
- kafkaToMysql();
- }
-
- /**
- * Method to transfer data from Kafka to MySQL by processing ConsumerRecords.
- */
- @Generated
- public void kafkaToMysql() {
- while (true) {
- for (KafkaConsumer customer : kafkaConsumerList) {
- ConsumerRecords poll = customer.poll(Duration.ofSeconds(5));
- dealDataToTableByBatch(poll);
- customer.commitAsync();
- }
- }
- }
-
- /**
- * Processes ConsumerRecords in batches and deals with the data to insert into a table.
- *
- * @param records The ConsumerRecords to process.
- */
- // The data of a topic can only be written to the same table
- public void dealDataToTableByBatch(final ConsumerRecords records) {
- ArrayList appList = new ArrayList<>();
- BaseIService baseIService = null;
- int partition = 0;
- long offset = 0;
- long startTime = System.nanoTime();
- for (ConsumerRecord record : records) {
- String value = record.value();
- try {
- Map dtoMap = ObjectMapperUtil.toMap(value);
- String table = dtoMap.get("table").toString();
- baseIService = serviceMap.getIService(table + "Service");
- appList.add(value);
- partition = record.partition();
- offset = record.offset();
- } catch (Exception e) {
- LOGGER.error(e.getMessage() + ":" + value, e);
- }
- }
- long endTime1 = System.nanoTime();
- long duration = (endTime1 - startTime) / 1000000;
- LOGGER.info("处理records用时: " + duration + " 毫秒," + "数据量:" + appList.size());
- if (!appList.isEmpty()) {
- LOGGER.info("partation: " + partition + ", offset: " + offset);
- baseIService.saveDataObjectBatch(appList);
- }
- long endTime2 = System.nanoTime();
- duration = (endTime2 - endTime1) / 1000000;
- LOGGER.info("写入数据库用时: " + duration + " 毫秒," + "数据量:" + appList.size());
- }
-
- /**
- * Processes ConsumerRecords and deals with the data to insert into multiple tables.
- *
- * @param records The ConsumerRecords to process.
- */
- // The data of a topic may be written to multiple tables
- @Generated
- public void dealDataToMultipleTables(final ConsumerRecords records) {
- Map> resMap = new HashMap<>();
- int partition = 0;
- long offset = 0;
-
- for (ConsumerRecord record : records) {
- String value = record.value();
- try {
- Map dtoMap = ObjectMapperUtil.toMap(value);
- String table = dtoMap.get("table").toString();
-
- if (!resMap.containsKey(table)) {
- resMap.put(table, new ArrayList<>());
- }
-
- ArrayList tmp = resMap.get(table);
- tmp.add(value);
- resMap.put(table, tmp);
-
- partition = record.partition();
- offset = record.offset();
- } catch (Exception e) {
- LOGGER.error(e.getMessage() + ": " + value, e);
- }
- }
- resMap.forEach((table, values) -> {
- if (!values.isEmpty()) {
- serviceMap.getIService(table + "Service").saveDataObjectBatch(values);
- }
- });
- LOGGER.info("Partition: " + partition + ", Offset: " + offset);
- }
-
-}
diff --git a/src/main/java/com/easysoftware/kafka/EpkgConsumer.java b/src/main/java/com/easysoftware/kafka/EpkgConsumer.java
deleted file mode 100644
index 224ceca698aa6cbb9550e015c2bc732dd9e47fd3..0000000000000000000000000000000000000000
--- a/src/main/java/com/easysoftware/kafka/EpkgConsumer.java
+++ /dev/null
@@ -1,24 +0,0 @@
-package com.easysoftware.kafka;
-
-import org.apache.kafka.clients.consumer.ConsumerRecords;
-import org.springframework.kafka.annotation.KafkaListener;
-import org.springframework.stereotype.Service;
-
-@Service
-public class EpkgConsumer extends BaseConsumer {
- // @Value("${consumer.topic.name}")
- // String topicName;
-
- // @Value("${consumer.topic.offset}")
- // String topicOffset;
-
- /**
- * Listens for and processes ConsumerRecords of type .
- *
- * @param records The ConsumerRecords to process.
- */
- @KafkaListener(topics = "software_test_epkg", concurrency = "3")
- public void listen(final ConsumerRecords records) {
- dealDataToTableByBatch(records);
- }
-}
diff --git a/src/main/java/com/easysoftware/kafka/Producer.java b/src/main/java/com/easysoftware/kafka/Producer.java
deleted file mode 100644
index a0910668cab177c4142c1046ba1d93c841cce9ad..0000000000000000000000000000000000000000
--- a/src/main/java/com/easysoftware/kafka/Producer.java
+++ /dev/null
@@ -1,35 +0,0 @@
-package com.easysoftware.kafka;
-
-import org.apache.kafka.clients.producer.KafkaProducer;
-import org.apache.kafka.clients.producer.ProducerRecord;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.kafka.core.KafkaTemplate;
-import org.springframework.stereotype.Component;
-
-@Component
-public class Producer {
-
- /**
- * Autowired KafkaTemplate for producing messages.
- */
- @Autowired
- private KafkaTemplate kafkaTemplate;
-
- /**
- * Static KafkaProducer for handling Kafka operations.
- */
- private static KafkaProducer producer;
-
- /**
- * Sends a message with the specified topic, key, and value.
- *
- * @param topic The Kafka topic to send the message to.
- * @param key The key associated with the message.
- * @param value The value of the message.
- */
- public void sendMess(final String topic, final String key, final String value) {
- ProducerRecord mess = new ProducerRecord(topic, key, value);
- kafkaTemplate.send(mess);
- }
-
-}
diff --git a/src/main/java/com/easysoftware/kafka/RpmConsumer.java b/src/main/java/com/easysoftware/kafka/RpmConsumer.java
deleted file mode 100644
index 8bce16d608d94b0e81460cc37e92a4ea3171a2ef..0000000000000000000000000000000000000000
--- a/src/main/java/com/easysoftware/kafka/RpmConsumer.java
+++ /dev/null
@@ -1,32 +0,0 @@
-package com.easysoftware.kafka;
-
-import org.apache.kafka.clients.consumer.ConsumerRecords;
-import org.springframework.beans.factory.annotation.Value;
-import org.springframework.kafka.annotation.KafkaListener;
-import org.springframework.stereotype.Service;
-
-@Service
-public class RpmConsumer extends BaseConsumer {
- /**
- * Value for the Kafka consumer topic name.
- */
- @Value("${consumer.topic.name}")
- private String topicName;
-
- /**
- * Value for the Kafka consumer topic offset.
- */
- @Value("${consumer.topic.offset}")
- private String topicOffset;
-
- /**
- * Kafka listener method that listens to the "software_test_rpm" topic with concurrency set to 3.
- *
- * @param records The ConsumerRecords to process.
- */
- @KafkaListener(topics = "software_test_rpm", concurrency = "3")
- public void listen(final ConsumerRecords records) {
- dealDataToTableByBatch(records);
- }
-
-}
diff --git a/src/main/java/com/easysoftware/kafka/VersionConsumer.java b/src/main/java/com/easysoftware/kafka/VersionConsumer.java
deleted file mode 100644
index a02633d012b53cbe7d4ca4c2f4256b4c3dc91fc6..0000000000000000000000000000000000000000
--- a/src/main/java/com/easysoftware/kafka/VersionConsumer.java
+++ /dev/null
@@ -1,19 +0,0 @@
-package com.easysoftware.kafka;
-
-import org.apache.kafka.clients.consumer.ConsumerRecords;
-import org.springframework.kafka.annotation.KafkaListener;
-import org.springframework.stereotype.Service;
-
-@Service
-public class VersionConsumer extends BaseConsumer {
-
- /**
- * Listens for and processes ConsumerRecords of type .
- *
- * @param records The ConsumerRecords to process.
- */
- @KafkaListener(topics = "software_test_version")
- public void listen(final ConsumerRecords records) {
- dealDataToTableByBatch(records);
- }
-}