diff --git a/pom.xml b/pom.xml index 509b378c3864d212461adfc8c682a64cc3a7e36d..37288401f1a88ed6e05a0f287f8883b0cd5511d5 100644 --- a/pom.xml +++ b/pom.xml @@ -147,12 +147,6 @@ bcprov-jdk18on 1.78.1 - - - org.apache.kafka - kafka-clients - 3.7.0 - @@ -167,11 +161,6 @@ 4.5.13 - - org.springframework.kafka - spring-kafka - - commons-codec commons-codec diff --git a/src/main/java/com/easysoftware/application/applicationversion/ApplicationVersionServiceImpl.java b/src/main/java/com/easysoftware/application/applicationversion/ApplicationVersionServiceImpl.java index 11f9a6566ac61bf3c31aacff53caba06dc81b11c..5e590bb257d9197ff1340a4391b04e80fa2b21dd 100644 --- a/src/main/java/com/easysoftware/application/applicationversion/ApplicationVersionServiceImpl.java +++ b/src/main/java/com/easysoftware/application/applicationversion/ApplicationVersionServiceImpl.java @@ -6,15 +6,12 @@ import com.easysoftware.application.applicationversion.dto.InputApplicationVersi import com.easysoftware.common.entity.MessageCode; import com.easysoftware.common.utils.ObjectMapperUtil; import com.easysoftware.common.utils.ResultUtil; -import com.easysoftware.common.utils.UuidUtil; import com.easysoftware.domain.applicationversion.ApplicationVersion; import com.easysoftware.domain.applicationversion.gateway.ApplicationVersionGateway; import com.easysoftware.infrastructure.applicationversion.gatewayimpl.dataobject.ApplicationVersionDO; import com.easysoftware.infrastructure.mapper.ApplicationVersionDOMapper; -import com.easysoftware.kafka.Producer; import jakarta.annotation.Resource; import org.springframework.beans.BeanUtils; -import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.http.HttpStatus; import org.springframework.http.ResponseEntity; @@ -28,12 +25,6 @@ import java.util.Map; public class ApplicationVersionServiceImpl extends ServiceImpl implements ApplicationVersionService { - /** - * Autowired Kafka producer for sending messages. - */ - @Autowired - private Producer kafkaProducer; - /** * Topic name for the Kafka producer related to application versions. */ @@ -71,8 +62,6 @@ public class ApplicationVersionServiceImpl extends ServiceImpl kafkaMsg = ObjectMapperUtil.jsonToMap(appVersion); kafkaMsg.put("table", "ApplicationVersion"); kafkaMsg.put("unique", inputAppVersion.getName()); - kafkaProducer.sendMess(topicAppVersion + "_version", - UuidUtil.getUUID32(), ObjectMapperUtil.writeValueAsString(kafkaMsg)); return ResultUtil.success(HttpStatus.OK); } diff --git a/src/main/java/com/easysoftware/application/epkgpackage/EPKGPackageServiceImpl.java b/src/main/java/com/easysoftware/application/epkgpackage/EPKGPackageServiceImpl.java index 5448e04fb1d599c251ee8a5cef1c3b328746c136..2a363a96c4dbfeff0aac1b6193452dc6194a7af5 100644 --- a/src/main/java/com/easysoftware/application/epkgpackage/EPKGPackageServiceImpl.java +++ b/src/main/java/com/easysoftware/application/epkgpackage/EPKGPackageServiceImpl.java @@ -6,17 +6,14 @@ import com.easysoftware.application.epkgpackage.dto.InputEPKGPackage; import com.easysoftware.application.epkgpackage.vo.EPKGPackageDetailVo; import com.easysoftware.common.utils.ObjectMapperUtil; import com.easysoftware.common.utils.ResultUtil; -import com.easysoftware.common.utils.UuidUtil; import com.easysoftware.domain.epkgpackage.EPKGPackage; import com.easysoftware.domain.epkgpackage.EPKGPackageUnique; import com.easysoftware.domain.epkgpackage.gateway.EPKGPackageGateway; import com.easysoftware.infrastructure.epkgpackage.gatewayimpl.dataobject.EPKGPackageDO; import com.easysoftware.infrastructure.mapper.EPKGPackageDOMapper; -import com.easysoftware.kafka.Producer; import jakarta.annotation.Resource; import org.apache.commons.lang3.StringUtils; import org.springframework.beans.BeanUtils; -import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.http.HttpStatus; import org.springframework.http.ResponseEntity; @@ -35,12 +32,6 @@ public class EPKGPackageServiceImpl extends @Resource private EPKGPackageGateway ePKGPackageGateway; - /** - * Kafka producer for messaging. - */ - @Autowired - private Producer kafkaProducer; - /** * API endpoint for repository maintainers. */ @@ -86,8 +77,6 @@ public class EPKGPackageServiceImpl extends Map kafkaMsg = ObjectMapperUtil.jsonToMap(inputEPKGPackage); kafkaMsg.put("table", "EPKGPackage"); - kafkaProducer.sendMess(topicAppVersion + "_epkg", UuidUtil.getUUID32(), - ObjectMapperUtil.writeValueAsString(kafkaMsg)); return ResultUtil.success(HttpStatus.OK); } diff --git a/src/main/java/com/easysoftware/application/rpmpackage/RPMPackageServiceImpl.java b/src/main/java/com/easysoftware/application/rpmpackage/RPMPackageServiceImpl.java index 21c471339b94914c1f797e15db7f4751091f82bf..66d558819ea2318bc13cc301749327c9d70c7a8d 100644 --- a/src/main/java/com/easysoftware/application/rpmpackage/RPMPackageServiceImpl.java +++ b/src/main/java/com/easysoftware/application/rpmpackage/RPMPackageServiceImpl.java @@ -7,17 +7,14 @@ import com.easysoftware.application.rpmpackage.vo.RPMPackageDetailVo; import com.easysoftware.application.rpmpackage.vo.RPMPackageDomainVo; import com.easysoftware.common.utils.ObjectMapperUtil; import com.easysoftware.common.utils.ResultUtil; -import com.easysoftware.common.utils.UuidUtil; import com.easysoftware.domain.rpmpackage.RPMPackage; import com.easysoftware.domain.rpmpackage.RPMPackageUnique; import com.easysoftware.domain.rpmpackage.gateway.RPMPackageGateway; import com.easysoftware.infrastructure.mapper.RPMPackageDOMapper; import com.easysoftware.infrastructure.rpmpackage.gatewayimpl.dataobject.RPMPackageDO; -import com.easysoftware.kafka.Producer; import jakarta.annotation.Resource; import org.apache.commons.lang3.StringUtils; import org.springframework.beans.BeanUtils; -import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Primary; import org.springframework.http.HttpStatus; @@ -32,11 +29,6 @@ import java.util.Map; @Primary @Service("RPMPackageService") public class RPMPackageServiceImpl extends ServiceImpl implements RPMPackageService { - /** - * Autowired Kafka producer. - */ - @Autowired - private Producer kafkaProducer; /** * Resource for RPM Package Gateway. @@ -106,8 +98,6 @@ public class RPMPackageServiceImpl extends ServiceImpl kafkaMsg = ObjectMapperUtil.jsonToMap(inputrPMPackage); kafkaMsg.put("table", "RPMPackage"); - kafkaProducer.sendMess(topicAppVersion + "_rpm", - UuidUtil.getUUID32(), ObjectMapperUtil.writeValueAsString(kafkaMsg)); return ResultUtil.success(HttpStatus.OK); } diff --git a/src/main/java/com/easysoftware/common/config/KafkaConsumerConfig.java b/src/main/java/com/easysoftware/common/config/KafkaConsumerConfig.java deleted file mode 100644 index 802ad701fc21f32c1237c8314fbb0ab3ed26d260..0000000000000000000000000000000000000000 --- a/src/main/java/com/easysoftware/common/config/KafkaConsumerConfig.java +++ /dev/null @@ -1,91 +0,0 @@ -package com.easysoftware.common.config; - -import org.apache.kafka.clients.CommonClientConfigs; -import org.apache.kafka.clients.consumer.ConsumerConfig; -import org.apache.kafka.common.config.SaslConfigs; -import org.apache.kafka.common.config.SslConfigs; -import org.apache.kafka.common.security.auth.SecurityProtocol; -import org.apache.kafka.common.serialization.StringDeserializer; -import org.springframework.beans.factory.annotation.Value; -import org.springframework.context.annotation.Bean; -import org.springframework.context.annotation.Configuration; -import org.springframework.kafka.annotation.EnableKafka; -import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; -import org.springframework.kafka.core.ConsumerFactory; -import org.springframework.kafka.core.DefaultKafkaConsumerFactory; - -import java.util.HashMap; -import java.util.Map; - -@Configuration -@EnableKafka -public class KafkaConsumerConfig { - - /** - * Bootstrap servers for Kafka connection. - */ - @Value("${bootstrap.servers}") - private String bootstrapServers; - - /** - * Consumer group ID for Kafka consumer. - */ - @Value("${consumer.groupId}") - private String groupId; - - /** - * SASL JAAS configuration for authentication. - */ - @Value("${spring.kafka.properties.sasl.jaas.config}") - private String authConfig; - - /** - * SASL mechanism for authentication. - */ - @Value("${spring.kafka.properties.sasl.mechanism}") - private String mechanism; - - /** - * Location of the SSL trust store. - */ - @Value("${spring.kafka.properties.ssl.truststore.location}") - private String trustStoreLocation; - - - /** - * Configures a ConsumerFactory for processing Kafka messages with String key and value types. - * - * @return The configured ConsumerFactory. - */ - @Bean - public ConsumerFactory consumerFactory() { - Map configProps = new HashMap<>(); - configProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); - configProps.put(ConsumerConfig.GROUP_ID_CONFIG, groupId); - configProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); - configProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); - - // add SASL_SSL config - configProps.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, SecurityProtocol.SASL_SSL.name); - configProps.put(SaslConfigs.SASL_MECHANISM, mechanism); - configProps.put(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, ""); - configProps.put(SaslConfigs.SASL_JAAS_CONFIG, authConfig); - configProps.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, trustStoreLocation); - - return new DefaultKafkaConsumerFactory<>(configProps); - } - - /** - * Configures a Kafka listener container factory for processing Kafka messages. - * - * @return The ConcurrentKafkaListenerContainerFactory for String key and value types. - */ - @Bean - public ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory() { - ConcurrentKafkaListenerContainerFactory factory - = new ConcurrentKafkaListenerContainerFactory<>(); - factory.setConsumerFactory(consumerFactory()); - factory.setBatchListener(true); - return factory; - } -} diff --git a/src/main/java/com/easysoftware/kafka/AppPkgConsumer.java b/src/main/java/com/easysoftware/kafka/AppPkgConsumer.java deleted file mode 100644 index 6efbe8aba16874fb6ca37107d605664a9eaff7ce..0000000000000000000000000000000000000000 --- a/src/main/java/com/easysoftware/kafka/AppPkgConsumer.java +++ /dev/null @@ -1,19 +0,0 @@ -package com.easysoftware.kafka; - -import org.apache.kafka.clients.consumer.ConsumerRecords; -import org.springframework.kafka.annotation.KafkaListener; -import org.springframework.stereotype.Service; - -@Service -public class AppPkgConsumer extends BaseConsumer { - - /** - * Listens for and processes ConsumerRecords of type . - * - * @param records The ConsumerRecords to process. - */ - @KafkaListener(topics = "software_test_app") - public void listen(final ConsumerRecords records) { - dealDataToTableByBatch(records); - } -} diff --git a/src/main/java/com/easysoftware/kafka/BaseConsumer.java b/src/main/java/com/easysoftware/kafka/BaseConsumer.java deleted file mode 100644 index 3d2dff9c0686e6645e5fd17137a9077e98de0c3f..0000000000000000000000000000000000000000 --- a/src/main/java/com/easysoftware/kafka/BaseConsumer.java +++ /dev/null @@ -1,138 +0,0 @@ -package com.easysoftware.kafka; - -import com.easysoftware.application.BaseIService; -import com.easysoftware.application.ServiceMap; -import com.easysoftware.common.utils.ObjectMapperUtil; -import lombok.Generated; -import org.apache.kafka.clients.consumer.ConsumerRecord; -import org.apache.kafka.clients.consumer.ConsumerRecords; -import org.apache.kafka.clients.consumer.KafkaConsumer; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.springframework.beans.factory.annotation.Autowired; - -import java.time.Duration; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.Map; - -public class BaseConsumer { - - /** - * Autowired ServiceMap instance. - */ - @Autowired - private ServiceMap serviceMap; - - /** - * Logger for BaseConsumer class. - */ - private static final Logger LOGGER = LoggerFactory.getLogger(BaseConsumer.class); - - /** - * List to hold KafkaConsumer instances for String keys and values. - */ - private final ArrayList> kafkaConsumerList = new ArrayList<>(); - - - /** - * Custom tasks method to perform Kafka to MySQL data transfer. - */ - // @Scheduled(fixedRate = 5000) - @Generated - public void tasks() { - kafkaToMysql(); - } - - /** - * Method to transfer data from Kafka to MySQL by processing ConsumerRecords. - */ - @Generated - public void kafkaToMysql() { - while (true) { - for (KafkaConsumer customer : kafkaConsumerList) { - ConsumerRecords poll = customer.poll(Duration.ofSeconds(5)); - dealDataToTableByBatch(poll); - customer.commitAsync(); - } - } - } - - /** - * Processes ConsumerRecords in batches and deals with the data to insert into a table. - * - * @param records The ConsumerRecords to process. - */ - // The data of a topic can only be written to the same table - public void dealDataToTableByBatch(final ConsumerRecords records) { - ArrayList appList = new ArrayList<>(); - BaseIService baseIService = null; - int partition = 0; - long offset = 0; - long startTime = System.nanoTime(); - for (ConsumerRecord record : records) { - String value = record.value(); - try { - Map dtoMap = ObjectMapperUtil.toMap(value); - String table = dtoMap.get("table").toString(); - baseIService = serviceMap.getIService(table + "Service"); - appList.add(value); - partition = record.partition(); - offset = record.offset(); - } catch (Exception e) { - LOGGER.error(e.getMessage() + ":" + value, e); - } - } - long endTime1 = System.nanoTime(); - long duration = (endTime1 - startTime) / 1000000; - LOGGER.info("处理records用时: " + duration + " 毫秒," + "数据量:" + appList.size()); - if (!appList.isEmpty()) { - LOGGER.info("partation: " + partition + ", offset: " + offset); - baseIService.saveDataObjectBatch(appList); - } - long endTime2 = System.nanoTime(); - duration = (endTime2 - endTime1) / 1000000; - LOGGER.info("写入数据库用时: " + duration + " 毫秒," + "数据量:" + appList.size()); - } - - /** - * Processes ConsumerRecords and deals with the data to insert into multiple tables. - * - * @param records The ConsumerRecords to process. - */ - // The data of a topic may be written to multiple tables - @Generated - public void dealDataToMultipleTables(final ConsumerRecords records) { - Map> resMap = new HashMap<>(); - int partition = 0; - long offset = 0; - - for (ConsumerRecord record : records) { - String value = record.value(); - try { - Map dtoMap = ObjectMapperUtil.toMap(value); - String table = dtoMap.get("table").toString(); - - if (!resMap.containsKey(table)) { - resMap.put(table, new ArrayList<>()); - } - - ArrayList tmp = resMap.get(table); - tmp.add(value); - resMap.put(table, tmp); - - partition = record.partition(); - offset = record.offset(); - } catch (Exception e) { - LOGGER.error(e.getMessage() + ": " + value, e); - } - } - resMap.forEach((table, values) -> { - if (!values.isEmpty()) { - serviceMap.getIService(table + "Service").saveDataObjectBatch(values); - } - }); - LOGGER.info("Partition: " + partition + ", Offset: " + offset); - } - -} diff --git a/src/main/java/com/easysoftware/kafka/EpkgConsumer.java b/src/main/java/com/easysoftware/kafka/EpkgConsumer.java deleted file mode 100644 index 224ceca698aa6cbb9550e015c2bc732dd9e47fd3..0000000000000000000000000000000000000000 --- a/src/main/java/com/easysoftware/kafka/EpkgConsumer.java +++ /dev/null @@ -1,24 +0,0 @@ -package com.easysoftware.kafka; - -import org.apache.kafka.clients.consumer.ConsumerRecords; -import org.springframework.kafka.annotation.KafkaListener; -import org.springframework.stereotype.Service; - -@Service -public class EpkgConsumer extends BaseConsumer { - // @Value("${consumer.topic.name}") - // String topicName; - - // @Value("${consumer.topic.offset}") - // String topicOffset; - - /** - * Listens for and processes ConsumerRecords of type . - * - * @param records The ConsumerRecords to process. - */ - @KafkaListener(topics = "software_test_epkg", concurrency = "3") - public void listen(final ConsumerRecords records) { - dealDataToTableByBatch(records); - } -} diff --git a/src/main/java/com/easysoftware/kafka/Producer.java b/src/main/java/com/easysoftware/kafka/Producer.java deleted file mode 100644 index a0910668cab177c4142c1046ba1d93c841cce9ad..0000000000000000000000000000000000000000 --- a/src/main/java/com/easysoftware/kafka/Producer.java +++ /dev/null @@ -1,35 +0,0 @@ -package com.easysoftware.kafka; - -import org.apache.kafka.clients.producer.KafkaProducer; -import org.apache.kafka.clients.producer.ProducerRecord; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.kafka.core.KafkaTemplate; -import org.springframework.stereotype.Component; - -@Component -public class Producer { - - /** - * Autowired KafkaTemplate for producing messages. - */ - @Autowired - private KafkaTemplate kafkaTemplate; - - /** - * Static KafkaProducer for handling Kafka operations. - */ - private static KafkaProducer producer; - - /** - * Sends a message with the specified topic, key, and value. - * - * @param topic The Kafka topic to send the message to. - * @param key The key associated with the message. - * @param value The value of the message. - */ - public void sendMess(final String topic, final String key, final String value) { - ProducerRecord mess = new ProducerRecord(topic, key, value); - kafkaTemplate.send(mess); - } - -} diff --git a/src/main/java/com/easysoftware/kafka/RpmConsumer.java b/src/main/java/com/easysoftware/kafka/RpmConsumer.java deleted file mode 100644 index 8bce16d608d94b0e81460cc37e92a4ea3171a2ef..0000000000000000000000000000000000000000 --- a/src/main/java/com/easysoftware/kafka/RpmConsumer.java +++ /dev/null @@ -1,32 +0,0 @@ -package com.easysoftware.kafka; - -import org.apache.kafka.clients.consumer.ConsumerRecords; -import org.springframework.beans.factory.annotation.Value; -import org.springframework.kafka.annotation.KafkaListener; -import org.springframework.stereotype.Service; - -@Service -public class RpmConsumer extends BaseConsumer { - /** - * Value for the Kafka consumer topic name. - */ - @Value("${consumer.topic.name}") - private String topicName; - - /** - * Value for the Kafka consumer topic offset. - */ - @Value("${consumer.topic.offset}") - private String topicOffset; - - /** - * Kafka listener method that listens to the "software_test_rpm" topic with concurrency set to 3. - * - * @param records The ConsumerRecords to process. - */ - @KafkaListener(topics = "software_test_rpm", concurrency = "3") - public void listen(final ConsumerRecords records) { - dealDataToTableByBatch(records); - } - -} diff --git a/src/main/java/com/easysoftware/kafka/VersionConsumer.java b/src/main/java/com/easysoftware/kafka/VersionConsumer.java deleted file mode 100644 index a02633d012b53cbe7d4ca4c2f4256b4c3dc91fc6..0000000000000000000000000000000000000000 --- a/src/main/java/com/easysoftware/kafka/VersionConsumer.java +++ /dev/null @@ -1,19 +0,0 @@ -package com.easysoftware.kafka; - -import org.apache.kafka.clients.consumer.ConsumerRecords; -import org.springframework.kafka.annotation.KafkaListener; -import org.springframework.stereotype.Service; - -@Service -public class VersionConsumer extends BaseConsumer { - - /** - * Listens for and processes ConsumerRecords of type . - * - * @param records The ConsumerRecords to process. - */ - @KafkaListener(topics = "software_test_version") - public void listen(final ConsumerRecords records) { - dealDataToTableByBatch(records); - } -}