diff --git a/config/application-sink.yml b/config/application-sink.yml index fbc3b6705ec29a5481bb71b7e50b81305aa7cfda..b26aee59886fee234f52cf998ddebbf034f4e63c 100644 --- a/config/application-sink.yml +++ b/config/application-sink.yml @@ -5,12 +5,12 @@ logging: spring: check: server-uri: http://127.0.0.1:9000 - core-pool-size: 2 - maximum-pool-size: 10 - maximum-topic-size: 30 + core-pool-size: 1 + maximum-pool-size: 5 + maximum-topic-size: 5 maximum-table-slice-size: 100000 extract: - schema: jack + schema: test databaseType: OG query-dop: 8 # jdbc Parallel Query config debezium-enable: false # no need config,but not delete @@ -26,7 +26,7 @@ spring: druid: dataSourceOne: driver-class-name: org.opengauss.Driver - url: jdbc:opengauss://127.0.0.1:5432/postgres?useSSL=false&useUnicode=true&characterEncoding=utf-8&serverTimezone=UTC + url: jdbc:opengauss://localhost:5432/postgres?useSSL=false&useUnicode=true&characterEncoding=utf-8&serverTimezone=UTC username: password: 'xxxx' # The password text may contain special characters, which need to be enclosed in quotation marks # Configure initialization connection pool size, minimum number of connections, and maximum number of connections diff --git a/config/application-source.yml b/config/application-source.yml index b55106c4bbb2e15c1f96e1add5a8155fbf1bc025..905f460d90cbd9c366c5828d7dfada5a4a416b6e 100644 --- a/config/application-source.yml +++ b/config/application-source.yml @@ -6,9 +6,9 @@ logging: spring: check: server-uri: http://127.0.0.1:9000 - core-pool-size: 2 - maximum-pool-size: 10 - maximum-topic-size: 30 + core-pool-size: 1 + maximum-pool-size: 5 + maximum-topic-size: 5 maximum-table-slice-size: 100000 extract: schema: test @@ -31,7 +31,7 @@ spring: druid: dataSourceOne: driver-class-name: com.mysql.cj.jdbc.Driver - url: jdbc:mysql://127.0.0.1:3306/mysql?useSSL=false&useUnicode=true&characterEncoding=utf-8&serverTimezone=UTC&allowPublicKeyRetrieval=true + url: jdbc:mysql://localhost:3306/test?useSSL=false&useUnicode=true&characterEncoding=utf-8&serverTimezone=UTC&allowPublicKeyRetrieval=true # driver-class-name: org.opengauss.Driver # For openGauss # url: # jdbc:opengauss://127.0.0.1:5432/postgres?useSSL=false&useUnicode=true&characterEncoding=utf-8&serverTimezone=UTC # For openGauss username: diff --git a/config/application.yml b/config/application.yml index 7fe37387ef8e506479ad01e51b3d066dfdcf7129..31129d029f44b44f172a97a6cadd6e9b371f4f49 100644 --- a/config/application.yml +++ b/config/application.yml @@ -6,8 +6,8 @@ spring: kafka: bootstrap-servers: localhost:9092 check: - core-pool-size: 2 - maximum-pool-size: 10 + core-pool-size: 1 + maximum-pool-size: 4 data: check: data-path: ./check_result diff --git a/config/log4j2.xml b/config/log4j2.xml index 49753d278ab02f71536d580f5ed19704552b9397..eb3ce143b2b65ff9e65082c1701f1820bd9550ea 100644 --- a/config/log4j2.xml +++ b/config/log4j2.xml @@ -20,7 +20,10 @@ logs INFO ${sys:logName} - + kafka + business + + @@ -40,6 +43,26 @@ + + + + + + + + + + + + + + + + + + @@ -71,7 +94,12 @@ - + + + + + + diff --git a/config/log4j2sink.xml b/config/log4j2sink.xml index 1cccf7c508d587666fb947e915b5567a7c1cb71c..f53477fc4ee5f9691589754f212b0139c91b9679 100644 --- a/config/log4j2sink.xml +++ b/config/log4j2sink.xml @@ -19,9 +19,10 @@ logs INFO ${sys:logName} - business - - + kafka + business + + @@ -41,8 +42,18 @@ - + + + + + + + + + + @@ -85,6 +96,9 @@ + + + diff --git a/config/log4j2source.xml b/config/log4j2source.xml index 221ac6d8760982cac0f09f7e8b89485d0b336531..91f421d842734b6ded0195af0409d37a8549bd15 100644 --- a/config/log4j2source.xml +++ b/config/log4j2source.xml @@ -20,9 +20,10 @@ logs INFO ${sys:logName} - business - - + kafka + business + + @@ -42,8 +43,18 @@ - + + + + + + + + + + @@ -84,6 +95,9 @@ + + + diff --git a/datachecker-check/src/main/java/org/opengauss/datachecker/check/event/DeleteTopicsEventListener.java b/datachecker-check/src/main/java/org/opengauss/datachecker/check/event/DeleteTopicsEventListener.java index 8eee8a0a5677fa3e2481f25c895a634be11d5714..4f466276af1ba36ea0b45929b8bf061af66781b6 100644 --- a/datachecker-check/src/main/java/org/opengauss/datachecker/check/event/DeleteTopicsEventListener.java +++ b/datachecker-check/src/main/java/org/opengauss/datachecker/check/event/DeleteTopicsEventListener.java @@ -15,16 +15,15 @@ package org.opengauss.datachecker.check.event; -import lombok.extern.slf4j.Slf4j; -import org.apache.commons.collections4.CollectionUtils; import org.apache.kafka.clients.admin.AdminClient; import org.apache.kafka.clients.admin.AdminClientConfig; import org.apache.kafka.clients.admin.DeleteTopicsResult; import org.apache.kafka.clients.admin.KafkaAdminClient; -import org.apache.kafka.clients.admin.TopicListing; import org.apache.kafka.common.KafkaFuture; +import org.apache.logging.log4j.Logger; import org.opengauss.datachecker.check.client.FeignClientService; import org.opengauss.datachecker.common.entry.enums.Endpoint; +import org.opengauss.datachecker.common.util.LogUtils; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.ApplicationListener; import org.springframework.stereotype.Component; @@ -34,56 +33,46 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.ExecutionException; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.stream.Collectors; /** * @author :wangchao * @date :Created in 2023/3/7 * @since :11 */ -@Slf4j @Component public class DeleteTopicsEventListener implements ApplicationListener { - private static final int DELETE_RETRY_TIMES = 3; + private static final Logger log = LogUtils.geKafkaLogger(); + @Resource private FeignClientService feignClient; @Value("${spring.kafka.bootstrap-servers}") private String bootstrapServers; private AdminClient adminClient = null; - private AtomicInteger retryTimes = new AtomicInteger(0); @Override public void onApplicationEvent(DeleteTopicsEvent event) { try { + log.info("delete topic event : {}", event.getMessage()); final Object source = event.getSource(); initAdminClient(); final DeleteTopics deleteOption = (DeleteTopics) source; deleteTopic(deleteOption.getTopicList()); feignClient.notifyCheckTableFinished(Endpoint.SOURCE, deleteOption.getTableName()); feignClient.notifyCheckTableFinished(Endpoint.SINK, deleteOption.getTableName()); - log.info("delete topic event : {}", event.getMessage()); } catch (Exception exception) { log.error("delete topic has error ", exception); } } private void deleteTopic(List deleteTopicList) { - DeleteTopicsResult deleteTopicsResult = adminClient.deleteTopics(deleteTopicList); - final KafkaFuture kafkaFuture = deleteTopicsResult.all(); - retryTimes.incrementAndGet(); try { + log.info("delete topic [{}] start", deleteTopicList); + DeleteTopicsResult deleteTopicsResult = adminClient.deleteTopics(deleteTopicList); + final KafkaFuture kafkaFuture = deleteTopicsResult.all(); kafkaFuture.get(); - List checkedList = adminClient.listTopics().listings().get().stream().map(TopicListing::name) - .filter(deleteTopicList::contains).collect(Collectors.toList()); - if (CollectionUtils.isNotEmpty(checkedList)) { - if (retryTimes.get() <= DELETE_RETRY_TIMES) { - deleteTopic(checkedList); - } else { - log.error("retry to delete {} topic error : delete too many times(3) ", checkedList); - } - } + log.info("delete topic [{}] finished", deleteTopicList); } catch (InterruptedException | ExecutionException ignore) { + log.error("delete topic [{}] error : ", deleteTopicList, ignore); } } @@ -92,6 +81,7 @@ public class DeleteTopicsEventListener implements ApplicationListener props = new HashMap<>(1); props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); this.adminClient = KafkaAdminClient.create(props); + log.info("init admin client [{}]", bootstrapServers); } } } diff --git a/datachecker-check/src/main/java/org/opengauss/datachecker/check/event/KafkaTopicDeleteProvider.java b/datachecker-check/src/main/java/org/opengauss/datachecker/check/event/KafkaTopicDeleteProvider.java index 579d12e514000271cb6f26038fd8843807b0c5ab..2a99bdf6553707749e2830a3a4d597935e09d461 100644 --- a/datachecker-check/src/main/java/org/opengauss/datachecker/check/event/KafkaTopicDeleteProvider.java +++ b/datachecker-check/src/main/java/org/opengauss/datachecker/check/event/KafkaTopicDeleteProvider.java @@ -15,12 +15,13 @@ package org.opengauss.datachecker.check.event; -import lombok.extern.slf4j.Slf4j; import org.apache.commons.collections4.CollectionUtils; import org.apache.commons.lang3.concurrent.BasicThreadFactory; +import org.apache.logging.log4j.Logger; import org.opengauss.datachecker.check.config.DataCheckProperties; import org.opengauss.datachecker.check.modules.task.TaskManagerService; import org.opengauss.datachecker.common.entry.enums.Endpoint; +import org.opengauss.datachecker.common.util.LogUtils; import org.opengauss.datachecker.common.util.TopicUtil; import org.springframework.beans.BeansException; import org.springframework.context.ApplicationContext; @@ -41,11 +42,12 @@ import java.util.concurrent.TimeUnit; * @date :Created in 2023/3/7 * @since :11 */ -@Slf4j @Service public class KafkaTopicDeleteProvider implements ApplicationContextAware { - private ApplicationContext applicationContext; + private static final Logger logKafka = LogUtils.geKafkaLogger(); private static volatile Map deleteTableMap = new ConcurrentHashMap<>(); + + private ApplicationContext applicationContext; @Resource private DataCheckProperties properties; @Resource @@ -101,7 +103,7 @@ public class KafkaTopicDeleteProvider implements ApplicationContextAware { }); if (CollectionUtils.isNotEmpty(deleteOptions)) { deleteOptions.forEach(deleteOption -> { - log.info("publish delete-topic-event table = [{}] , current-pending-quantity = [{}]", + logKafka.info("publish delete-topic-event table = [{}] , current-pending-quantity = [{}]", deleteOption.getTableName(), deleteTableMap.size()); applicationContext.publishEvent(new DeleteTopicsEvent(deleteOption, deleteOption.toString())); deleteTableMap.remove(deleteOption.getTableName()); diff --git a/datachecker-check/src/main/java/org/opengauss/datachecker/check/modules/check/DataCheckRunnable.java b/datachecker-check/src/main/java/org/opengauss/datachecker/check/modules/check/DataCheckRunnable.java index 457eb2eab03c69db5c79899f1001cf4ff9708d64..5d217ec2518824801b70746b973e45107428aa09 100644 --- a/datachecker-check/src/main/java/org/opengauss/datachecker/check/modules/check/DataCheckRunnable.java +++ b/datachecker-check/src/main/java/org/opengauss/datachecker/check/modules/check/DataCheckRunnable.java @@ -43,6 +43,7 @@ import org.opengauss.datachecker.common.entry.extract.RowDataHash; import org.opengauss.datachecker.common.entry.extract.TableMetadata; import org.opengauss.datachecker.common.exception.LargeDataDiffException; import org.opengauss.datachecker.common.exception.MerkleTreeDepthException; +import org.opengauss.datachecker.common.util.LogUtils; import org.opengauss.datachecker.common.util.SpringUtil; import org.opengauss.datachecker.common.util.TopicUtil; import org.springframework.lang.NonNull; @@ -70,7 +71,8 @@ import java.util.concurrent.ConcurrentHashMap; * @since :11 */ public class DataCheckRunnable implements Runnable { - public static final Logger log = LogManager.getLogger("check_business"); + private static final Logger log = LogUtils.getCheckLogger(); + private static final Logger logKafka = LogUtils.geKafkaLogger(); private static final int THRESHOLD_MIN_BUCKET_SIZE = 2; private final List sourceBucketList = Collections.synchronizedList(new ArrayList<>()); @@ -117,6 +119,7 @@ public class DataCheckRunnable implements Runnable { private KafkaConsumerHandler buildKafkaHandler(DataCheckRunnableSupport support) { KafkaConsumerService kafkaConsumerService = support.getKafkaConsumerService(); + logKafka.info("create kafka consumer for [{}] [{}]", tableName, partitions); return new KafkaConsumerHandler(kafkaConsumerService.buildKafkaConsumer(false), kafkaConsumerService.getRetryFetchRecordTimes()); } @@ -146,6 +149,9 @@ public class DataCheckRunnable implements Runnable { checkResult(); cleanCheckThreadEnvironment(); checkRateCache.add(buildCheckTable()); + kafkaConsumerHandler.closeConsumer(); + logKafka.info("close table consumer of topic, [{},{}] : [{} : {}] ", tableName, partitions, sourceTopic, + sinkTopic); log.info("check table result {} complete!", tableName); } } diff --git a/datachecker-check/src/main/java/org/opengauss/datachecker/check/modules/check/KafkaConsumerHandler.java b/datachecker-check/src/main/java/org/opengauss/datachecker/check/modules/check/KafkaConsumerHandler.java index 29b5a9fcbdb556c40de74cad3ca9886aabe5ada7..e71435669ef3f4bfa358b6ca2f08c0835a9d4ac9 100644 --- a/datachecker-check/src/main/java/org/opengauss/datachecker/check/modules/check/KafkaConsumerHandler.java +++ b/datachecker-check/src/main/java/org/opengauss/datachecker/check/modules/check/KafkaConsumerHandler.java @@ -39,6 +39,7 @@ import java.util.Map; */ @Slf4j public class KafkaConsumerHandler { + private static final int KAFKA_CONSUMER_POLL_DURATION = 20; private final KafkaConsumer kafkaConsumer; @@ -122,4 +123,8 @@ public class KafkaConsumerHandler { dataList.add(JSON.parseObject(record.value(), RowDataHash.class)); }); } + + public void closeConsumer() { + kafkaConsumer.close(Duration.ofSeconds(1)); + } } diff --git a/datachecker-common/src/main/java/org/opengauss/datachecker/common/util/LogUtils.java b/datachecker-common/src/main/java/org/opengauss/datachecker/common/util/LogUtils.java new file mode 100644 index 0000000000000000000000000000000000000000..55e39c38567def851e4348ca80ffc0a62a33885e --- /dev/null +++ b/datachecker-common/src/main/java/org/opengauss/datachecker/common/util/LogUtils.java @@ -0,0 +1,59 @@ +/* + * Copyright (c) 2022-2022 Huawei Technologies Co.,Ltd. + * + * openGauss is licensed under Mulan PSL v2. + * You can use this software according to the terms and conditions of the Mulan PSL v2. + * You may obtain a copy of Mulan PSL v2 at: + * + * http://license.coscl.org.cn/MulanPSL2 + * + * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, + * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, + * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. + * See the Mulan PSL v2 for more details. + */ + +package org.opengauss.datachecker.common.util; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +/** + * LogUtils + * + * @author :wangchao + * @date :Created in 2023/6/14 + * @since :11 + */ +public class LogUtils { + private static final String CHECK_BUSINESS = "check_business"; + private static final String EXTRACT_BUSINESS = "extract_business"; + private static final String KAFKA_BUSINESS = "kafka_business"; + + /** + * get kafka business logger + * + * @return logger + */ + public static Logger geKafkaLogger() { + return LogManager.getLogger(KAFKA_BUSINESS); + } + + /** + * get check business logger + * + * @return logger + */ + public static Logger getCheckLogger() { + return LogManager.getLogger(CHECK_BUSINESS); + } + + /** + * get extrect business logger + * + * @return logger + */ + public static Logger getExtractLogger() { + return LogManager.getLogger(EXTRACT_BUSINESS); + } +} diff --git a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/kafka/KafkaAdminService.java b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/kafka/KafkaAdminService.java index 0ffb1e35bfc9bca8de5e285d998f306ea05ae877..3a922ee70eeb1f84c88795253021e796ff7bc879 100644 --- a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/kafka/KafkaAdminService.java +++ b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/kafka/KafkaAdminService.java @@ -24,8 +24,9 @@ import org.apache.kafka.clients.admin.KafkaAdminClient; import org.apache.kafka.clients.admin.NewTopic; import org.apache.kafka.clients.admin.TopicListing; import org.apache.kafka.common.KafkaFuture; +import org.apache.logging.log4j.Logger; import org.opengauss.datachecker.common.entry.enums.Endpoint; -import org.opengauss.datachecker.common.exception.CreateTopicException; +import org.opengauss.datachecker.common.util.LogUtils; import org.springframework.beans.factory.annotation.Value; import org.springframework.kafka.KafkaException; import org.springframework.stereotype.Component; @@ -36,7 +37,6 @@ import java.util.Collection; import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.Set; import java.util.concurrent.ExecutionException; import java.util.concurrent.locks.ReentrantLock; import java.util.stream.Collectors; @@ -49,13 +49,11 @@ import java.util.stream.Collectors; * @since :11 */ @Component -@Slf4j public class KafkaAdminService { + private static final Logger log = LogUtils.geKafkaLogger(); @Value("${spring.kafka.bootstrap-servers}") private String springKafkaBootstrapServers; private AdminClient adminClient; - @Value("${spring.check.maximum-topic-size}") - private int maximumTopicSize = 50; private String endpointTopicPrefix = ""; private ReentrantLock lock = new ReentrantLock(); @@ -74,6 +72,7 @@ public class KafkaAdminService { adminClient = KafkaAdminClient.create(props); try { adminClient.listTopics().listings().get(); + log.info("init and listTopics admin client [{}]", springKafkaBootstrapServers); } catch (ExecutionException | InterruptedException ex) { log.error("kafka Client link exception: ", ex); throw new KafkaException("kafka Client link exception"); @@ -89,18 +88,10 @@ public class KafkaAdminService { public boolean createTopic(String topic, int partitions) { lock.lock(); try { - KafkaFuture> names = adminClient.listTopics().names(); - if (names.get().contains(topic)) { - return true; - } else { - CreateTopicsResult topicsResult = - adminClient.createTopics(List.of(new NewTopic(topic, partitions, (short) 1))); - log.info("topic={} create,numPartitions={}, short replicationFactor={}", topic, partitions, 1); - return topicsResult.all().isDone(); - } - } catch (InterruptedException | ExecutionException e) { - log.error("topic={} is delete error : {}", topic, e); - throw new CreateTopicException(topic); + CreateTopicsResult topicsResult = + adminClient.createTopics(List.of(new NewTopic(topic, partitions, (short) 1))); + log.info("create topic success , name= [{}] numPartitions = [{}]", topic, partitions); + return topicsResult.all().isDone(); } finally { lock.unlock(); } @@ -117,7 +108,7 @@ public class KafkaAdminService { kafkaFutureMap.forEach((topic, future) -> { try { future.get(); - log.debug("topic={} is delete successfull", topic); + log.info("topic={} is delete successfull", topic); } catch (InterruptedException | ExecutionException e) { log.error("topic={} is delete error : {}", topic, e); } @@ -132,7 +123,7 @@ public class KafkaAdminService { */ public List getAllTopic(String prefix) { try { - log.debug("topic prefix :{}", prefix); + log.info("get topic from kafka list topics and prefix [{}]", prefix); return adminClient.listTopics().listings().get().stream().map(TopicListing::name) .filter(name -> name.startsWith(prefix)).collect(Collectors.toList()); } catch (InterruptedException | ExecutionException e) { @@ -148,6 +139,7 @@ public class KafkaAdminService { */ public List getAllTopic() { try { + log.info("get topic from kafka list topics"); return adminClient.listTopics().listings().get().stream().map(TopicListing::name) .collect(Collectors.toList()); } catch (InterruptedException | ExecutionException e) { @@ -164,6 +156,7 @@ public class KafkaAdminService { */ public boolean isTopicExists(String topicName) { try { + log.info("check topic [{}] has exists --> check kafka list topics", topicName); return adminClient.listTopics().listings().get().stream().map(TopicListing::name) .anyMatch(name -> name.equalsIgnoreCase(topicName)); } catch (InterruptedException | ExecutionException e) { diff --git a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/service/DataExtractServiceImpl.java b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/service/DataExtractServiceImpl.java index e5e4135bf6203d4aa3e591f6196c4549147af7f6..e4b79f300990f27ae6fb374192176afb5f6fd9fb 100644 --- a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/service/DataExtractServiceImpl.java +++ b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/service/DataExtractServiceImpl.java @@ -17,6 +17,7 @@ package org.opengauss.datachecker.extract.service; import lombok.extern.slf4j.Slf4j; import org.apache.commons.collections4.CollectionUtils; +import org.apache.logging.log4j.Logger; import org.opengauss.datachecker.common.constant.Constants; import org.opengauss.datachecker.common.entry.enums.DML; import org.opengauss.datachecker.common.entry.enums.Endpoint; @@ -34,6 +35,7 @@ import org.opengauss.datachecker.common.exception.ProcessMultipleException; import org.opengauss.datachecker.common.exception.TableNotExistException; import org.opengauss.datachecker.common.exception.TaskNotFoundException; import org.opengauss.datachecker.common.service.DynamicThreadPoolManager; +import org.opengauss.datachecker.common.util.LogUtils; import org.opengauss.datachecker.common.util.ThreadUtil; import org.opengauss.datachecker.common.util.TopicUtil; import org.opengauss.datachecker.extract.cache.MetaDataCache; @@ -76,7 +78,7 @@ import static org.opengauss.datachecker.common.constant.DynamicTpConstant.EXTRAC @Slf4j @Service public class DataExtractServiceImpl implements DataExtractService { - + private static final Logger logKafka = LogUtils.geKafkaLogger(); /** * Maximum number of sleeps of threads executing data extraction tasks */ @@ -304,6 +306,7 @@ public class DataExtractServiceImpl implements DataExtractService { } Topic topic = task.getTopic(); Endpoint endpoint = extractProperties.getEndpoint(); + logKafka.info("try to create [{}] [{}]", topic.getTopicName(endpoint), topic.getPartitions()); kafkaAdminService.createTopic(topic.getTopicName(endpoint), topic.getPartitions()); topicCache.add(topic); log.info("current topic cache size = {}", topicCache.size()); diff --git a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/task/ExtractTaskRunnable.java b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/task/ExtractTaskRunnable.java index 87076cb2ca18201a56eaac34990e64bb7ea5668c..af49e7929ae1367f6ec88791c28e1c4223346386 100644 --- a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/task/ExtractTaskRunnable.java +++ b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/task/ExtractTaskRunnable.java @@ -17,7 +17,6 @@ package org.opengauss.datachecker.extract.task; import com.alibaba.fastjson.JSON; import org.apache.kafka.clients.producer.ProducerRecord; -import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.opengauss.datachecker.common.constant.DynamicTpConstant; import org.opengauss.datachecker.common.entry.common.ExtractContext; @@ -28,11 +27,12 @@ import org.opengauss.datachecker.common.entry.extract.RowDataHash; import org.opengauss.datachecker.common.entry.extract.TableMetadata; import org.opengauss.datachecker.common.entry.extract.Topic; import org.opengauss.datachecker.common.exception.ExtractDataAccessException; +import org.opengauss.datachecker.common.exception.ExtractException; import org.opengauss.datachecker.common.service.DynamicThreadPoolManager; +import org.opengauss.datachecker.common.util.LogUtils; import org.opengauss.datachecker.common.util.TaskUtil; import org.opengauss.datachecker.common.util.ThreadUtil; import org.opengauss.datachecker.extract.client.CheckingFeignClient; -import org.opengauss.datachecker.extract.kafka.KafkaAdminService; import org.opengauss.datachecker.extract.resource.ResourceManager; import org.opengauss.datachecker.extract.task.sql.QuerySqlEntry; import org.opengauss.datachecker.extract.task.sql.SelectSqlBuilder; @@ -72,7 +72,8 @@ import static org.opengauss.datachecker.common.constant.DynamicTpConstant.TABLE_ * @since 11 **/ public class ExtractTaskRunnable implements Runnable { - public static final Logger log = LogManager.getLogger("extract_business"); + private static final Logger log = LogUtils.getExtractLogger(); + private static final Logger logKafka = LogUtils.geKafkaLogger(); private static final int FETCH_SIZE = 10000; private final ExtractTask task; private final DynamicThreadPoolManager dynamicThreadPoolManager; @@ -471,6 +472,7 @@ public class ExtractTaskRunnable implements Runnable { * kafka operations */ class KafkaOperations { + private static final int DEFAULT_PARTITION = 0; private static final int MIN_PARTITION_NUM = 1; @@ -497,6 +499,16 @@ public class ExtractTaskRunnable implements Runnable { this.topicPartitionCount = topic.getPartitions(); } + /** + * send row data to topic,that has multiple partition + * + * @param row row + */ + public void sendMultiplePartitionsRowData(RowDataHash row) { + row.setPartition(calcSimplePartition(row.getPrimaryKeyHash())); + send(topicName, row.getPartition(), row.getPrimaryKey(), JSON.toJSONString(row)); + } + /** * send row data to topic,that has single partition * @@ -508,8 +520,17 @@ public class ExtractTaskRunnable implements Runnable { log.debug("row data hash zero :{}:{}", row.getPrimaryKey(), JSON.toJSONString(row)); return; } - kafkaTemplate - .send(new ProducerRecord<>(topicName, DEFAULT_PARTITION, row.getPrimaryKey(), JSON.toJSONString(row))); + send(topicName, DEFAULT_PARTITION, row.getPrimaryKey(), JSON.toJSONString(row)); + } + + private void send(String topicName, int partition, String key, String message) { + try { + kafkaTemplate.send(new ProducerRecord<>(topicName, partition, key, message)); + } catch (Exception kafkaException) { + logKafka.error("send kafka [{} , {}] record error {}", topicName, key, kafkaException); + throw new ExtractException( + "send kafka [" + topicName + " , " + key + "] record " + kafkaException.getMessage()); + } } /** @@ -519,17 +540,6 @@ public class ExtractTaskRunnable implements Runnable { kafkaTemplate.flush(); } - /** - * send row data to topic,that has multiple partition - * - * @param row row - */ - public void sendMultiplePartitionsRowData(RowDataHash row) { - row.setPartition(calcSimplePartition(row.getPrimaryKeyHash())); - kafkaTemplate - .send(new ProducerRecord<>(topicName, row.getPartition(), row.getPrimaryKey(), JSON.toJSONString(row))); - } - /** * this topic has multiple partitions *