From 89fd60e4347d787ed1695b75d07084cc4dfdaf91 Mon Sep 17 00:00:00 2001
From: mystarry-sky
Date: Wed, 14 Jun 2023 16:01:17 +0800
Subject: [PATCH] Optimize Kafka logging and adjust the default check parallelism parameters
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

---
 config/application-sink.yml                   | 10 ++--
 config/application-source.yml                 |  8 +--
 config/application.yml                        |  4 +-
 config/log4j2.xml                             | 32 +++++++++-
 config/log4j2sink.xml                         | 24 ++++++--
 config/log4j2source.xml                       | 24 ++++++--
 .../event/DeleteTopicsEventListener.java      | 32 ++++------
 .../check/event/KafkaTopicDeleteProvider.java | 10 ++--
 .../modules/check/DataCheckRunnable.java      |  8 ++-
 .../modules/check/KafkaConsumerHandler.java   |  5 ++
 .../datachecker/common/util/LogUtils.java     | 59 +++++++++++++++++++
 .../extract/kafka/KafkaAdminService.java      | 31 ++++------
 .../service/DataExtractServiceImpl.java       |  5 +-
 .../extract/task/ExtractTaskRunnable.java     | 42 ++++++++-----
 14 files changed, 209 insertions(+), 85 deletions(-)
 create mode 100644 datachecker-common/src/main/java/org/opengauss/datachecker/common/util/LogUtils.java

diff --git a/config/application-sink.yml b/config/application-sink.yml
index fbc3b67..b26aee5 100644
--- a/config/application-sink.yml
+++ b/config/application-sink.yml
@@ -5,12 +5,12 @@ logging:
 spring:
   check:
     server-uri: http://127.0.0.1:9000
-    core-pool-size: 2
-    maximum-pool-size: 10
-    maximum-topic-size: 30
+    core-pool-size: 1
+    maximum-pool-size: 5
+    maximum-topic-size: 5
     maximum-table-slice-size: 100000
   extract:
-    schema: jack
+    schema: test
     databaseType: OG
     query-dop: 8 # jdbc Parallel Query config
     debezium-enable: false # no need config,but not delete
@@ -26,7 +26,7 @@ spring:
     druid:
       dataSourceOne:
         driver-class-name: org.opengauss.Driver
-        url: jdbc:opengauss://127.0.0.1:5432/postgres?useSSL=false&useUnicode=true&characterEncoding=utf-8&serverTimezone=UTC
+        url: jdbc:opengauss://localhost:5432/postgres?useSSL=false&useUnicode=true&characterEncoding=utf-8&serverTimezone=UTC
         username:
         password: 'xxxx' # The password text may contain special characters, which need to be enclosed in quotation marks
         # Configure initialization connection pool size, minimum number of connections, and maximum number of connections
diff --git a/config/application-source.yml b/config/application-source.yml
index a487b89..9f094da 100644
--- a/config/application-source.yml
+++ b/config/application-source.yml
@@ -6,9 +6,9 @@ logging:
 spring:
   check:
     server-uri: http://127.0.0.1:9000
-    core-pool-size: 2
-    maximum-pool-size: 10
-    maximum-topic-size: 30
+    core-pool-size: 1
+    maximum-pool-size: 5
+    maximum-topic-size: 5
     maximum-table-slice-size: 100000
   extract:
     schema: test
@@ -30,7 +30,7 @@ spring:
     druid:
       dataSourceOne:
        driver-class-name: com.mysql.cj.jdbc.Driver
-       url: jdbc:mysql://127.0.0.1:3306/mysql?useSSL=false&useUnicode=true&characterEncoding=utf-8&serverTimezone=UTC&allowPublicKeyRetrieval=true
+       url: jdbc:mysql://localhost:3306/test?useSSL=false&useUnicode=true&characterEncoding=utf-8&serverTimezone=UTC&allowPublicKeyRetrieval=true
        username:
        password: 'xxxx' # The password text may contain special characters, which need to be enclosed in quotation marks
        # Configure initialization connection pool size, minimum number of connections, and maximum number of connections
diff --git a/config/application.yml b/config/application.yml
index 7fe3738..31129d0 100644
---
a/config/application.yml +++ b/config/application.yml @@ -6,8 +6,8 @@ spring: kafka: bootstrap-servers: localhost:9092 check: - core-pool-size: 2 - maximum-pool-size: 10 + core-pool-size: 1 + maximum-pool-size: 4 data: check: data-path: ./check_result diff --git a/config/log4j2.xml b/config/log4j2.xml index 49753d2..eb3ce14 100644 --- a/config/log4j2.xml +++ b/config/log4j2.xml @@ -20,7 +20,10 @@ logs INFO ${sys:logName} - + kafka + business + + @@ -40,6 +43,26 @@ + + + + + + + + + + + + + + + + + + @@ -71,7 +94,12 @@ - + + + + + + diff --git a/config/log4j2sink.xml b/config/log4j2sink.xml index 1cccf7c..f53477f 100644 --- a/config/log4j2sink.xml +++ b/config/log4j2sink.xml @@ -19,9 +19,10 @@ logs INFO ${sys:logName} - business - - + kafka + business + + @@ -41,8 +42,18 @@ - + + + + + + + + + + @@ -85,6 +96,9 @@ + + + diff --git a/config/log4j2source.xml b/config/log4j2source.xml index 221ac6d..91f421d 100644 --- a/config/log4j2source.xml +++ b/config/log4j2source.xml @@ -20,9 +20,10 @@ logs INFO ${sys:logName} - business - - + kafka + business + + @@ -42,8 +43,18 @@ - + + + + + + + + + + @@ -84,6 +95,9 @@ + + + diff --git a/datachecker-check/src/main/java/org/opengauss/datachecker/check/event/DeleteTopicsEventListener.java b/datachecker-check/src/main/java/org/opengauss/datachecker/check/event/DeleteTopicsEventListener.java index 8eee8a0..4f46627 100644 --- a/datachecker-check/src/main/java/org/opengauss/datachecker/check/event/DeleteTopicsEventListener.java +++ b/datachecker-check/src/main/java/org/opengauss/datachecker/check/event/DeleteTopicsEventListener.java @@ -15,16 +15,15 @@ package org.opengauss.datachecker.check.event; -import lombok.extern.slf4j.Slf4j; -import org.apache.commons.collections4.CollectionUtils; import org.apache.kafka.clients.admin.AdminClient; import org.apache.kafka.clients.admin.AdminClientConfig; import org.apache.kafka.clients.admin.DeleteTopicsResult; import org.apache.kafka.clients.admin.KafkaAdminClient; -import org.apache.kafka.clients.admin.TopicListing; import org.apache.kafka.common.KafkaFuture; +import org.apache.logging.log4j.Logger; import org.opengauss.datachecker.check.client.FeignClientService; import org.opengauss.datachecker.common.entry.enums.Endpoint; +import org.opengauss.datachecker.common.util.LogUtils; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.ApplicationListener; import org.springframework.stereotype.Component; @@ -34,56 +33,46 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.ExecutionException; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.stream.Collectors; /** * @author :wangchao * @date :Created in 2023/3/7 * @since :11 */ -@Slf4j @Component public class DeleteTopicsEventListener implements ApplicationListener { - private static final int DELETE_RETRY_TIMES = 3; + private static final Logger log = LogUtils.geKafkaLogger(); + @Resource private FeignClientService feignClient; @Value("${spring.kafka.bootstrap-servers}") private String bootstrapServers; private AdminClient adminClient = null; - private AtomicInteger retryTimes = new AtomicInteger(0); @Override public void onApplicationEvent(DeleteTopicsEvent event) { try { + log.info("delete topic event : {}", event.getMessage()); final Object source = event.getSource(); initAdminClient(); final DeleteTopics deleteOption = (DeleteTopics) source; deleteTopic(deleteOption.getTopicList()); feignClient.notifyCheckTableFinished(Endpoint.SOURCE, 
deleteOption.getTableName()); feignClient.notifyCheckTableFinished(Endpoint.SINK, deleteOption.getTableName()); - log.info("delete topic event : {}", event.getMessage()); } catch (Exception exception) { log.error("delete topic has error ", exception); } } private void deleteTopic(List deleteTopicList) { - DeleteTopicsResult deleteTopicsResult = adminClient.deleteTopics(deleteTopicList); - final KafkaFuture kafkaFuture = deleteTopicsResult.all(); - retryTimes.incrementAndGet(); try { + log.info("delete topic [{}] start", deleteTopicList); + DeleteTopicsResult deleteTopicsResult = adminClient.deleteTopics(deleteTopicList); + final KafkaFuture kafkaFuture = deleteTopicsResult.all(); kafkaFuture.get(); - List checkedList = adminClient.listTopics().listings().get().stream().map(TopicListing::name) - .filter(deleteTopicList::contains).collect(Collectors.toList()); - if (CollectionUtils.isNotEmpty(checkedList)) { - if (retryTimes.get() <= DELETE_RETRY_TIMES) { - deleteTopic(checkedList); - } else { - log.error("retry to delete {} topic error : delete too many times(3) ", checkedList); - } - } + log.info("delete topic [{}] finished", deleteTopicList); } catch (InterruptedException | ExecutionException ignore) { + log.error("delete topic [{}] error : ", deleteTopicList, ignore); } } @@ -92,6 +81,7 @@ public class DeleteTopicsEventListener implements ApplicationListener props = new HashMap<>(1); props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); this.adminClient = KafkaAdminClient.create(props); + log.info("init admin client [{}]", bootstrapServers); } } } diff --git a/datachecker-check/src/main/java/org/opengauss/datachecker/check/event/KafkaTopicDeleteProvider.java b/datachecker-check/src/main/java/org/opengauss/datachecker/check/event/KafkaTopicDeleteProvider.java index 579d12e..2a99bdf 100644 --- a/datachecker-check/src/main/java/org/opengauss/datachecker/check/event/KafkaTopicDeleteProvider.java +++ b/datachecker-check/src/main/java/org/opengauss/datachecker/check/event/KafkaTopicDeleteProvider.java @@ -15,12 +15,13 @@ package org.opengauss.datachecker.check.event; -import lombok.extern.slf4j.Slf4j; import org.apache.commons.collections4.CollectionUtils; import org.apache.commons.lang3.concurrent.BasicThreadFactory; +import org.apache.logging.log4j.Logger; import org.opengauss.datachecker.check.config.DataCheckProperties; import org.opengauss.datachecker.check.modules.task.TaskManagerService; import org.opengauss.datachecker.common.entry.enums.Endpoint; +import org.opengauss.datachecker.common.util.LogUtils; import org.opengauss.datachecker.common.util.TopicUtil; import org.springframework.beans.BeansException; import org.springframework.context.ApplicationContext; @@ -41,11 +42,12 @@ import java.util.concurrent.TimeUnit; * @date :Created in 2023/3/7 * @since :11 */ -@Slf4j @Service public class KafkaTopicDeleteProvider implements ApplicationContextAware { - private ApplicationContext applicationContext; + private static final Logger logKafka = LogUtils.geKafkaLogger(); private static volatile Map deleteTableMap = new ConcurrentHashMap<>(); + + private ApplicationContext applicationContext; @Resource private DataCheckProperties properties; @Resource @@ -101,7 +103,7 @@ public class KafkaTopicDeleteProvider implements ApplicationContextAware { }); if (CollectionUtils.isNotEmpty(deleteOptions)) { deleteOptions.forEach(deleteOption -> { - log.info("publish delete-topic-event table = [{}] , current-pending-quantity = [{}]", + logKafka.info("publish delete-topic-event table = 
[{}] , current-pending-quantity = [{}]", deleteOption.getTableName(), deleteTableMap.size()); applicationContext.publishEvent(new DeleteTopicsEvent(deleteOption, deleteOption.toString())); deleteTableMap.remove(deleteOption.getTableName()); diff --git a/datachecker-check/src/main/java/org/opengauss/datachecker/check/modules/check/DataCheckRunnable.java b/datachecker-check/src/main/java/org/opengauss/datachecker/check/modules/check/DataCheckRunnable.java index 457eb2e..5d217ec 100644 --- a/datachecker-check/src/main/java/org/opengauss/datachecker/check/modules/check/DataCheckRunnable.java +++ b/datachecker-check/src/main/java/org/opengauss/datachecker/check/modules/check/DataCheckRunnable.java @@ -43,6 +43,7 @@ import org.opengauss.datachecker.common.entry.extract.RowDataHash; import org.opengauss.datachecker.common.entry.extract.TableMetadata; import org.opengauss.datachecker.common.exception.LargeDataDiffException; import org.opengauss.datachecker.common.exception.MerkleTreeDepthException; +import org.opengauss.datachecker.common.util.LogUtils; import org.opengauss.datachecker.common.util.SpringUtil; import org.opengauss.datachecker.common.util.TopicUtil; import org.springframework.lang.NonNull; @@ -70,7 +71,8 @@ import java.util.concurrent.ConcurrentHashMap; * @since :11 */ public class DataCheckRunnable implements Runnable { - public static final Logger log = LogManager.getLogger("check_business"); + private static final Logger log = LogUtils.getCheckLogger(); + private static final Logger logKafka = LogUtils.geKafkaLogger(); private static final int THRESHOLD_MIN_BUCKET_SIZE = 2; private final List sourceBucketList = Collections.synchronizedList(new ArrayList<>()); @@ -117,6 +119,7 @@ public class DataCheckRunnable implements Runnable { private KafkaConsumerHandler buildKafkaHandler(DataCheckRunnableSupport support) { KafkaConsumerService kafkaConsumerService = support.getKafkaConsumerService(); + logKafka.info("create kafka consumer for [{}] [{}]", tableName, partitions); return new KafkaConsumerHandler(kafkaConsumerService.buildKafkaConsumer(false), kafkaConsumerService.getRetryFetchRecordTimes()); } @@ -146,6 +149,9 @@ public class DataCheckRunnable implements Runnable { checkResult(); cleanCheckThreadEnvironment(); checkRateCache.add(buildCheckTable()); + kafkaConsumerHandler.closeConsumer(); + logKafka.info("close table consumer of topic, [{},{}] : [{} : {}] ", tableName, partitions, sourceTopic, + sinkTopic); log.info("check table result {} complete!", tableName); } } diff --git a/datachecker-check/src/main/java/org/opengauss/datachecker/check/modules/check/KafkaConsumerHandler.java b/datachecker-check/src/main/java/org/opengauss/datachecker/check/modules/check/KafkaConsumerHandler.java index 29b5a9f..e714356 100644 --- a/datachecker-check/src/main/java/org/opengauss/datachecker/check/modules/check/KafkaConsumerHandler.java +++ b/datachecker-check/src/main/java/org/opengauss/datachecker/check/modules/check/KafkaConsumerHandler.java @@ -39,6 +39,7 @@ import java.util.Map; */ @Slf4j public class KafkaConsumerHandler { + private static final int KAFKA_CONSUMER_POLL_DURATION = 20; private final KafkaConsumer kafkaConsumer; @@ -122,4 +123,8 @@ public class KafkaConsumerHandler { dataList.add(JSON.parseObject(record.value(), RowDataHash.class)); }); } + + public void closeConsumer() { + kafkaConsumer.close(Duration.ofSeconds(1)); + } } diff --git a/datachecker-common/src/main/java/org/opengauss/datachecker/common/util/LogUtils.java 
b/datachecker-common/src/main/java/org/opengauss/datachecker/common/util/LogUtils.java
new file mode 100644
index 0000000..55e39c3
--- /dev/null
+++ b/datachecker-common/src/main/java/org/opengauss/datachecker/common/util/LogUtils.java
@@ -0,0 +1,59 @@
+/*
+ * Copyright (c) 2022-2022 Huawei Technologies Co.,Ltd.
+ *
+ * openGauss is licensed under Mulan PSL v2.
+ * You can use this software according to the terms and conditions of the Mulan PSL v2.
+ * You may obtain a copy of Mulan PSL v2 at:
+ *
+ * http://license.coscl.org.cn/MulanPSL2
+ *
+ * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
+ * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
+ * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
+ * See the Mulan PSL v2 for more details.
+ */
+
+package org.opengauss.datachecker.common.util;
+
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+/**
+ * LogUtils
+ *
+ * @author :wangchao
+ * @date :Created in 2023/6/14
+ * @since :11
+ */
+public class LogUtils {
+    private static final String CHECK_BUSINESS = "check_business";
+    private static final String EXTRACT_BUSINESS = "extract_business";
+    private static final String KAFKA_BUSINESS = "kafka_business";
+
+    /**
+     * get kafka business logger
+     *
+     * @return logger
+     */
+    public static Logger geKafkaLogger() {
+        return LogManager.getLogger(KAFKA_BUSINESS);
+    }
+
+    /**
+     * get check business logger
+     *
+     * @return logger
+     */
+    public static Logger getCheckLogger() {
+        return LogManager.getLogger(CHECK_BUSINESS);
+    }
+
+    /**
+     * get extract business logger
+     *
+     * @return logger
+     */
+    public static Logger getExtractLogger() {
+        return LogManager.getLogger(EXTRACT_BUSINESS);
+    }
+}
diff --git a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/kafka/KafkaAdminService.java b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/kafka/KafkaAdminService.java
index 0ffb1e3..3a922ee 100644
--- a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/kafka/KafkaAdminService.java
+++ b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/kafka/KafkaAdminService.java
@@ -24,8 +24,9 @@ import org.apache.kafka.clients.admin.KafkaAdminClient;
 import org.apache.kafka.clients.admin.NewTopic;
 import org.apache.kafka.clients.admin.TopicListing;
 import org.apache.kafka.common.KafkaFuture;
+import org.apache.logging.log4j.Logger;
 import org.opengauss.datachecker.common.entry.enums.Endpoint;
-import org.opengauss.datachecker.common.exception.CreateTopicException;
+import org.opengauss.datachecker.common.util.LogUtils;
 import org.springframework.beans.factory.annotation.Value;
 import org.springframework.kafka.KafkaException;
 import org.springframework.stereotype.Component;
@@ -36,7 +37,6 @@ import java.util.Collection;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.locks.ReentrantLock;
 import java.util.stream.Collectors;
@@ -49,13 +49,11 @@ import java.util.stream.Collectors;
  * @since :11
  */
 @Component
-@Slf4j
 public class KafkaAdminService {
+    private static final Logger log = LogUtils.geKafkaLogger();
     @Value("${spring.kafka.bootstrap-servers}")
     private String springKafkaBootstrapServers;
     private AdminClient adminClient;
-    @Value("${spring.check.maximum-topic-size}")
-    private int maximumTopicSize = 50;
     private String endpointTopicPrefix = "";
     private ReentrantLock lock = new
ReentrantLock(); @@ -74,6 +72,7 @@ public class KafkaAdminService { adminClient = KafkaAdminClient.create(props); try { adminClient.listTopics().listings().get(); + log.info("init and listTopics admin client [{}]", springKafkaBootstrapServers); } catch (ExecutionException | InterruptedException ex) { log.error("kafka Client link exception: ", ex); throw new KafkaException("kafka Client link exception"); @@ -89,18 +88,10 @@ public class KafkaAdminService { public boolean createTopic(String topic, int partitions) { lock.lock(); try { - KafkaFuture> names = adminClient.listTopics().names(); - if (names.get().contains(topic)) { - return true; - } else { - CreateTopicsResult topicsResult = - adminClient.createTopics(List.of(new NewTopic(topic, partitions, (short) 1))); - log.info("topic={} create,numPartitions={}, short replicationFactor={}", topic, partitions, 1); - return topicsResult.all().isDone(); - } - } catch (InterruptedException | ExecutionException e) { - log.error("topic={} is delete error : {}", topic, e); - throw new CreateTopicException(topic); + CreateTopicsResult topicsResult = + adminClient.createTopics(List.of(new NewTopic(topic, partitions, (short) 1))); + log.info("create topic success , name= [{}] numPartitions = [{}]", topic, partitions); + return topicsResult.all().isDone(); } finally { lock.unlock(); } @@ -117,7 +108,7 @@ public class KafkaAdminService { kafkaFutureMap.forEach((topic, future) -> { try { future.get(); - log.debug("topic={} is delete successfull", topic); + log.info("topic={} is delete successfull", topic); } catch (InterruptedException | ExecutionException e) { log.error("topic={} is delete error : {}", topic, e); } @@ -132,7 +123,7 @@ public class KafkaAdminService { */ public List getAllTopic(String prefix) { try { - log.debug("topic prefix :{}", prefix); + log.info("get topic from kafka list topics and prefix [{}]", prefix); return adminClient.listTopics().listings().get().stream().map(TopicListing::name) .filter(name -> name.startsWith(prefix)).collect(Collectors.toList()); } catch (InterruptedException | ExecutionException e) { @@ -148,6 +139,7 @@ public class KafkaAdminService { */ public List getAllTopic() { try { + log.info("get topic from kafka list topics"); return adminClient.listTopics().listings().get().stream().map(TopicListing::name) .collect(Collectors.toList()); } catch (InterruptedException | ExecutionException e) { @@ -164,6 +156,7 @@ public class KafkaAdminService { */ public boolean isTopicExists(String topicName) { try { + log.info("check topic [{}] has exists --> check kafka list topics", topicName); return adminClient.listTopics().listings().get().stream().map(TopicListing::name) .anyMatch(name -> name.equalsIgnoreCase(topicName)); } catch (InterruptedException | ExecutionException e) { diff --git a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/service/DataExtractServiceImpl.java b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/service/DataExtractServiceImpl.java index e5e4135..e4b79f3 100644 --- a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/service/DataExtractServiceImpl.java +++ b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/service/DataExtractServiceImpl.java @@ -17,6 +17,7 @@ package org.opengauss.datachecker.extract.service; import lombok.extern.slf4j.Slf4j; import org.apache.commons.collections4.CollectionUtils; +import org.apache.logging.log4j.Logger; import org.opengauss.datachecker.common.constant.Constants; import 
org.opengauss.datachecker.common.entry.enums.DML; import org.opengauss.datachecker.common.entry.enums.Endpoint; @@ -34,6 +35,7 @@ import org.opengauss.datachecker.common.exception.ProcessMultipleException; import org.opengauss.datachecker.common.exception.TableNotExistException; import org.opengauss.datachecker.common.exception.TaskNotFoundException; import org.opengauss.datachecker.common.service.DynamicThreadPoolManager; +import org.opengauss.datachecker.common.util.LogUtils; import org.opengauss.datachecker.common.util.ThreadUtil; import org.opengauss.datachecker.common.util.TopicUtil; import org.opengauss.datachecker.extract.cache.MetaDataCache; @@ -76,7 +78,7 @@ import static org.opengauss.datachecker.common.constant.DynamicTpConstant.EXTRAC @Slf4j @Service public class DataExtractServiceImpl implements DataExtractService { - + private static final Logger logKafka = LogUtils.geKafkaLogger(); /** * Maximum number of sleeps of threads executing data extraction tasks */ @@ -304,6 +306,7 @@ public class DataExtractServiceImpl implements DataExtractService { } Topic topic = task.getTopic(); Endpoint endpoint = extractProperties.getEndpoint(); + logKafka.info("try to create [{}] [{}]", topic.getTopicName(endpoint), topic.getPartitions()); kafkaAdminService.createTopic(topic.getTopicName(endpoint), topic.getPartitions()); topicCache.add(topic); log.info("current topic cache size = {}", topicCache.size()); diff --git a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/task/ExtractTaskRunnable.java b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/task/ExtractTaskRunnable.java index 87076cb..af49e79 100644 --- a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/task/ExtractTaskRunnable.java +++ b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/task/ExtractTaskRunnable.java @@ -17,7 +17,6 @@ package org.opengauss.datachecker.extract.task; import com.alibaba.fastjson.JSON; import org.apache.kafka.clients.producer.ProducerRecord; -import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.opengauss.datachecker.common.constant.DynamicTpConstant; import org.opengauss.datachecker.common.entry.common.ExtractContext; @@ -28,11 +27,12 @@ import org.opengauss.datachecker.common.entry.extract.RowDataHash; import org.opengauss.datachecker.common.entry.extract.TableMetadata; import org.opengauss.datachecker.common.entry.extract.Topic; import org.opengauss.datachecker.common.exception.ExtractDataAccessException; +import org.opengauss.datachecker.common.exception.ExtractException; import org.opengauss.datachecker.common.service.DynamicThreadPoolManager; +import org.opengauss.datachecker.common.util.LogUtils; import org.opengauss.datachecker.common.util.TaskUtil; import org.opengauss.datachecker.common.util.ThreadUtil; import org.opengauss.datachecker.extract.client.CheckingFeignClient; -import org.opengauss.datachecker.extract.kafka.KafkaAdminService; import org.opengauss.datachecker.extract.resource.ResourceManager; import org.opengauss.datachecker.extract.task.sql.QuerySqlEntry; import org.opengauss.datachecker.extract.task.sql.SelectSqlBuilder; @@ -72,7 +72,8 @@ import static org.opengauss.datachecker.common.constant.DynamicTpConstant.TABLE_ * @since 11 **/ public class ExtractTaskRunnable implements Runnable { - public static final Logger log = LogManager.getLogger("extract_business"); + private static final Logger log = LogUtils.getExtractLogger(); + private static final Logger logKafka = 
LogUtils.geKafkaLogger(); private static final int FETCH_SIZE = 10000; private final ExtractTask task; private final DynamicThreadPoolManager dynamicThreadPoolManager; @@ -471,6 +472,7 @@ public class ExtractTaskRunnable implements Runnable { * kafka operations */ class KafkaOperations { + private static final int DEFAULT_PARTITION = 0; private static final int MIN_PARTITION_NUM = 1; @@ -497,6 +499,16 @@ public class ExtractTaskRunnable implements Runnable { this.topicPartitionCount = topic.getPartitions(); } + /** + * send row data to topic,that has multiple partition + * + * @param row row + */ + public void sendMultiplePartitionsRowData(RowDataHash row) { + row.setPartition(calcSimplePartition(row.getPrimaryKeyHash())); + send(topicName, row.getPartition(), row.getPrimaryKey(), JSON.toJSONString(row)); + } + /** * send row data to topic,that has single partition * @@ -508,8 +520,17 @@ public class ExtractTaskRunnable implements Runnable { log.debug("row data hash zero :{}:{}", row.getPrimaryKey(), JSON.toJSONString(row)); return; } - kafkaTemplate - .send(new ProducerRecord<>(topicName, DEFAULT_PARTITION, row.getPrimaryKey(), JSON.toJSONString(row))); + send(topicName, DEFAULT_PARTITION, row.getPrimaryKey(), JSON.toJSONString(row)); + } + + private void send(String topicName, int partition, String key, String message) { + try { + kafkaTemplate.send(new ProducerRecord<>(topicName, partition, key, message)); + } catch (Exception kafkaException) { + logKafka.error("send kafka [{} , {}] record error {}", topicName, key, kafkaException); + throw new ExtractException( + "send kafka [" + topicName + " , " + key + "] record " + kafkaException.getMessage()); + } } /** @@ -519,17 +540,6 @@ public class ExtractTaskRunnable implements Runnable { kafkaTemplate.flush(); } - /** - * send row data to topic,that has multiple partition - * - * @param row row - */ - public void sendMultiplePartitionsRowData(RowDataHash row) { - row.setPartition(calcSimplePartition(row.getPrimaryKeyHash())); - kafkaTemplate - .send(new ProducerRecord<>(topicName, row.getPartition(), row.getPrimaryKey(), JSON.toJSONString(row))); - } - /** * this topic has multiple partitions * -- Gitee
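
Note on the partition routing introduced in ExtractTaskRunnable: each row is assigned a partition derived from its primary-key hash before being handed to the guarded send(...) wrapper, so rows with the same key always land in the same partition of the table topic, and the single-partition path is just the special case of partition 0. The body of calcSimplePartition is not shown in this patch, and the project sends through Spring's KafkaTemplate; the sketch below only illustrates the assumed behaviour, using a plain KafkaProducer and a floor-modulo mapping onto the partition count, so the class and method names here are illustrative rather than part of the patch.

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

public class PartitionedSendSketch {
    private final KafkaProducer<String, String> producer;
    private final String topicName;
    private final int topicPartitionCount;

    PartitionedSendSketch(String bootstrapServers, String topicName, int topicPartitionCount) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("key.serializer", StringSerializer.class.getName());
        props.put("value.serializer", StringSerializer.class.getName());
        this.producer = new KafkaProducer<>(props);
        this.topicName = topicName;
        this.topicPartitionCount = topicPartitionCount;
    }

    // Assumed behaviour of calcSimplePartition: map the primary-key hash onto [0, partitionCount).
    int calcSimplePartition(long primaryKeyHash) {
        return (int) Math.floorMod(primaryKeyHash, (long) topicPartitionCount);
    }

    // Mirrors the patched send() wrapper: perform the producer call and surface failures instead of losing them.
    void send(int partition, String key, String message) {
        try {
            producer.send(new ProducerRecord<>(topicName, partition, key, message));
        } catch (Exception ex) {
            throw new IllegalStateException("send kafka [" + topicName + " , " + key + "] record failed", ex);
        }
    }
}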
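
Note on the dedicated Kafka logger: LogUtils only looks up log4j2 loggers by name ("kafka_business", "check_business", "extract_business"); routing those names to separate log files is done by the Logger/Appender entries added in config/log4j2.xml, log4j2sink.xml and log4j2source.xml, whose element content is not shown above. A minimal sketch of the intended call pattern, assuming the appender wiring for the "kafka_business" name exists:

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

public class KafkaLoggingSketch {
    // Same lookup the patch performs via LogUtils.geKafkaLogger(); the logger name must match
    // the kafka logger declared in the log4j2 configuration (assumed here, not visible above).
    private static final Logger KAFKA_LOG = LogManager.getLogger("kafka_business");

    void onTopicCreated(String topic, int partitions) {
        // Kafka lifecycle events go to the dedicated kafka log file instead of the business log.
        KAFKA_LOG.info("create topic success , name= [{}] numPartitions = [{}]", topic, partitions);
    }
}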