diff --git a/config/application-sink.yml b/config/application-sink.yml
index fbc3b6705ec29a5481bb71b7e50b81305aa7cfda..b26aee59886fee234f52cf998ddebbf034f4e63c 100644
--- a/config/application-sink.yml
+++ b/config/application-sink.yml
@@ -5,12 +5,12 @@ logging:
spring:
check:
server-uri: http://127.0.0.1:9000
- core-pool-size: 2
- maximum-pool-size: 10
- maximum-topic-size: 30
+ core-pool-size: 1
+ maximum-pool-size: 5
+ maximum-topic-size: 5
maximum-table-slice-size: 100000
extract:
- schema: jack
+ schema: test
databaseType: OG
query-dop: 8 # jdbc Parallel Query config
debezium-enable: false # no need to configure, but do not delete
@@ -26,7 +26,7 @@ spring:
druid:
dataSourceOne:
driver-class-name: org.opengauss.Driver
- url: jdbc:opengauss://127.0.0.1:5432/postgres?useSSL=false&useUnicode=true&characterEncoding=utf-8&serverTimezone=UTC
+ url: jdbc:opengauss://localhost:5432/postgres?useSSL=false&useUnicode=true&characterEncoding=utf-8&serverTimezone=UTC
username:
password: 'xxxx' # If the password contains special characters, enclose it in quotation marks
# Configure initialization connection pool size, minimum number of connections, and maximum number of connections
diff --git a/config/application-source.yml b/config/application-source.yml
index b55106c4bbb2e15c1f96e1add5a8155fbf1bc025..905f460d90cbd9c366c5828d7dfada5a4a416b6e 100644
--- a/config/application-source.yml
+++ b/config/application-source.yml
@@ -6,9 +6,9 @@ logging:
spring:
check:
server-uri: http://127.0.0.1:9000
- core-pool-size: 2
- maximum-pool-size: 10
- maximum-topic-size: 30
+ core-pool-size: 1
+ maximum-pool-size: 5
+ maximum-topic-size: 5
maximum-table-slice-size: 100000
extract:
schema: test
@@ -31,7 +31,7 @@ spring:
druid:
dataSourceOne:
driver-class-name: com.mysql.cj.jdbc.Driver
- url: jdbc:mysql://127.0.0.1:3306/mysql?useSSL=false&useUnicode=true&characterEncoding=utf-8&serverTimezone=UTC&allowPublicKeyRetrieval=true
+ url: jdbc:mysql://localhost:3306/test?useSSL=false&useUnicode=true&characterEncoding=utf-8&serverTimezone=UTC&allowPublicKeyRetrieval=true
# driver-class-name: org.opengauss.Driver # For openGauss
# url: # jdbc:opengauss://127.0.0.1:5432/postgres?useSSL=false&useUnicode=true&characterEncoding=utf-8&serverTimezone=UTC # For openGauss
username:
diff --git a/config/application.yml b/config/application.yml
index 7fe37387ef8e506479ad01e51b3d066dfdcf7129..31129d029f44b44f172a97a6cadd6e9b371f4f49 100644
--- a/config/application.yml
+++ b/config/application.yml
@@ -6,8 +6,8 @@ spring:
kafka:
bootstrap-servers: localhost:9092
check:
- core-pool-size: 2
- maximum-pool-size: 10
+ core-pool-size: 1
+ maximum-pool-size: 4
data:
check:
data-path: ./check_result
diff --git a/config/log4j2.xml b/config/log4j2.xml
index 49753d278ab02f71536d580f5ed19704552b9397..eb3ce143b2b65ff9e65082c1701f1820bd9550ea 100644
--- a/config/log4j2.xml
+++ b/config/log4j2.xml
@@ -20,7 +20,10 @@
logs
INFO
${sys:logName}
-
+ kafka
+ business
+
+
@@ -40,6 +43,26 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
@@ -71,7 +94,12 @@
-
+
+
+
+
+
+
diff --git a/config/log4j2sink.xml b/config/log4j2sink.xml
index 1cccf7c508d587666fb947e915b5567a7c1cb71c..f53477fc4ee5f9691589754f212b0139c91b9679 100644
--- a/config/log4j2sink.xml
+++ b/config/log4j2sink.xml
@@ -19,9 +19,10 @@
logs
INFO
${sys:logName}
- business
-
-
+ kafka
+ business
+
+
@@ -41,8 +42,18 @@
-
+
+
+
+
+
+
+
+
+
+
@@ -85,6 +96,9 @@
+
+
+
diff --git a/config/log4j2source.xml b/config/log4j2source.xml
index 221ac6d8760982cac0f09f7e8b89485d0b336531..91f421d842734b6ded0195af0409d37a8549bd15 100644
--- a/config/log4j2source.xml
+++ b/config/log4j2source.xml
@@ -20,9 +20,10 @@
logs
INFO
${sys:logName}
- business
-
-
+ kafka
+ business
+
+
@@ -42,8 +43,18 @@
-
+
+
+
+
+
+
+
+
+
+
@@ -84,6 +95,9 @@
+
+
+
diff --git a/datachecker-check/src/main/java/org/opengauss/datachecker/check/event/DeleteTopicsEventListener.java b/datachecker-check/src/main/java/org/opengauss/datachecker/check/event/DeleteTopicsEventListener.java
index 8eee8a0a5677fa3e2481f25c895a634be11d5714..4f466276af1ba36ea0b45929b8bf061af66781b6 100644
--- a/datachecker-check/src/main/java/org/opengauss/datachecker/check/event/DeleteTopicsEventListener.java
+++ b/datachecker-check/src/main/java/org/opengauss/datachecker/check/event/DeleteTopicsEventListener.java
@@ -15,16 +15,15 @@
package org.opengauss.datachecker.check.event;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.commons.collections4.CollectionUtils;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.DeleteTopicsResult;
import org.apache.kafka.clients.admin.KafkaAdminClient;
-import org.apache.kafka.clients.admin.TopicListing;
import org.apache.kafka.common.KafkaFuture;
+import org.apache.logging.log4j.Logger;
import org.opengauss.datachecker.check.client.FeignClientService;
import org.opengauss.datachecker.common.entry.enums.Endpoint;
+import org.opengauss.datachecker.common.util.LogUtils;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.ApplicationListener;
import org.springframework.stereotype.Component;
@@ -34,56 +33,46 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.stream.Collectors;
/**
* @author :wangchao
* @date :Created in 2023/3/7
* @since :11
*/
-@Slf4j
@Component
public class DeleteTopicsEventListener implements ApplicationListener<DeleteTopicsEvent> {
- private static final int DELETE_RETRY_TIMES = 3;
+ private static final Logger log = LogUtils.getKafkaLogger();
+
@Resource
private FeignClientService feignClient;
@Value("${spring.kafka.bootstrap-servers}")
private String bootstrapServers;
private AdminClient adminClient = null;
- private AtomicInteger retryTimes = new AtomicInteger(0);
@Override
public void onApplicationEvent(DeleteTopicsEvent event) {
try {
+ log.info("delete topic event : {}", event.getMessage());
final Object source = event.getSource();
initAdminClient();
final DeleteTopics deleteOption = (DeleteTopics) source;
deleteTopic(deleteOption.getTopicList());
feignClient.notifyCheckTableFinished(Endpoint.SOURCE, deleteOption.getTableName());
feignClient.notifyCheckTableFinished(Endpoint.SINK, deleteOption.getTableName());
- log.info("delete topic event : {}", event.getMessage());
} catch (Exception exception) {
log.error("delete topic has error ", exception);
}
}
private void deleteTopic(List<String> deleteTopicList) {
- DeleteTopicsResult deleteTopicsResult = adminClient.deleteTopics(deleteTopicList);
- final KafkaFuture<Void> kafkaFuture = deleteTopicsResult.all();
- retryTimes.incrementAndGet();
try {
+ log.info("delete topic [{}] start", deleteTopicList);
+ DeleteTopicsResult deleteTopicsResult = adminClient.deleteTopics(deleteTopicList);
+ final KafkaFuture<Void> kafkaFuture = deleteTopicsResult.all();
kafkaFuture.get();
- List<String> checkedList = adminClient.listTopics().listings().get().stream().map(TopicListing::name)
- .filter(deleteTopicList::contains).collect(Collectors.toList());
- if (CollectionUtils.isNotEmpty(checkedList)) {
- if (retryTimes.get() <= DELETE_RETRY_TIMES) {
- deleteTopic(checkedList);
- } else {
- log.error("retry to delete {} topic error : delete too many times(3) ", checkedList);
- }
- }
+ log.info("delete topic [{}] finished", deleteTopicList);
} catch (InterruptedException | ExecutionException ignore) {
+ log.error("delete topic [{}] error : ", deleteTopicList, ignore);
}
}
@@ -92,6 +81,7 @@ public class DeleteTopicsEventListener implements ApplicationListener<DeleteTopicsEvent> {
Map<String, Object> props = new HashMap<>(1);
props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
this.adminClient = KafkaAdminClient.create(props);
+ log.info("init admin client [{}]", bootstrapServers);
}
}
}
diff --git a/datachecker-check/src/main/java/org/opengauss/datachecker/check/event/KafkaTopicDeleteProvider.java b/datachecker-check/src/main/java/org/opengauss/datachecker/check/event/KafkaTopicDeleteProvider.java
index 579d12e514000271cb6f26038fd8843807b0c5ab..2a99bdf6553707749e2830a3a4d597935e09d461 100644
--- a/datachecker-check/src/main/java/org/opengauss/datachecker/check/event/KafkaTopicDeleteProvider.java
+++ b/datachecker-check/src/main/java/org/opengauss/datachecker/check/event/KafkaTopicDeleteProvider.java
@@ -15,12 +15,13 @@
package org.opengauss.datachecker.check.event;
-import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.concurrent.BasicThreadFactory;
+import org.apache.logging.log4j.Logger;
import org.opengauss.datachecker.check.config.DataCheckProperties;
import org.opengauss.datachecker.check.modules.task.TaskManagerService;
import org.opengauss.datachecker.common.entry.enums.Endpoint;
+import org.opengauss.datachecker.common.util.LogUtils;
import org.opengauss.datachecker.common.util.TopicUtil;
import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
@@ -41,11 +42,12 @@ import java.util.concurrent.TimeUnit;
* @date :Created in 2023/3/7
* @since :11
*/
-@Slf4j
@Service
public class KafkaTopicDeleteProvider implements ApplicationContextAware {
- private ApplicationContext applicationContext;
+ private static final Logger logKafka = LogUtils.getKafkaLogger();
private static volatile Map<String, DeleteTopics> deleteTableMap = new ConcurrentHashMap<>();
+
+ private ApplicationContext applicationContext;
@Resource
private DataCheckProperties properties;
@Resource
@@ -101,7 +103,7 @@ public class KafkaTopicDeleteProvider implements ApplicationContextAware {
});
if (CollectionUtils.isNotEmpty(deleteOptions)) {
deleteOptions.forEach(deleteOption -> {
- log.info("publish delete-topic-event table = [{}] , current-pending-quantity = [{}]",
+ logKafka.info("publish delete-topic-event table = [{}] , current-pending-quantity = [{}]",
deleteOption.getTableName(), deleteTableMap.size());
applicationContext.publishEvent(new DeleteTopicsEvent(deleteOption, deleteOption.toString()));
deleteTableMap.remove(deleteOption.getTableName());
diff --git a/datachecker-check/src/main/java/org/opengauss/datachecker/check/modules/check/DataCheckRunnable.java b/datachecker-check/src/main/java/org/opengauss/datachecker/check/modules/check/DataCheckRunnable.java
index 457eb2eab03c69db5c79899f1001cf4ff9708d64..5d217ec2518824801b70746b973e45107428aa09 100644
--- a/datachecker-check/src/main/java/org/opengauss/datachecker/check/modules/check/DataCheckRunnable.java
+++ b/datachecker-check/src/main/java/org/opengauss/datachecker/check/modules/check/DataCheckRunnable.java
@@ -43,6 +43,7 @@ import org.opengauss.datachecker.common.entry.extract.RowDataHash;
import org.opengauss.datachecker.common.entry.extract.TableMetadata;
import org.opengauss.datachecker.common.exception.LargeDataDiffException;
import org.opengauss.datachecker.common.exception.MerkleTreeDepthException;
+import org.opengauss.datachecker.common.util.LogUtils;
import org.opengauss.datachecker.common.util.SpringUtil;
import org.opengauss.datachecker.common.util.TopicUtil;
import org.springframework.lang.NonNull;
@@ -70,7 +71,8 @@ import java.util.concurrent.ConcurrentHashMap;
* @since :11
*/
public class DataCheckRunnable implements Runnable {
- public static final Logger log = LogManager.getLogger("check_business");
+ private static final Logger log = LogUtils.getCheckLogger();
+ private static final Logger logKafka = LogUtils.getKafkaLogger();
private static final int THRESHOLD_MIN_BUCKET_SIZE = 2;
private final List<Bucket> sourceBucketList = Collections.synchronizedList(new ArrayList<>());
@@ -117,6 +119,7 @@ public class DataCheckRunnable implements Runnable {
private KafkaConsumerHandler buildKafkaHandler(DataCheckRunnableSupport support) {
KafkaConsumerService kafkaConsumerService = support.getKafkaConsumerService();
+ logKafka.info("create kafka consumer for [{}] [{}]", tableName, partitions);
return new KafkaConsumerHandler(kafkaConsumerService.buildKafkaConsumer(false),
kafkaConsumerService.getRetryFetchRecordTimes());
}
@@ -146,6 +149,9 @@ public class DataCheckRunnable implements Runnable {
checkResult();
cleanCheckThreadEnvironment();
checkRateCache.add(buildCheckTable());
+ kafkaConsumerHandler.closeConsumer();
+ logKafka.info("close table consumer of topic, [{},{}] : [{} : {}] ", tableName, partitions, sourceTopic,
+ sinkTopic);
log.info("check table result {} complete!", tableName);
}
}
diff --git a/datachecker-check/src/main/java/org/opengauss/datachecker/check/modules/check/KafkaConsumerHandler.java b/datachecker-check/src/main/java/org/opengauss/datachecker/check/modules/check/KafkaConsumerHandler.java
index 29b5a9fcbdb556c40de74cad3ca9886aabe5ada7..e71435669ef3f4bfa358b6ca2f08c0835a9d4ac9 100644
--- a/datachecker-check/src/main/java/org/opengauss/datachecker/check/modules/check/KafkaConsumerHandler.java
+++ b/datachecker-check/src/main/java/org/opengauss/datachecker/check/modules/check/KafkaConsumerHandler.java
@@ -39,6 +39,7 @@ import java.util.Map;
*/
@Slf4j
public class KafkaConsumerHandler {
+
private static final int KAFKA_CONSUMER_POLL_DURATION = 20;
private final KafkaConsumer<String, String> kafkaConsumer;
@@ -122,4 +123,8 @@ public class KafkaConsumerHandler {
dataList.add(JSON.parseObject(record.value(), RowDataHash.class));
});
}
+
+ public void closeConsumer() {
+ kafkaConsumer.close(Duration.ofSeconds(1));
+ }
}
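The new closeConsumer() lets DataCheckRunnable release its Kafka consumer as soon as a table check completes, bounding the shutdown wait to one second. A minimal sketch of that close semantics, assuming a locally reachable broker; the properties and group id below are illustrative, not taken from this patch:

```java
import java.time.Duration;
import java.util.Properties;

import org.apache.kafka.clients.consumer.KafkaConsumer;

// Illustrative sketch only: shows the bounded-close behavior that
// KafkaConsumerHandler.closeConsumer() relies on. close(Duration) waits at
// most the given time for in-flight requests, then releases sockets and
// leaves the consumer group.
public class ConsumerCloseSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // assumed local broker
        props.put("group.id", "check-close-sketch");      // illustrative group id
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        // ... poll topic records and compare source/sink rows ...
        consumer.close(Duration.ofSeconds(1)); // same bound closeConsumer() uses
    }
}
```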
diff --git a/datachecker-common/src/main/java/org/opengauss/datachecker/common/util/LogUtils.java b/datachecker-common/src/main/java/org/opengauss/datachecker/common/util/LogUtils.java
new file mode 100644
index 0000000000000000000000000000000000000000..55e39c38567def851e4348ca80ffc0a62a33885e
--- /dev/null
+++ b/datachecker-common/src/main/java/org/opengauss/datachecker/common/util/LogUtils.java
@@ -0,0 +1,59 @@
+/*
+ * Copyright (c) 2022-2022 Huawei Technologies Co.,Ltd.
+ *
+ * openGauss is licensed under Mulan PSL v2.
+ * You can use this software according to the terms and conditions of the Mulan PSL v2.
+ * You may obtain a copy of Mulan PSL v2 at:
+ *
+ * http://license.coscl.org.cn/MulanPSL2
+ *
+ * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
+ * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
+ * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
+ * See the Mulan PSL v2 for more details.
+ */
+
+package org.opengauss.datachecker.common.util;
+
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+/**
+ * LogUtils
+ *
+ * @author :wangchao
+ * @date :Created in 2023/6/14
+ * @since :11
+ */
+public class LogUtils {
+ private static final String CHECK_BUSINESS = "check_business";
+ private static final String EXTRACT_BUSINESS = "extract_business";
+ private static final String KAFKA_BUSINESS = "kafka_business";
+
+ /**
+ * get kafka business logger
+ *
+ * @return logger
+ */
+ public static Logger getKafkaLogger() {
+ return LogManager.getLogger(KAFKA_BUSINESS);
+ }
+
+ /**
+ * get check business logger
+ *
+ * @return logger
+ */
+ public static Logger getCheckLogger() {
+ return LogManager.getLogger(CHECK_BUSINESS);
+ }
+
+ /**
+ * get extract business logger
+ *
+ * @return logger
+ */
+ public static Logger getExtractLogger() {
+ return LogManager.getLogger(EXTRACT_BUSINESS);
+ }
+}
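LogUtils only resolves the three named loggers; writing them to separate files still depends on matching logger definitions in log4j2.xml, log4j2source.xml, and log4j2sink.xml. A hedged usage sketch of the pattern this patch applies across the business classes (CheckWorker is an illustrative class, not part of this change):

```java
import org.apache.logging.log4j.Logger;
import org.opengauss.datachecker.common.util.LogUtils;

// Illustrative only: how classes in this patch swap the Lombok @Slf4j class
// logger for the dedicated business loggers routed by the log4j2 configs.
public class CheckWorker {
    private static final Logger log = LogUtils.getCheckLogger();      // check_business
    private static final Logger logKafka = LogUtils.getKafkaLogger(); // kafka_business

    public void check(String tableName) {
        logKafka.info("create kafka consumer for [{}]", tableName);
        log.info("check table [{}] start", tableName);
    }
}
```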
diff --git a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/kafka/KafkaAdminService.java b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/kafka/KafkaAdminService.java
index 0ffb1e35bfc9bca8de5e285d998f306ea05ae877..3a922ee70eeb1f84c88795253021e796ff7bc879 100644
--- a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/kafka/KafkaAdminService.java
+++ b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/kafka/KafkaAdminService.java
@@ -24,8 +24,9 @@ import org.apache.kafka.clients.admin.KafkaAdminClient;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.admin.TopicListing;
import org.apache.kafka.common.KafkaFuture;
+import org.apache.logging.log4j.Logger;
import org.opengauss.datachecker.common.entry.enums.Endpoint;
-import org.opengauss.datachecker.common.exception.CreateTopicException;
+import org.opengauss.datachecker.common.util.LogUtils;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.KafkaException;
import org.springframework.stereotype.Component;
@@ -36,7 +37,6 @@ import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;
@@ -49,13 +49,11 @@ import java.util.stream.Collectors;
* @since :11
*/
@Component
-@Slf4j
public class KafkaAdminService {
+ private static final Logger log = LogUtils.getKafkaLogger();
@Value("${spring.kafka.bootstrap-servers}")
private String springKafkaBootstrapServers;
private AdminClient adminClient;
- @Value("${spring.check.maximum-topic-size}")
- private int maximumTopicSize = 50;
private String endpointTopicPrefix = "";
private ReentrantLock lock = new ReentrantLock();
@@ -74,6 +72,7 @@ public class KafkaAdminService {
adminClient = KafkaAdminClient.create(props);
try {
adminClient.listTopics().listings().get();
+ log.info("init and listTopics admin client [{}]", springKafkaBootstrapServers);
} catch (ExecutionException | InterruptedException ex) {
log.error("kafka Client link exception: ", ex);
throw new KafkaException("kafka Client link exception");
@@ -89,18 +88,10 @@ public class KafkaAdminService {
public boolean createTopic(String topic, int partitions) {
lock.lock();
try {
- KafkaFuture<Set<String>> names = adminClient.listTopics().names();
- if (names.get().contains(topic)) {
- return true;
- } else {
- CreateTopicsResult topicsResult =
- adminClient.createTopics(List.of(new NewTopic(topic, partitions, (short) 1)));
- log.info("topic={} create,numPartitions={}, short replicationFactor={}", topic, partitions, 1);
- return topicsResult.all().isDone();
- }
- } catch (InterruptedException | ExecutionException e) {
- log.error("topic={} is delete error : {}", topic, e);
- throw new CreateTopicException(topic);
+ CreateTopicsResult topicsResult =
+ adminClient.createTopics(List.of(new NewTopic(topic, partitions, (short) 1)));
+ log.info("create topic success , name= [{}] numPartitions = [{}]", topic, partitions);
+ return topicsResult.all().isDone();
} finally {
lock.unlock();
}
@@ -117,7 +108,7 @@ public class KafkaAdminService {
kafkaFutureMap.forEach((topic, future) -> {
try {
future.get();
- log.debug("topic={} is delete successfull", topic);
+ log.info("topic={} is delete successfull", topic);
} catch (InterruptedException | ExecutionException e) {
log.error("topic={} is delete error : {}", topic, e);
}
@@ -132,7 +123,7 @@ public class KafkaAdminService {
*/
public List<String> getAllTopic(String prefix) {
try {
- log.debug("topic prefix :{}", prefix);
+ log.info("get topic from kafka list topics and prefix [{}]", prefix);
return adminClient.listTopics().listings().get().stream().map(TopicListing::name)
.filter(name -> name.startsWith(prefix)).collect(Collectors.toList());
} catch (InterruptedException | ExecutionException e) {
@@ -148,6 +139,7 @@ public class KafkaAdminService {
*/
public List<String> getAllTopic() {
try {
+ log.info("get topic from kafka list topics");
return adminClient.listTopics().listings().get().stream().map(TopicListing::name)
.collect(Collectors.toList());
} catch (InterruptedException | ExecutionException e) {
@@ -164,6 +156,7 @@ public class KafkaAdminService {
*/
public boolean isTopicExists(String topicName) {
try {
+ log.info("check topic [{}] has exists --> check kafka list topics", topicName);
return adminClient.listTopics().listings().get().stream().map(TopicListing::name)
.anyMatch(name -> name.equalsIgnoreCase(topicName));
} catch (InterruptedException | ExecutionException e) {
diff --git a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/service/DataExtractServiceImpl.java b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/service/DataExtractServiceImpl.java
index e5e4135bf6203d4aa3e591f6196c4549147af7f6..e4b79f300990f27ae6fb374192176afb5f6fd9fb 100644
--- a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/service/DataExtractServiceImpl.java
+++ b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/service/DataExtractServiceImpl.java
@@ -17,6 +17,7 @@ package org.opengauss.datachecker.extract.service;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections4.CollectionUtils;
+import org.apache.logging.log4j.Logger;
import org.opengauss.datachecker.common.constant.Constants;
import org.opengauss.datachecker.common.entry.enums.DML;
import org.opengauss.datachecker.common.entry.enums.Endpoint;
@@ -34,6 +35,7 @@ import org.opengauss.datachecker.common.exception.ProcessMultipleException;
import org.opengauss.datachecker.common.exception.TableNotExistException;
import org.opengauss.datachecker.common.exception.TaskNotFoundException;
import org.opengauss.datachecker.common.service.DynamicThreadPoolManager;
+import org.opengauss.datachecker.common.util.LogUtils;
import org.opengauss.datachecker.common.util.ThreadUtil;
import org.opengauss.datachecker.common.util.TopicUtil;
import org.opengauss.datachecker.extract.cache.MetaDataCache;
@@ -76,7 +78,7 @@ import static org.opengauss.datachecker.common.constant.DynamicTpConstant.EXTRAC
@Slf4j
@Service
public class DataExtractServiceImpl implements DataExtractService {
-
+ private static final Logger logKafka = LogUtils.getKafkaLogger();
/**
* Maximum number of sleeps of threads executing data extraction tasks
*/
@@ -304,6 +306,7 @@ public class DataExtractServiceImpl implements DataExtractService {
}
Topic topic = task.getTopic();
Endpoint endpoint = extractProperties.getEndpoint();
+ logKafka.info("try to create [{}] [{}]", topic.getTopicName(endpoint), topic.getPartitions());
kafkaAdminService.createTopic(topic.getTopicName(endpoint), topic.getPartitions());
topicCache.add(topic);
log.info("current topic cache size = {}", topicCache.size());
diff --git a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/task/ExtractTaskRunnable.java b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/task/ExtractTaskRunnable.java
index 87076cb2ca18201a56eaac34990e64bb7ea5668c..af49e7929ae1367f6ec88791c28e1c4223346386 100644
--- a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/task/ExtractTaskRunnable.java
+++ b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/task/ExtractTaskRunnable.java
@@ -17,7 +17,6 @@ package org.opengauss.datachecker.extract.task;
import com.alibaba.fastjson.JSON;
import org.apache.kafka.clients.producer.ProducerRecord;
-import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opengauss.datachecker.common.constant.DynamicTpConstant;
import org.opengauss.datachecker.common.entry.common.ExtractContext;
@@ -28,11 +27,12 @@ import org.opengauss.datachecker.common.entry.extract.RowDataHash;
import org.opengauss.datachecker.common.entry.extract.TableMetadata;
import org.opengauss.datachecker.common.entry.extract.Topic;
import org.opengauss.datachecker.common.exception.ExtractDataAccessException;
+import org.opengauss.datachecker.common.exception.ExtractException;
import org.opengauss.datachecker.common.service.DynamicThreadPoolManager;
+import org.opengauss.datachecker.common.util.LogUtils;
import org.opengauss.datachecker.common.util.TaskUtil;
import org.opengauss.datachecker.common.util.ThreadUtil;
import org.opengauss.datachecker.extract.client.CheckingFeignClient;
-import org.opengauss.datachecker.extract.kafka.KafkaAdminService;
import org.opengauss.datachecker.extract.resource.ResourceManager;
import org.opengauss.datachecker.extract.task.sql.QuerySqlEntry;
import org.opengauss.datachecker.extract.task.sql.SelectSqlBuilder;
@@ -72,7 +72,8 @@ import static org.opengauss.datachecker.common.constant.DynamicTpConstant.TABLE_
* @since 11
**/
public class ExtractTaskRunnable implements Runnable {
- public static final Logger log = LogManager.getLogger("extract_business");
+ private static final Logger log = LogUtils.getExtractLogger();
+ private static final Logger logKafka = LogUtils.getKafkaLogger();
private static final int FETCH_SIZE = 10000;
private final ExtractTask task;
private final DynamicThreadPoolManager dynamicThreadPoolManager;
@@ -471,6 +472,7 @@ public class ExtractTaskRunnable implements Runnable {
* kafka operations
*/
class KafkaOperations {
+
private static final int DEFAULT_PARTITION = 0;
private static final int MIN_PARTITION_NUM = 1;
@@ -497,6 +499,16 @@ public class ExtractTaskRunnable implements Runnable {
this.topicPartitionCount = topic.getPartitions();
}
+ /**
+ * send row data to a topic that has multiple partitions
+ *
+ * @param row row
+ */
+ public void sendMultiplePartitionsRowData(RowDataHash row) {
+ row.setPartition(calcSimplePartition(row.getPrimaryKeyHash()));
+ send(topicName, row.getPartition(), row.getPrimaryKey(), JSON.toJSONString(row));
+ }
+
/**
* send row data to topic,that has single partition
*
@@ -508,8 +520,17 @@ public class ExtractTaskRunnable implements Runnable {
log.debug("row data hash zero :{}:{}", row.getPrimaryKey(), JSON.toJSONString(row));
return;
}
- kafkaTemplate
- .send(new ProducerRecord<>(topicName, DEFAULT_PARTITION, row.getPrimaryKey(), JSON.toJSONString(row)));
+ send(topicName, DEFAULT_PARTITION, row.getPrimaryKey(), JSON.toJSONString(row));
+ }
+
+ private void send(String topicName, int partition, String key, String message) {
+ try {
+ kafkaTemplate.send(new ProducerRecord<>(topicName, partition, key, message));
+ } catch (Exception kafkaException) {
+ logKafka.error("send kafka [{} , {}] record error {}", topicName, key, kafkaException);
+ throw new ExtractException(
+ "send kafka [" + topicName + " , " + key + "] record " + kafkaException.getMessage());
+ }
}
/**
@@ -519,17 +540,6 @@ public class ExtractTaskRunnable implements Runnable {
kafkaTemplate.flush();
}
- /**
- * send row data to topic,that has multiple partition
- *
- * @param row row
- */
- public void sendMultiplePartitionsRowData(RowDataHash row) {
- row.setPartition(calcSimplePartition(row.getPrimaryKeyHash()));
- kafkaTemplate
- .send(new ProducerRecord<>(topicName, row.getPartition(), row.getPrimaryKey(), JSON.toJSONString(row)));
- }
-
/**
* this topic has multiple partitions
*
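For reference, sendMultiplePartitionsRowData routes each row by its primary-key hash so that a given key always lands in the same partition of both the source and sink topics, which keeps per-partition comparison valid. calcSimplePartition itself is not shown in this patch; a hypothetical sketch, assuming plain modulo routing over the topic's partition count:

```java
// Hypothetical sketch of calcSimplePartition, which this patch calls but does
// not show. Assumption: partition = primaryKeyHash mod partition count, so
// identical keys always map to the same partition on both endpoints.
static int calcSimplePartition(long primaryKeyHash, int topicPartitionCount) {
    // floorMod instead of Math.abs(hash) % n: abs(Long.MIN_VALUE) stays negative.
    return (int) Math.floorMod(primaryKeyHash, (long) topicPartitionCount);
}
```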