From 9b829227028684ef3d8c0e56aff8acec9f2429e5 Mon Sep 17 00:00:00 2001
From: fuxinji9527 <1992666531@qq.com>
Date: Thu, 9 May 2024 11:57:30 +0800
Subject: [PATCH 1/2] Comment out Kafka-related code
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

---
 pom.xml                                            |  18 +-
 .../ApplicationVersionServiceImpl.java             |  17 +-
 .../epkgpackage/EPKGPackageServiceImpl.java        |  17 +-
 .../rpmpackage/RPMPackageServiceImpl.java          |  17 +-
 .../common/config/KafkaConsumerConfig.java         | 182 ++++++------
 .../easysoftware/kafka/AppPkgConsumer.java         |  38 +--
 .../com/easysoftware/kafka/BaseConsumer.java       | 276 +++++++++---------
 .../com/easysoftware/kafka/EpkgConsumer.java       |  48 +--
 .../java/com/easysoftware/kafka/Producer.java      |  70 ++---
 .../com/easysoftware/kafka/RpmConsumer.java        |  64 ++--
 .../easysoftware/kafka/VersionConsumer.java        |  38 +--
 11 files changed, 388 insertions(+), 397 deletions(-)

diff --git a/pom.xml b/pom.xml
index 509b378..232d87b 100644
--- a/pom.xml
+++ b/pom.xml
@@ -148,11 +148,11 @@
             <version>1.78.1</version>
         </dependency>
 
-        <dependency>
-            <groupId>org.apache.kafka</groupId>
-            <artifactId>kafka-clients</artifactId>
-            <version>3.7.0</version>
-        </dependency>
+        <!--<dependency>-->
+        <!--    <groupId>org.apache.kafka</groupId>-->
+        <!--    <artifactId>kafka-clients</artifactId>-->
+        <!--    <version>3.7.0</version>-->
+        <!--</dependency>-->
 
 
         <dependency>
@@ -167,10 +167,10 @@
             <version>4.5.13</version>
         </dependency>
 
-        <dependency>
-            <groupId>org.springframework.kafka</groupId>
-            <artifactId>spring-kafka</artifactId>
-        </dependency>
+        <!--<dependency>-->
+        <!--    <groupId>org.springframework.kafka</groupId>-->
+        <!--    <artifactId>spring-kafka</artifactId>-->
+        <!--</dependency>-->
 
         <dependency>
             <groupId>commons-codec</groupId>
diff --git a/src/main/java/com/easysoftware/application/applicationversion/ApplicationVersionServiceImpl.java b/src/main/java/com/easysoftware/application/applicationversion/ApplicationVersionServiceImpl.java
index 11f9a65..5d90fdd 100644
--- a/src/main/java/com/easysoftware/application/applicationversion/ApplicationVersionServiceImpl.java
+++ b/src/main/java/com/easysoftware/application/applicationversion/ApplicationVersionServiceImpl.java
@@ -6,15 +6,12 @@ import com.easysoftware.application.applicationversion.dto.InputApplicationVersi
 import com.easysoftware.common.entity.MessageCode;
 import com.easysoftware.common.utils.ObjectMapperUtil;
 import com.easysoftware.common.utils.ResultUtil;
-import com.easysoftware.common.utils.UuidUtil;
 import com.easysoftware.domain.applicationversion.ApplicationVersion;
 import com.easysoftware.domain.applicationversion.gateway.ApplicationVersionGateway;
 import com.easysoftware.infrastructure.applicationversion.gatewayimpl.dataobject.ApplicationVersionDO;
 import com.easysoftware.infrastructure.mapper.ApplicationVersionDOMapper;
-import com.easysoftware.kafka.Producer;
 import jakarta.annotation.Resource;
 import org.springframework.beans.BeanUtils;
-import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.beans.factory.annotation.Value;
 import org.springframework.http.HttpStatus;
 import org.springframework.http.ResponseEntity;
@@ -28,11 +25,11 @@ import java.util.Map;
 public class ApplicationVersionServiceImpl extends ServiceImpl<ApplicationVersionDOMapper, ApplicationVersionDO>
         implements ApplicationVersionService {
 
-    /**
-     * Autowired Kafka producer for sending messages.
-     */
-    @Autowired
-    private Producer kafkaProducer;
+//    /**
+//     * Autowired Kafka producer for sending messages.
+//     */
+//    @Autowired
+//    private Producer kafkaProducer;
 
     /**
      * Topic name for the Kafka producer related to application versions.
@@ -71,8 +68,8 @@ public class ApplicationVersionServiceImpl extends ServiceImpl<ApplicationVersionDOMapper, ApplicationVersionDO>
         Map<String, Object> kafkaMsg = ObjectMapperUtil.jsonToMap(appVersion);
         kafkaMsg.put("table", "ApplicationVersion");
         kafkaMsg.put("unique", inputAppVersion.getName());
-        kafkaProducer.sendMess(topicAppVersion + "_version",
-                UuidUtil.getUUID32(), ObjectMapperUtil.writeValueAsString(kafkaMsg));
+//        kafkaProducer.sendMess(topicAppVersion + "_version",
+//                UuidUtil.getUUID32(), ObjectMapperUtil.writeValueAsString(kafkaMsg));
         return ResultUtil.success(HttpStatus.OK);
     }
 
diff --git a/src/main/java/com/easysoftware/application/epkgpackage/EPKGPackageServiceImpl.java b/src/main/java/com/easysoftware/application/epkgpackage/EPKGPackageServiceImpl.java
index 5448e04..27bd76b 100644
--- a/src/main/java/com/easysoftware/application/epkgpackage/EPKGPackageServiceImpl.java
+++ b/src/main/java/com/easysoftware/application/epkgpackage/EPKGPackageServiceImpl.java
@@ -6,17 +6,14 @@ import com.easysoftware.application.epkgpackage.dto.InputEPKGPackage;
 import com.easysoftware.application.epkgpackage.vo.EPKGPackageDetailVo;
 import com.easysoftware.common.utils.ObjectMapperUtil;
 import com.easysoftware.common.utils.ResultUtil;
-import com.easysoftware.common.utils.UuidUtil;
 import com.easysoftware.domain.epkgpackage.EPKGPackage;
 import com.easysoftware.domain.epkgpackage.EPKGPackageUnique;
 import com.easysoftware.domain.epkgpackage.gateway.EPKGPackageGateway;
 import com.easysoftware.infrastructure.epkgpackage.gatewayimpl.dataobject.EPKGPackageDO;
 import com.easysoftware.infrastructure.mapper.EPKGPackageDOMapper;
-import com.easysoftware.kafka.Producer;
 import jakarta.annotation.Resource;
 import org.apache.commons.lang3.StringUtils;
 import org.springframework.beans.BeanUtils;
-import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.beans.factory.annotation.Value;
 import org.springframework.http.HttpStatus;
 import org.springframework.http.ResponseEntity;
@@ -35,11 +32,11 @@ public class EPKGPackageServiceImpl extends
     @Resource
     private EPKGPackageGateway ePKGPackageGateway;
 
-    /**
-     * Kafka producer for messaging.
-     */
-    @Autowired
-    private Producer kafkaProducer;
+//    /**
+//     * Kafka producer for messaging.
+//     */
+//    @Autowired
+//    private Producer kafkaProducer;
 
     /**
      * API endpoint for repository maintainers.
@@ -86,8 +83,8 @@ public class EPKGPackageServiceImpl extends
 
         Map<String, Object> kafkaMsg = ObjectMapperUtil.jsonToMap(inputEPKGPackage);
         kafkaMsg.put("table", "EPKGPackage");
-        kafkaProducer.sendMess(topicAppVersion + "_epkg", UuidUtil.getUUID32(),
-                ObjectMapperUtil.writeValueAsString(kafkaMsg));
+//        kafkaProducer.sendMess(topicAppVersion + "_epkg", UuidUtil.getUUID32(),
+//                ObjectMapperUtil.writeValueAsString(kafkaMsg));
         return ResultUtil.success(HttpStatus.OK);
     }
 
diff --git a/src/main/java/com/easysoftware/application/rpmpackage/RPMPackageServiceImpl.java b/src/main/java/com/easysoftware/application/rpmpackage/RPMPackageServiceImpl.java
index 21c4713..9bad749 100644
--- a/src/main/java/com/easysoftware/application/rpmpackage/RPMPackageServiceImpl.java
+++ b/src/main/java/com/easysoftware/application/rpmpackage/RPMPackageServiceImpl.java
@@ -7,17 +7,14 @@ import com.easysoftware.application.rpmpackage.vo.RPMPackageDetailVo;
 import com.easysoftware.application.rpmpackage.vo.RPMPackageDomainVo;
 import com.easysoftware.common.utils.ObjectMapperUtil;
 import com.easysoftware.common.utils.ResultUtil;
-import com.easysoftware.common.utils.UuidUtil;
 import com.easysoftware.domain.rpmpackage.RPMPackage;
 import com.easysoftware.domain.rpmpackage.RPMPackageUnique;
 import com.easysoftware.domain.rpmpackage.gateway.RPMPackageGateway;
 import com.easysoftware.infrastructure.mapper.RPMPackageDOMapper;
 import com.easysoftware.infrastructure.rpmpackage.gatewayimpl.dataobject.RPMPackageDO;
-import com.easysoftware.kafka.Producer;
 import jakarta.annotation.Resource;
 import org.apache.commons.lang3.StringUtils;
 import org.springframework.beans.BeanUtils;
-import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.beans.factory.annotation.Value;
 import org.springframework.context.annotation.Primary;
 import org.springframework.http.HttpStatus;
@@ -32,11 +29,11 @@
 @Primary
 @Service("RPMPackageService")
 public class RPMPackageServiceImpl extends ServiceImpl<RPMPackageDOMapper, RPMPackageDO> implements RPMPackageService {
-    /**
-     * Autowired Kafka producer.
-     */
-    @Autowired
-    private Producer kafkaProducer;
+//    /**
+//     * Autowired Kafka producer.
+//     */
+//    @Autowired
+//    private Producer kafkaProducer;
 
     /**
      * Resource for RPM Package Gateway.
@@ -106,8 +103,8 @@ public class RPMPackageServiceImpl extends ServiceImpl<RPMPackageDOMapper, RPMPackageDO>
 
         Map<String, Object> kafkaMsg = ObjectMapperUtil.jsonToMap(inputrPMPackage);
         kafkaMsg.put("table", "RPMPackage");
-        kafkaProducer.sendMess(topicAppVersion + "_rpm",
-                UuidUtil.getUUID32(), ObjectMapperUtil.writeValueAsString(kafkaMsg));
+//        kafkaProducer.sendMess(topicAppVersion + "_rpm",
+//                UuidUtil.getUUID32(), ObjectMapperUtil.writeValueAsString(kafkaMsg));
         return ResultUtil.success(HttpStatus.OK);
     }
 
diff --git a/src/main/java/com/easysoftware/common/config/KafkaConsumerConfig.java b/src/main/java/com/easysoftware/common/config/KafkaConsumerConfig.java
index 802ad70..8aa73a1 100644
--- a/src/main/java/com/easysoftware/common/config/KafkaConsumerConfig.java
+++ b/src/main/java/com/easysoftware/common/config/KafkaConsumerConfig.java
@@ -1,91 +1,91 @@
-package com.easysoftware.common.config;
-
-import org.apache.kafka.clients.CommonClientConfigs;
-import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.apache.kafka.common.config.SaslConfigs;
-import org.apache.kafka.common.config.SslConfigs;
-import org.apache.kafka.common.security.auth.SecurityProtocol;
-import org.apache.kafka.common.serialization.StringDeserializer;
-import org.springframework.beans.factory.annotation.Value;
-import org.springframework.context.annotation.Bean;
-import org.springframework.context.annotation.Configuration;
-import org.springframework.kafka.annotation.EnableKafka;
-import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
-import org.springframework.kafka.core.ConsumerFactory;
-import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
-
-import java.util.HashMap;
-import java.util.Map;
-
-@Configuration
-@EnableKafka
-public class KafkaConsumerConfig {
-
-    /**
-     * Bootstrap servers for Kafka connection.
-     */
-    @Value("${bootstrap.servers}")
-    private String bootstrapServers;
-
-    /**
-     * Consumer group ID for Kafka consumer.
-     */
-    @Value("${consumer.groupId}")
-    private String groupId;
-
-    /**
-     * SASL JAAS configuration for authentication.
-     */
-    @Value("${spring.kafka.properties.sasl.jaas.config}")
-    private String authConfig;
-
-    /**
-     * SASL mechanism for authentication.
-     */
-    @Value("${spring.kafka.properties.sasl.mechanism}")
-    private String mechanism;
-
-    /**
-     * Location of the SSL trust store.
-     */
-    @Value("${spring.kafka.properties.ssl.truststore.location}")
-    private String trustStoreLocation;
-
-
-    /**
-     * Configures a ConsumerFactory for processing Kafka messages with String key and value types.
-     *
-     * @return The configured ConsumerFactory.
-     */
-    @Bean
-    public ConsumerFactory<String, String> consumerFactory() {
-        Map<String, Object> configProps = new HashMap<>();
-        configProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
-        configProps.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
-        configProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
-        configProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
-
-        // add SASL_SSL config
-        configProps.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, SecurityProtocol.SASL_SSL.name);
-        configProps.put(SaslConfigs.SASL_MECHANISM, mechanism);
-        configProps.put(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, "");
-        configProps.put(SaslConfigs.SASL_JAAS_CONFIG, authConfig);
-        configProps.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, trustStoreLocation);
-
-        return new DefaultKafkaConsumerFactory<>(configProps);
-    }
-
-    /**
-     * Configures a Kafka listener container factory for processing Kafka messages.
-     *
-     * @return The ConcurrentKafkaListenerContainerFactory for String key and value types.
-     */
-    @Bean
-    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
-        ConcurrentKafkaListenerContainerFactory<String, String> factory
-                = new ConcurrentKafkaListenerContainerFactory<>();
-        factory.setConsumerFactory(consumerFactory());
-        factory.setBatchListener(true);
-        return factory;
-    }
-}
+//package com.easysoftware.common.config;
+//
+//import org.apache.kafka.clients.CommonClientConfigs;
+//import org.apache.kafka.clients.consumer.ConsumerConfig;
+//import org.apache.kafka.common.config.SaslConfigs;
+//import org.apache.kafka.common.config.SslConfigs;
+//import org.apache.kafka.common.security.auth.SecurityProtocol;
+//import org.apache.kafka.common.serialization.StringDeserializer;
+//import org.springframework.beans.factory.annotation.Value;
+//import org.springframework.context.annotation.Bean;
+//import org.springframework.context.annotation.Configuration;
+//import org.springframework.kafka.annotation.EnableKafka;
+//import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
+//import org.springframework.kafka.core.ConsumerFactory;
+//import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
+//
+//import java.util.HashMap;
+//import java.util.Map;
+//
+//@Configuration
+//@EnableKafka
+//public class KafkaConsumerConfig {
+//
+//    /**
+//     * Bootstrap servers for Kafka connection.
+//     */
+//    @Value("${bootstrap.servers}")
+//    private String bootstrapServers;
+//
+//    /**
+//     * Consumer group ID for Kafka consumer.
+//     */
+//    @Value("${consumer.groupId}")
+//    private String groupId;
+//
+//    /**
+//     * SASL JAAS configuration for authentication.
+//     */
+//    @Value("${spring.kafka.properties.sasl.jaas.config}")
+//    private String authConfig;
+//
+//    /**
+//     * SASL mechanism for authentication.
+//     */
+//    @Value("${spring.kafka.properties.sasl.mechanism}")
+//    private String mechanism;
+//
+//    /**
+//     * Location of the SSL trust store.
+//     */
+//    @Value("${spring.kafka.properties.ssl.truststore.location}")
+//    private String trustStoreLocation;
+//
+//
+//    /**
+//     * Configures a ConsumerFactory for processing Kafka messages with String key and value types.
+//     *
+//     * @return The configured ConsumerFactory.
+//     */
+//    @Bean
+//    public ConsumerFactory<String, String> consumerFactory() {
+//        Map<String, Object> configProps = new HashMap<>();
+//        configProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
+//        configProps.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
+//        configProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
+//        configProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
+//
+//        // add SASL_SSL config
+//        configProps.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, SecurityProtocol.SASL_SSL.name);
+//        configProps.put(SaslConfigs.SASL_MECHANISM, mechanism);
+//        configProps.put(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, "");
+//        configProps.put(SaslConfigs.SASL_JAAS_CONFIG, authConfig);
+//        configProps.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, trustStoreLocation);
+//
+//        return new DefaultKafkaConsumerFactory<>(configProps);
+//    }
+//
+//    /**
+//     * Configures a Kafka listener container factory for processing Kafka messages.
+//     *
+//     * @return The ConcurrentKafkaListenerContainerFactory for String key and value types.
+//     */
+//    @Bean
+//    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
+//        ConcurrentKafkaListenerContainerFactory<String, String> factory
+//                = new ConcurrentKafkaListenerContainerFactory<>();
+//        factory.setConsumerFactory(consumerFactory());
+//        factory.setBatchListener(true);
+//        return factory;
+//    }
+//}
diff --git a/src/main/java/com/easysoftware/kafka/AppPkgConsumer.java b/src/main/java/com/easysoftware/kafka/AppPkgConsumer.java
index 6efbe8a..a2a5fdc 100644
--- a/src/main/java/com/easysoftware/kafka/AppPkgConsumer.java
+++ b/src/main/java/com/easysoftware/kafka/AppPkgConsumer.java
@@ -1,19 +1,19 @@
-package com.easysoftware.kafka;
-
-import org.apache.kafka.clients.consumer.ConsumerRecords;
-import org.springframework.kafka.annotation.KafkaListener;
-import org.springframework.stereotype.Service;
-
-@Service
-public class AppPkgConsumer extends BaseConsumer {
-
-    /**
-     * Listens for and processes ConsumerRecords of type <String, String>.
-     *
-     * @param records The ConsumerRecords to process.
-     */
-    @KafkaListener(topics = "software_test_app")
-    public void listen(final ConsumerRecords<String, String> records) {
-        dealDataToTableByBatch(records);
-    }
-}
+//package com.easysoftware.kafka;
+//
+//import org.apache.kafka.clients.consumer.ConsumerRecords;
+//import org.springframework.kafka.annotation.KafkaListener;
+//import org.springframework.stereotype.Service;
+//
+//@Service
+//public class AppPkgConsumer extends BaseConsumer {
+//
+//    /**
+//     * Listens for and processes ConsumerRecords of type <String, String>.
+//     *
+//     * @param records The ConsumerRecords to process.
+//     */
+//    @KafkaListener(topics = "software_test_app")
+//    public void listen(final ConsumerRecords<String, String> records) {
+//        dealDataToTableByBatch(records);
+//    }
+//}
diff --git a/src/main/java/com/easysoftware/kafka/BaseConsumer.java b/src/main/java/com/easysoftware/kafka/BaseConsumer.java
index 3d2dff9..7842910 100644
--- a/src/main/java/com/easysoftware/kafka/BaseConsumer.java
+++ b/src/main/java/com/easysoftware/kafka/BaseConsumer.java
@@ -1,138 +1,138 @@
-package com.easysoftware.kafka;
-
-import com.easysoftware.application.BaseIService;
-import com.easysoftware.application.ServiceMap;
-import com.easysoftware.common.utils.ObjectMapperUtil;
-import lombok.Generated;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.clients.consumer.ConsumerRecords;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.springframework.beans.factory.annotation.Autowired;
-
-import java.time.Duration;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.Map;
-
-public class BaseConsumer {
-
-    /**
-     * Autowired ServiceMap instance.
-     */
-    @Autowired
-    private ServiceMap serviceMap;
-
-    /**
-     * Logger for BaseConsumer class.
-     */
-    private static final Logger LOGGER = LoggerFactory.getLogger(BaseConsumer.class);
-
-    /**
-     * List to hold KafkaConsumer instances for String keys and values.
-     */
-    private final ArrayList<KafkaConsumer<String, String>> kafkaConsumerList = new ArrayList<>();
-
-
-    /**
-     * Custom tasks method to perform Kafka to MySQL data transfer.
-     */
-    // @Scheduled(fixedRate = 5000)
-    @Generated
-    public void tasks() {
-        kafkaToMysql();
-    }
-
-    /**
-     * Method to transfer data from Kafka to MySQL by processing ConsumerRecords.
-     */
-    @Generated
-    public void kafkaToMysql() {
-        while (true) {
-            for (KafkaConsumer<String, String> customer : kafkaConsumerList) {
-                ConsumerRecords<String, String> poll = customer.poll(Duration.ofSeconds(5));
-                dealDataToTableByBatch(poll);
-                customer.commitAsync();
-            }
-        }
-    }
-
-    /**
-     * Processes ConsumerRecords in batches and deals with the data to insert into a table.
-     *
-     * @param records The ConsumerRecords to process.
-     */
-    // The data of a topic can only be written to the same table
-    public void dealDataToTableByBatch(final ConsumerRecords<String, String> records) {
-        ArrayList<String> appList = new ArrayList<>();
-        BaseIService baseIService = null;
-        int partition = 0;
-        long offset = 0;
-        long startTime = System.nanoTime();
-        for (ConsumerRecord<String, String> record : records) {
-            String value = record.value();
-            try {
-                Map<String, Object> dtoMap = ObjectMapperUtil.toMap(value);
-                String table = dtoMap.get("table").toString();
-                baseIService = serviceMap.getIService(table + "Service");
-                appList.add(value);
-                partition = record.partition();
-                offset = record.offset();
-            } catch (Exception e) {
-                LOGGER.error(e.getMessage() + ":" + value, e);
-            }
-        }
-        long endTime1 = System.nanoTime();
-        long duration = (endTime1 - startTime) / 1000000;
-        LOGGER.info("处理records用时: " + duration + " 毫秒," + "数据量:" + appList.size());
-        if (!appList.isEmpty()) {
-            LOGGER.info("partation: " + partition + ", offset: " + offset);
-            baseIService.saveDataObjectBatch(appList);
-        }
-        long endTime2 = System.nanoTime();
-        duration = (endTime2 - endTime1) / 1000000;
-        LOGGER.info("写入数据库用时: " + duration + " 毫秒," + "数据量:" + appList.size());
-    }
-
-    /**
-     * Processes ConsumerRecords and deals with the data to insert into multiple tables.
-     *
-     * @param records The ConsumerRecords to process.
-     */
-    // The data of a topic may be written to multiple tables
-    @Generated
-    public void dealDataToMultipleTables(final ConsumerRecords<String, String> records) {
-        Map<String, ArrayList<String>> resMap = new HashMap<>();
-        int partition = 0;
-        long offset = 0;
-
-        for (ConsumerRecord<String, String> record : records) {
-            String value = record.value();
-            try {
-                Map<String, Object> dtoMap = ObjectMapperUtil.toMap(value);
-                String table = dtoMap.get("table").toString();
-
-                if (!resMap.containsKey(table)) {
-                    resMap.put(table, new ArrayList<>());
-                }
-
-                ArrayList<String> tmp = resMap.get(table);
-                tmp.add(value);
-                resMap.put(table, tmp);
-
-                partition = record.partition();
-                offset = record.offset();
-            } catch (Exception e) {
-                LOGGER.error(e.getMessage() + ": " + value, e);
-            }
-        }
-        resMap.forEach((table, values) -> {
-            if (!values.isEmpty()) {
-                serviceMap.getIService(table + "Service").saveDataObjectBatch(values);
-            }
-        });
-        LOGGER.info("Partition: " + partition + ", Offset: " + offset);
-    }
-
-}
+//package com.easysoftware.kafka;
+//
+//import com.easysoftware.application.BaseIService;
+//import com.easysoftware.application.ServiceMap;
+//import com.easysoftware.common.utils.ObjectMapperUtil;
+//import lombok.Generated;
+//import org.apache.kafka.clients.consumer.ConsumerRecord;
+//import org.apache.kafka.clients.consumer.ConsumerRecords;
+//import org.apache.kafka.clients.consumer.KafkaConsumer;
+//import org.slf4j.Logger;
+//import org.slf4j.LoggerFactory;
+//import org.springframework.beans.factory.annotation.Autowired;
+//
+//import java.time.Duration;
+//import java.util.ArrayList;
+//import java.util.HashMap;
+//import java.util.Map;
+//
+//public class BaseConsumer {
+//
+//    /**
+//     * Autowired ServiceMap instance.
+//     */
+//    @Autowired
+//    private ServiceMap serviceMap;
+//
+//    /**
+//     * Logger for BaseConsumer class.
+//     */
+//    private static final Logger LOGGER = LoggerFactory.getLogger(BaseConsumer.class);
+//
+//    /**
+//     * List to hold KafkaConsumer instances for String keys and values.
+//     */
+//    private final ArrayList<KafkaConsumer<String, String>> kafkaConsumerList = new ArrayList<>();
+//
+//
+//    /**
+//     * Custom tasks method to perform Kafka to MySQL data transfer.
+//     */
+//    // @Scheduled(fixedRate = 5000)
+//    @Generated
+//    public void tasks() {
+//        kafkaToMysql();
+//    }
+//
+//    /**
+//     * Method to transfer data from Kafka to MySQL by processing ConsumerRecords.
+//     */
+//    @Generated
+//    public void kafkaToMysql() {
+//        while (true) {
+//            for (KafkaConsumer<String, String> customer : kafkaConsumerList) {
+//                ConsumerRecords<String, String> poll = customer.poll(Duration.ofSeconds(5));
+//                dealDataToTableByBatch(poll);
+//                customer.commitAsync();
+//            }
+//        }
+//    }
+//
+//    /**
+//     * Processes ConsumerRecords in batches and deals with the data to insert into a table.
+//     *
+//     * @param records The ConsumerRecords to process.
+//     */
+//    // The data of a topic can only be written to the same table
+//    public void dealDataToTableByBatch(final ConsumerRecords<String, String> records) {
+//        ArrayList<String> appList = new ArrayList<>();
+//        BaseIService baseIService = null;
+//        int partition = 0;
+//        long offset = 0;
+//        long startTime = System.nanoTime();
+//        for (ConsumerRecord<String, String> record : records) {
+//            String value = record.value();
+//            try {
+//                Map<String, Object> dtoMap = ObjectMapperUtil.toMap(value);
+//                String table = dtoMap.get("table").toString();
+//                baseIService = serviceMap.getIService(table + "Service");
+//                appList.add(value);
+//                partition = record.partition();
+//                offset = record.offset();
+//            } catch (Exception e) {
+//                LOGGER.error(e.getMessage() + ":" + value, e);
+//            }
+//        }
+//        long endTime1 = System.nanoTime();
+//        long duration = (endTime1 - startTime) / 1000000;
+//        LOGGER.info("处理records用时: " + duration + " 毫秒," + "数据量:" + appList.size());
+//        if (!appList.isEmpty()) {
+//            LOGGER.info("partation: " + partition + ", offset: " + offset);
+//            baseIService.saveDataObjectBatch(appList);
+//        }
+//        long endTime2 = System.nanoTime();
+//        duration = (endTime2 - endTime1) / 1000000;
+//        LOGGER.info("写入数据库用时: " + duration + " 毫秒," + "数据量:" + appList.size());
+//    }
+//
+//    /**
+//     * Processes ConsumerRecords and deals with the data to insert into multiple tables.
+//     *
+//     * @param records The ConsumerRecords to process.
+//     */
+//    // The data of a topic may be written to multiple tables
+//    @Generated
+//    public void dealDataToMultipleTables(final ConsumerRecords<String, String> records) {
+//        Map<String, ArrayList<String>> resMap = new HashMap<>();
+//        int partition = 0;
+//        long offset = 0;
+//
+//        for (ConsumerRecord<String, String> record : records) {
+//            String value = record.value();
+//            try {
+//                Map<String, Object> dtoMap = ObjectMapperUtil.toMap(value);
+//                String table = dtoMap.get("table").toString();
+//
+//                if (!resMap.containsKey(table)) {
+//                    resMap.put(table, new ArrayList<>());
+//                }
+//
+//                ArrayList<String> tmp = resMap.get(table);
+//                tmp.add(value);
+//                resMap.put(table, tmp);
+//
+//                partition = record.partition();
+//                offset = record.offset();
+//            } catch (Exception e) {
+//                LOGGER.error(e.getMessage() + ": " + value, e);
+//            }
+//        }
+//        resMap.forEach((table, values) -> {
+//            if (!values.isEmpty()) {
+//                serviceMap.getIService(table + "Service").saveDataObjectBatch(values);
+//            }
+//        });
+//        LOGGER.info("Partition: " + partition + ", Offset: " + offset);
+//    }
+//
+//}
diff --git a/src/main/java/com/easysoftware/kafka/EpkgConsumer.java b/src/main/java/com/easysoftware/kafka/EpkgConsumer.java
index 224ceca..3d3d50d 100644
--- a/src/main/java/com/easysoftware/kafka/EpkgConsumer.java
+++ b/src/main/java/com/easysoftware/kafka/EpkgConsumer.java
@@ -1,24 +1,24 @@
-package com.easysoftware.kafka;
-
-import org.apache.kafka.clients.consumer.ConsumerRecords;
-import org.springframework.kafka.annotation.KafkaListener;
-import org.springframework.stereotype.Service;
-
-@Service
-public class EpkgConsumer extends BaseConsumer {
-    // @Value("${consumer.topic.name}")
-    // String topicName;
-
-    // @Value("${consumer.topic.offset}")
-    // String topicOffset;
-
-    /**
-     * Listens for and processes ConsumerRecords of type <String, String>.
-     *
-     * @param records The ConsumerRecords to process.
-     */
-    @KafkaListener(topics = "software_test_epkg", concurrency = "3")
-    public void listen(final ConsumerRecords<String, String> records) {
-        dealDataToTableByBatch(records);
-    }
-}
+//package com.easysoftware.kafka;
+//
+//import org.apache.kafka.clients.consumer.ConsumerRecords;
+//import org.springframework.kafka.annotation.KafkaListener;
+//import org.springframework.stereotype.Service;
+//
+//@Service
+//public class EpkgConsumer extends BaseConsumer {
+//    // @Value("${consumer.topic.name}")
+//    // String topicName;
+//
+//    // @Value("${consumer.topic.offset}")
+//    // String topicOffset;
+//
+//    /**
+//     * Listens for and processes ConsumerRecords of type <String, String>.
+//     *
+//     * @param records The ConsumerRecords to process.
+//     */
+//    @KafkaListener(topics = "software_test_epkg", concurrency = "3")
+//    public void listen(final ConsumerRecords<String, String> records) {
+//        dealDataToTableByBatch(records);
+//    }
+//}
diff --git a/src/main/java/com/easysoftware/kafka/Producer.java b/src/main/java/com/easysoftware/kafka/Producer.java
index a091066..c5f983c 100644
--- a/src/main/java/com/easysoftware/kafka/Producer.java
+++ b/src/main/java/com/easysoftware/kafka/Producer.java
@@ -1,35 +1,35 @@
-package com.easysoftware.kafka;
-
-import org.apache.kafka.clients.producer.KafkaProducer;
-import org.apache.kafka.clients.producer.ProducerRecord;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.kafka.core.KafkaTemplate;
-import org.springframework.stereotype.Component;
-
-@Component
-public class Producer {
-
-    /**
-     * Autowired KafkaTemplate for producing messages.
-     */
-    @Autowired
-    private KafkaTemplate<String, String> kafkaTemplate;
-
-    /**
-     * Static KafkaProducer for handling Kafka operations.
-     */
-    private static KafkaProducer<String, String> producer;
-
-    /**
-     * Sends a message with the specified topic, key, and value.
-     *
-     * @param topic The Kafka topic to send the message to.
-     * @param key The key associated with the message.
-     * @param value The value of the message.
-     */
-    public void sendMess(final String topic, final String key, final String value) {
-        ProducerRecord<String, String> mess = new ProducerRecord<String, String>(topic, key, value);
-        kafkaTemplate.send(mess);
-    }
-
-}
+//package com.easysoftware.kafka;
+//
+//import org.apache.kafka.clients.producer.KafkaProducer;
+//import org.apache.kafka.clients.producer.ProducerRecord;
+//import org.springframework.beans.factory.annotation.Autowired;
+//import org.springframework.kafka.core.KafkaTemplate;
+//import org.springframework.stereotype.Component;
+//
+//@Component
+//public class Producer {
+//
+//    /**
+//     * Autowired KafkaTemplate for producing messages.
+//     */
+//    @Autowired
+//    private KafkaTemplate<String, String> kafkaTemplate;
+//
+//    /**
+//     * Static KafkaProducer for handling Kafka operations.
+//     */
+//    private static KafkaProducer<String, String> producer;
+//
+//    /**
+//     * Sends a message with the specified topic, key, and value.
+//     *
+//     * @param topic The Kafka topic to send the message to.
+//     * @param key The key associated with the message.
+//     * @param value The value of the message.
+//     */
+//    public void sendMess(final String topic, final String key, final String value) {
+//        ProducerRecord<String, String> mess = new ProducerRecord<String, String>(topic, key, value);
+//        kafkaTemplate.send(mess);
+//    }
+//
+//}
diff --git a/src/main/java/com/easysoftware/kafka/RpmConsumer.java b/src/main/java/com/easysoftware/kafka/RpmConsumer.java
index 8bce16d..b035056 100644
--- a/src/main/java/com/easysoftware/kafka/RpmConsumer.java
+++ b/src/main/java/com/easysoftware/kafka/RpmConsumer.java
@@ -1,32 +1,32 @@
-package com.easysoftware.kafka;
-
-import org.apache.kafka.clients.consumer.ConsumerRecords;
-import org.springframework.beans.factory.annotation.Value;
-import org.springframework.kafka.annotation.KafkaListener;
-import org.springframework.stereotype.Service;
-
-@Service
-public class RpmConsumer extends BaseConsumer {
-    /**
-     * Value for the Kafka consumer topic name.
-     */
-    @Value("${consumer.topic.name}")
-    private String topicName;
-
-    /**
-     * Value for the Kafka consumer topic offset.
-     */
-    @Value("${consumer.topic.offset}")
-    private String topicOffset;
-
-    /**
-     * Kafka listener method that listens to the "software_test_rpm" topic with concurrency set to 3.
-     *
-     * @param records The ConsumerRecords to process.
-     */
-    @KafkaListener(topics = "software_test_rpm", concurrency = "3")
-    public void listen(final ConsumerRecords<String, String> records) {
-        dealDataToTableByBatch(records);
-    }
-
-}
+//package com.easysoftware.kafka;
+//
+//import org.apache.kafka.clients.consumer.ConsumerRecords;
+//import org.springframework.beans.factory.annotation.Value;
+//import org.springframework.kafka.annotation.KafkaListener;
+//import org.springframework.stereotype.Service;
+//
+//@Service
+//public class RpmConsumer extends BaseConsumer {
+//    /**
+//     * Value for the Kafka consumer topic name.
+//     */
+//    @Value("${consumer.topic.name}")
+//    private String topicName;
+//
+//    /**
+//     * Value for the Kafka consumer topic offset.
+//     */
+//    @Value("${consumer.topic.offset}")
+//    private String topicOffset;
+//
+//    /**
+//     * Kafka listener method that listens to the "software_test_rpm" topic with concurrency set to 3.
+//     *
+//     * @param records The ConsumerRecords to process.
+//     */
+//    @KafkaListener(topics = "software_test_rpm", concurrency = "3")
+//    public void listen(final ConsumerRecords<String, String> records) {
+//        dealDataToTableByBatch(records);
+//    }
+//
+//}
diff --git a/src/main/java/com/easysoftware/kafka/VersionConsumer.java b/src/main/java/com/easysoftware/kafka/VersionConsumer.java
index a02633d..62afb70 100644
--- a/src/main/java/com/easysoftware/kafka/VersionConsumer.java
+++ b/src/main/java/com/easysoftware/kafka/VersionConsumer.java
@@ -1,19 +1,19 @@
-package com.easysoftware.kafka;
-
-import org.apache.kafka.clients.consumer.ConsumerRecords;
-import org.springframework.kafka.annotation.KafkaListener;
-import org.springframework.stereotype.Service;
-
-@Service
-public class VersionConsumer extends BaseConsumer {
-
-    /**
-     * Listens for and processes ConsumerRecords of type <String, String>.
-     *
-     * @param records The ConsumerRecords to process.
-     */
-    @KafkaListener(topics = "software_test_version")
-    public void listen(final ConsumerRecords<String, String> records) {
-        dealDataToTableByBatch(records);
-    }
-}
+//package com.easysoftware.kafka;
+//
+//import org.apache.kafka.clients.consumer.ConsumerRecords;
+//import org.springframework.kafka.annotation.KafkaListener;
+//import org.springframework.stereotype.Service;
+//
+//@Service
+//public class VersionConsumer extends BaseConsumer {
+//
+//    /**
+//     * Listens for and processes ConsumerRecords of type <String, String>.
+//     *
+//     * @param records The ConsumerRecords to process.
+//     */
+//    @KafkaListener(topics = "software_test_version")
+//    public void listen(final ConsumerRecords<String, String> records) {
+//        dealDataToTableByBatch(records);
+//    }
+//}
--
Gitee

From 8c6bafdd855d2e7315e5645269ef06b0594682fb Mon Sep 17 00:00:00 2001
From: fuxinji9527 <1992666531@qq.com>
Date: Fri, 10 May 2024 10:03:16 +0800
Subject: [PATCH 2/2] Delete Kafka-related code
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

---
 pom.xml                                            |  11 --
 .../ApplicationVersionServiceImpl.java             |   8 -
 .../epkgpackage/EPKGPackageServiceImpl.java        |   8 -
 .../rpmpackage/RPMPackageServiceImpl.java          |   7 -
 .../common/config/KafkaConsumerConfig.java         |  91 ------------
 .../easysoftware/kafka/AppPkgConsumer.java         |  19 ---
 .../com/easysoftware/kafka/BaseConsumer.java       | 138 ------------------
 .../com/easysoftware/kafka/EpkgConsumer.java       |  24 ---
 .../java/com/easysoftware/kafka/Producer.java      |  35 -----
 .../com/easysoftware/kafka/RpmConsumer.java        |  32 ----
 .../easysoftware/kafka/VersionConsumer.java        |  19 ---
 11 files changed, 392 deletions(-)
 delete mode 100644 src/main/java/com/easysoftware/common/config/KafkaConsumerConfig.java
 delete mode 100644 src/main/java/com/easysoftware/kafka/AppPkgConsumer.java
 delete mode 100644 src/main/java/com/easysoftware/kafka/BaseConsumer.java
 delete mode 100644 src/main/java/com/easysoftware/kafka/EpkgConsumer.java
 delete mode 100644 src/main/java/com/easysoftware/kafka/Producer.java
 delete mode 100644 src/main/java/com/easysoftware/kafka/RpmConsumer.java
 delete mode 100644 src/main/java/com/easysoftware/kafka/VersionConsumer.java

diff --git a/pom.xml b/pom.xml
index 232d87b..3728840 100644
--- a/pom.xml
+++ b/pom.xml
@@ -147,12 +147,6 @@
             <artifactId>bcprov-jdk18on</artifactId>
             <version>1.78.1</version>
         </dependency>
 
-        <!--<dependency>-->
-        <!--    <groupId>org.apache.kafka</groupId>-->
-        <!--    <artifactId>kafka-clients</artifactId>-->
-        <!--    <version>3.7.0</version>-->
-        <!--</dependency>-->
-
 
         <dependency>
@@ -167,11 +161,6 @@
             <version>4.5.13</version>
         </dependency>
 
-        <!--<dependency>-->
-        <!--    <groupId>org.springframework.kafka</groupId>-->
-        <!--    <artifactId>spring-kafka</artifactId>-->
-        <!--</dependency>-->
-
         <dependency>
             <groupId>commons-codec</groupId>
             <artifactId>commons-codec</artifactId>
diff --git a/src/main/java/com/easysoftware/application/applicationversion/ApplicationVersionServiceImpl.java b/src/main/java/com/easysoftware/application/applicationversion/ApplicationVersionServiceImpl.java
index 5d90fdd..5e590bb 100644
--- a/src/main/java/com/easysoftware/application/applicationversion/ApplicationVersionServiceImpl.java
+++ b/src/main/java/com/easysoftware/application/applicationversion/ApplicationVersionServiceImpl.java
@@ -25,12 +25,6 @@ import java.util.Map;
 public class ApplicationVersionServiceImpl extends ServiceImpl<ApplicationVersionDOMapper, ApplicationVersionDO>
         implements ApplicationVersionService {
 
-//    /**
-//     * Autowired Kafka producer for sending messages.
-//     */
-//    @Autowired
-//    private Producer kafkaProducer;
-
     /**
      * Topic name for the Kafka producer related to application versions.
      */
@@ -68,8 +62,6 @@ public class ApplicationVersionServiceImpl extends ServiceImpl<ApplicationVersionDOMapper, ApplicationVersionDO>
         Map<String, Object> kafkaMsg = ObjectMapperUtil.jsonToMap(appVersion);
         kafkaMsg.put("table", "ApplicationVersion");
         kafkaMsg.put("unique", inputAppVersion.getName());
-//        kafkaProducer.sendMess(topicAppVersion + "_version",
-//                UuidUtil.getUUID32(), ObjectMapperUtil.writeValueAsString(kafkaMsg));
         return ResultUtil.success(HttpStatus.OK);
     }
 
diff --git a/src/main/java/com/easysoftware/application/epkgpackage/EPKGPackageServiceImpl.java b/src/main/java/com/easysoftware/application/epkgpackage/EPKGPackageServiceImpl.java
index 27bd76b..2a363a9 100644
--- a/src/main/java/com/easysoftware/application/epkgpackage/EPKGPackageServiceImpl.java
+++ b/src/main/java/com/easysoftware/application/epkgpackage/EPKGPackageServiceImpl.java
@@ -32,12 +32,6 @@ public class EPKGPackageServiceImpl extends
     @Resource
     private EPKGPackageGateway ePKGPackageGateway;
 
-//    /**
-//     * Kafka producer for messaging.
-//     */
-//    @Autowired
-//    private Producer kafkaProducer;
-
     /**
      * API endpoint for repository maintainers.
      */
@@ -83,8 +77,6 @@
 
         Map<String, Object> kafkaMsg = ObjectMapperUtil.jsonToMap(inputEPKGPackage);
         kafkaMsg.put("table", "EPKGPackage");
-//        kafkaProducer.sendMess(topicAppVersion + "_epkg", UuidUtil.getUUID32(),
-//                ObjectMapperUtil.writeValueAsString(kafkaMsg));
         return ResultUtil.success(HttpStatus.OK);
     }
 
diff --git a/src/main/java/com/easysoftware/application/rpmpackage/RPMPackageServiceImpl.java b/src/main/java/com/easysoftware/application/rpmpackage/RPMPackageServiceImpl.java
index 9bad749..66d5588 100644
--- a/src/main/java/com/easysoftware/application/rpmpackage/RPMPackageServiceImpl.java
+++ b/src/main/java/com/easysoftware/application/rpmpackage/RPMPackageServiceImpl.java
@@ -29,11 +29,6 @@
 @Primary
 @Service("RPMPackageService")
 public class RPMPackageServiceImpl extends ServiceImpl<RPMPackageDOMapper, RPMPackageDO> implements RPMPackageService {
-//    /**
-//     * Autowired Kafka producer.
-//     */
-//    @Autowired
-//    private Producer kafkaProducer;
 
     /**
      * Resource for RPM Package Gateway.
@@ -103,8 +98,6 @@ public class RPMPackageServiceImpl extends ServiceImpl<RPMPackageDOMapper, RPMPackageDO>
 
         Map<String, Object> kafkaMsg = ObjectMapperUtil.jsonToMap(inputrPMPackage);
         kafkaMsg.put("table", "RPMPackage");
-//        kafkaProducer.sendMess(topicAppVersion + "_rpm",
-//                UuidUtil.getUUID32(), ObjectMapperUtil.writeValueAsString(kafkaMsg));
         return ResultUtil.success(HttpStatus.OK);
     }
 
diff --git a/src/main/java/com/easysoftware/common/config/KafkaConsumerConfig.java b/src/main/java/com/easysoftware/common/config/KafkaConsumerConfig.java
deleted file mode 100644
index 8aa73a1..0000000
--- a/src/main/java/com/easysoftware/common/config/KafkaConsumerConfig.java
+++ /dev/null
@@ -1,91 +0,0 @@
-//package com.easysoftware.common.config;
-//
-//import org.apache.kafka.clients.CommonClientConfigs;
-//import org.apache.kafka.clients.consumer.ConsumerConfig;
-//import org.apache.kafka.common.config.SaslConfigs;
-//import org.apache.kafka.common.config.SslConfigs;
-//import org.apache.kafka.common.security.auth.SecurityProtocol;
-//import org.apache.kafka.common.serialization.StringDeserializer;
-//import org.springframework.beans.factory.annotation.Value;
-//import org.springframework.context.annotation.Bean;
-//import org.springframework.context.annotation.Configuration;
-//import org.springframework.kafka.annotation.EnableKafka;
-//import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
-//import org.springframework.kafka.core.ConsumerFactory;
-//import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
-//
-//import java.util.HashMap;
-//import java.util.Map;
-//
-//@Configuration
-//@EnableKafka
-//public class KafkaConsumerConfig {
-//
-//    /**
-//     * Bootstrap servers for Kafka connection.
-//     */
-//    @Value("${bootstrap.servers}")
-//    private String bootstrapServers;
-//
-//    /**
-//     * Consumer group ID for Kafka consumer.
-//     */
-//    @Value("${consumer.groupId}")
-//    private String groupId;
-//
-//    /**
-//     * SASL JAAS configuration for authentication.
-//     */
-//    @Value("${spring.kafka.properties.sasl.jaas.config}")
-//    private String authConfig;
-//
-//    /**
-//     * SASL mechanism for authentication.
-//     */
-//    @Value("${spring.kafka.properties.sasl.mechanism}")
-//    private String mechanism;
-//
-//    /**
-//     * Location of the SSL trust store.
-//     */
-//    @Value("${spring.kafka.properties.ssl.truststore.location}")
-//    private String trustStoreLocation;
-//
-//
-//    /**
-//     * Configures a ConsumerFactory for processing Kafka messages with String key and value types.
-//     *
-//     * @return The configured ConsumerFactory.
-//     */
-//    @Bean
-//    public ConsumerFactory<String, String> consumerFactory() {
-//        Map<String, Object> configProps = new HashMap<>();
-//        configProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
-//        configProps.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
-//        configProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
-//        configProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
-//
-//        // add SASL_SSL config
-//        configProps.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, SecurityProtocol.SASL_SSL.name);
-//        configProps.put(SaslConfigs.SASL_MECHANISM, mechanism);
-//        configProps.put(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, "");
-//        configProps.put(SaslConfigs.SASL_JAAS_CONFIG, authConfig);
-//        configProps.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, trustStoreLocation);
-//
-//        return new DefaultKafkaConsumerFactory<>(configProps);
-//    }
-//
-//    /**
-//     * Configures a Kafka listener container factory for processing Kafka messages.
-//     *
-//     * @return The ConcurrentKafkaListenerContainerFactory for String key and value types.
-//     */
-//    @Bean
-//    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
-//        ConcurrentKafkaListenerContainerFactory<String, String> factory
-//                = new ConcurrentKafkaListenerContainerFactory<>();
-//        factory.setConsumerFactory(consumerFactory());
-//        factory.setBatchListener(true);
-//        return factory;
-//    }
-//}
diff --git a/src/main/java/com/easysoftware/kafka/AppPkgConsumer.java b/src/main/java/com/easysoftware/kafka/AppPkgConsumer.java
deleted file mode 100644
index a2a5fdc..0000000
--- a/src/main/java/com/easysoftware/kafka/AppPkgConsumer.java
+++ /dev/null
@@ -1,19 +0,0 @@
-//package com.easysoftware.kafka;
-//
-//import org.apache.kafka.clients.consumer.ConsumerRecords;
-//import org.springframework.kafka.annotation.KafkaListener;
-//import org.springframework.stereotype.Service;
-//
-//@Service
-//public class AppPkgConsumer extends BaseConsumer {
-//
-//    /**
-//     * Listens for and processes ConsumerRecords of type <String, String>.
-//     *
-//     * @param records The ConsumerRecords to process.
-//     */
-//    @KafkaListener(topics = "software_test_app")
-//    public void listen(final ConsumerRecords<String, String> records) {
-//        dealDataToTableByBatch(records);
-//    }
-//}
diff --git a/src/main/java/com/easysoftware/kafka/BaseConsumer.java b/src/main/java/com/easysoftware/kafka/BaseConsumer.java
deleted file mode 100644
index 7842910..0000000
--- a/src/main/java/com/easysoftware/kafka/BaseConsumer.java
+++ /dev/null
@@ -1,138 +0,0 @@
-//package com.easysoftware.kafka;
-//
-//import com.easysoftware.application.BaseIService;
-//import com.easysoftware.application.ServiceMap;
-//import com.easysoftware.common.utils.ObjectMapperUtil;
-//import lombok.Generated;
-//import org.apache.kafka.clients.consumer.ConsumerRecord;
-//import org.apache.kafka.clients.consumer.ConsumerRecords;
-//import org.apache.kafka.clients.consumer.KafkaConsumer;
-//import org.slf4j.Logger;
-//import org.slf4j.LoggerFactory;
-//import org.springframework.beans.factory.annotation.Autowired;
-//
-//import java.time.Duration;
-//import java.util.ArrayList;
-//import java.util.HashMap;
-//import java.util.Map;
-//
-//public class BaseConsumer {
-//
-//    /**
-//     * Autowired ServiceMap instance.
-//     */
-//    @Autowired
-//    private ServiceMap serviceMap;
-//
-//    /**
-//     * Logger for BaseConsumer class.
-//     */
-//    private static final Logger LOGGER = LoggerFactory.getLogger(BaseConsumer.class);
-//
-//    /**
-//     * List to hold KafkaConsumer instances for String keys and values.
-//     */
-//    private final ArrayList<KafkaConsumer<String, String>> kafkaConsumerList = new ArrayList<>();
-//
-//
-//    /**
-//     * Custom tasks method to perform Kafka to MySQL data transfer.
-//     */
-//    // @Scheduled(fixedRate = 5000)
-//    @Generated
-//    public void tasks() {
-//        kafkaToMysql();
-//    }
-//
-//    /**
-//     * Method to transfer data from Kafka to MySQL by processing ConsumerRecords.
-//     */
-//    @Generated
-//    public void kafkaToMysql() {
-//        while (true) {
-//            for (KafkaConsumer<String, String> customer : kafkaConsumerList) {
-//                ConsumerRecords<String, String> poll = customer.poll(Duration.ofSeconds(5));
-//                dealDataToTableByBatch(poll);
-//                customer.commitAsync();
-//            }
-//        }
-//    }
-//
-//    /**
-//     * Processes ConsumerRecords in batches and deals with the data to insert into a table.
-//     *
-//     * @param records The ConsumerRecords to process.
-//     */
-//    // The data of a topic can only be written to the same table
-//    public void dealDataToTableByBatch(final ConsumerRecords<String, String> records) {
-//        ArrayList<String> appList = new ArrayList<>();
-//        BaseIService baseIService = null;
-//        int partition = 0;
-//        long offset = 0;
-//        long startTime = System.nanoTime();
-//        for (ConsumerRecord<String, String> record : records) {
-//            String value = record.value();
-//            try {
-//                Map<String, Object> dtoMap = ObjectMapperUtil.toMap(value);
-//                String table = dtoMap.get("table").toString();
-//                baseIService = serviceMap.getIService(table + "Service");
-//                appList.add(value);
-//                partition = record.partition();
-//                offset = record.offset();
-//            } catch (Exception e) {
-//                LOGGER.error(e.getMessage() + ":" + value, e);
-//            }
-//        }
-//        long endTime1 = System.nanoTime();
-//        long duration = (endTime1 - startTime) / 1000000;
-//        LOGGER.info("处理records用时: " + duration + " 毫秒," + "数据量:" + appList.size());
-//        if (!appList.isEmpty()) {
-//            LOGGER.info("partation: " + partition + ", offset: " + offset);
-//            baseIService.saveDataObjectBatch(appList);
-//        }
-//        long endTime2 = System.nanoTime();
-//        duration = (endTime2 - endTime1) / 1000000;
-//        LOGGER.info("写入数据库用时: " + duration + " 毫秒," + "数据量:" + appList.size());
-//    }
-//
-//    /**
-//     * Processes ConsumerRecords and deals with the data to insert into multiple tables.
-//     *
-//     * @param records The ConsumerRecords to process.
-//     */
-//    // The data of a topic may be written to multiple tables
-//    @Generated
-//    public void dealDataToMultipleTables(final ConsumerRecords<String, String> records) {
-//        Map<String, ArrayList<String>> resMap = new HashMap<>();
-//        int partition = 0;
-//        long offset = 0;
-//
-//        for (ConsumerRecord<String, String> record : records) {
-//            String value = record.value();
-//            try {
-//                Map<String, Object> dtoMap = ObjectMapperUtil.toMap(value);
-//                String table = dtoMap.get("table").toString();
-//
-//                if (!resMap.containsKey(table)) {
-//                    resMap.put(table, new ArrayList<>());
-//                }
-//
-//                ArrayList<String> tmp = resMap.get(table);
-//                tmp.add(value);
-//                resMap.put(table, tmp);
-//
-//                partition = record.partition();
-//                offset = record.offset();
-//            } catch (Exception e) {
-//                LOGGER.error(e.getMessage() + ": " + value, e);
-//            }
-//        }
-//        resMap.forEach((table, values) -> {
-//            if (!values.isEmpty()) {
-//                serviceMap.getIService(table + "Service").saveDataObjectBatch(values);
-//            }
-//        });
-//        LOGGER.info("Partition: " + partition + ", Offset: " + offset);
-//    }
-//
-//}
diff --git a/src/main/java/com/easysoftware/kafka/EpkgConsumer.java b/src/main/java/com/easysoftware/kafka/EpkgConsumer.java
deleted file mode 100644
index 3d3d50d..0000000
--- a/src/main/java/com/easysoftware/kafka/EpkgConsumer.java
+++ /dev/null
@@ -1,24 +0,0 @@
-//package com.easysoftware.kafka;
-//
-//import org.apache.kafka.clients.consumer.ConsumerRecords;
-//import org.springframework.kafka.annotation.KafkaListener;
-//import org.springframework.stereotype.Service;
-//
-//@Service
-//public class EpkgConsumer extends BaseConsumer {
-//    // @Value("${consumer.topic.name}")
-//    // String topicName;
-//
-//    // @Value("${consumer.topic.offset}")
-//    // String topicOffset;
-//
-//    /**
-//     * Listens for and processes ConsumerRecords of type <String, String>.
-//     *
-//     * @param records The ConsumerRecords to process.
-//     */
-//    @KafkaListener(topics = "software_test_epkg", concurrency = "3")
-//    public void listen(final ConsumerRecords<String, String> records) {
-//        dealDataToTableByBatch(records);
-//    }
-//}
diff --git a/src/main/java/com/easysoftware/kafka/Producer.java b/src/main/java/com/easysoftware/kafka/Producer.java
deleted file mode 100644
index c5f983c..0000000
--- a/src/main/java/com/easysoftware/kafka/Producer.java
+++ /dev/null
@@ -1,35 +0,0 @@
-//package com.easysoftware.kafka;
-//
-//import org.apache.kafka.clients.producer.KafkaProducer;
-//import org.apache.kafka.clients.producer.ProducerRecord;
-//import org.springframework.beans.factory.annotation.Autowired;
-//import org.springframework.kafka.core.KafkaTemplate;
-//import org.springframework.stereotype.Component;
-//
-//@Component
-//public class Producer {
-//
-//    /**
-//     * Autowired KafkaTemplate for producing messages.
-//     */
-//    @Autowired
-//    private KafkaTemplate<String, String> kafkaTemplate;
-//
-//    /**
-//     * Static KafkaProducer for handling Kafka operations.
-//     */
-//    private static KafkaProducer<String, String> producer;
-//
-//    /**
-//     * Sends a message with the specified topic, key, and value.
-//     *
-//     * @param topic The Kafka topic to send the message to.
-//     * @param key The key associated with the message.
-//     * @param value The value of the message.
-//     */
-//    public void sendMess(final String topic, final String key, final String value) {
-//        ProducerRecord<String, String> mess = new ProducerRecord<String, String>(topic, key, value);
-//        kafkaTemplate.send(mess);
-//    }
-//
-//}
diff --git a/src/main/java/com/easysoftware/kafka/RpmConsumer.java b/src/main/java/com/easysoftware/kafka/RpmConsumer.java
deleted file mode 100644
index b035056..0000000
--- a/src/main/java/com/easysoftware/kafka/RpmConsumer.java
+++ /dev/null
@@ -1,32 +0,0 @@
-//package com.easysoftware.kafka;
-//
-//import org.apache.kafka.clients.consumer.ConsumerRecords;
-//import org.springframework.beans.factory.annotation.Value;
-//import org.springframework.kafka.annotation.KafkaListener;
-//import org.springframework.stereotype.Service;
-//
-//@Service
-//public class RpmConsumer extends BaseConsumer {
-//    /**
-//     * Value for the Kafka consumer topic name.
-//     */
-//    @Value("${consumer.topic.name}")
-//    private String topicName;
-//
-//    /**
-//     * Value for the Kafka consumer topic offset.
-//     */
-//    @Value("${consumer.topic.offset}")
-//    private String topicOffset;
-//
-//    /**
-//     * Kafka listener method that listens to the "software_test_rpm" topic with concurrency set to 3.
-//     *
-//     * @param records The ConsumerRecords to process.
-//     */
-//    @KafkaListener(topics = "software_test_rpm", concurrency = "3")
-//    public void listen(final ConsumerRecords<String, String> records) {
-//        dealDataToTableByBatch(records);
-//    }
-//
-//}
diff --git a/src/main/java/com/easysoftware/kafka/VersionConsumer.java b/src/main/java/com/easysoftware/kafka/VersionConsumer.java
deleted file mode 100644
index 62afb70..0000000
--- a/src/main/java/com/easysoftware/kafka/VersionConsumer.java
+++ /dev/null
@@ -1,19 +0,0 @@
-//package com.easysoftware.kafka;
-//
-//import org.apache.kafka.clients.consumer.ConsumerRecords;
-//import org.springframework.kafka.annotation.KafkaListener;
-//import org.springframework.stereotype.Service;
-//
-//@Service
-//public class VersionConsumer extends BaseConsumer {
-//
-//    /**
-//     * Listens for and processes ConsumerRecords of type <String, String>.
-//     *
-//     * @param records The ConsumerRecords to process.
-//     */
-//    @KafkaListener(topics = "software_test_version")
-//    public void listen(final ConsumerRecords<String, String> records) {
-//        dealDataToTableByBatch(records);
-//    }
-//}
--
Gitee