diff --git a/config/log4j2.xml b/config/log4j2.xml index d3e2b1514a0ce4ebaf3163b0b6ab3bf22f37b1a3..46a03ad6b3e86a4f52b37c2b5c8888116d3ac2f7 100644 --- a/config/log4j2.xml +++ b/config/log4j2.xml @@ -14,7 +14,7 @@ ~ See the Mulan PSL v2 for more details. --> - + logs @@ -34,11 +34,18 @@ + filePattern="${LOG_HOME}/history/${LOG_NAME}.%d{yyyyMMddHHmm}.zip"> - + + + + + + + + diff --git a/datachecker-check/src/main/java/org/opengauss/datachecker/check/config/KafkaConsumerConfig.java b/datachecker-check/src/main/java/org/opengauss/datachecker/check/config/KafkaConsumerConfig.java index ad7dae8eeadf046e6b4b93efd37396d7f94c374d..954dc6b38467ee320e4784acfcf543fcae255efa 100644 --- a/datachecker-check/src/main/java/org/opengauss/datachecker/check/config/KafkaConsumerConfig.java +++ b/datachecker-check/src/main/java/org/opengauss/datachecker/check/config/KafkaConsumerConfig.java @@ -17,15 +17,20 @@ package org.opengauss.datachecker.check.config; import org.apache.commons.lang3.StringUtils; import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.common.serialization.StringDeserializer; import org.opengauss.datachecker.common.config.ConfigCache; +import org.opengauss.datachecker.common.constant.ConfigConstants; import org.opengauss.datachecker.common.constant.Constants.InitialCapacity; +import org.opengauss.datachecker.common.exception.CheckingException; +import org.opengauss.datachecker.common.util.ThreadUtil; import org.springframework.kafka.core.ConsumerFactory; import org.springframework.kafka.core.DefaultKafkaConsumerFactory; import org.springframework.stereotype.Component; import java.util.HashMap; import java.util.Map; +import java.util.concurrent.LinkedBlockingQueue; import static org.opengauss.datachecker.common.constant.ConfigConstants.KAFKA_SERVERS; import static org.opengauss.datachecker.common.constant.ConfigConstants.KAFKA_DEFAULT_GROUP_ID; @@ -44,10 +49,59 @@ import static org.opengauss.datachecker.common.constant.ConfigConstants.KAFKA_FE */ @Component public class KafkaConsumerConfig { + private LinkedBlockingQueue> consumerPool = new LinkedBlockingQueue<>(); + + /** + * 初始化消费者池 + */ + public void initConsumerPool() { + int maxPoolSize = ConfigCache.getIntValue(ConfigConstants.MAXIMUM_POOL_SIZE); + String process = ConfigCache.getValue(ConfigConstants.PROCESS_NO); + for (int i = 0; i < maxPoolSize; i++) { + consumerPool.add((KafkaConsumer) consumerFactory(process).createConsumer()); + } + } + + /** + * 获取一个空闲Kafka consumer + * + * @return consumer + */ + public KafkaConsumer takeConsumer() { + try { + return consumerPool.take(); + } catch (InterruptedException e) { + throw new CheckingException("take consumer interruptedException"); + } + } + + /** + * 归还当前kafka consumer 到消费者池 + * + * @param consumer consumer + */ + public void returnConsumer(KafkaConsumer consumer) { + consumerPool.add(consumer); + } + + /** + * 等待全部消费者归还后,关闭消费者池 + */ + public void closeConsumerPool() { + int maxPoolSize = ConfigCache.getIntValue(ConfigConstants.MAXIMUM_POOL_SIZE); + while (consumerPool.size() < maxPoolSize) { + ThreadUtil.sleepMaxHalfSecond(); + } + while (!consumerPool.isEmpty()) { + KafkaConsumer consumer = takeConsumer(); + consumer.close(); + } + } + /** * consumerConfigs * - * @param groupId + * @param groupId groupId * @return consumerConfigs */ public Map consumerConfigs(String groupId) { diff --git a/datachecker-check/src/main/java/org/opengauss/datachecker/check/load/StartLoadRunner.java 
b/datachecker-check/src/main/java/org/opengauss/datachecker/check/load/StartLoadRunner.java index 3e2ce66a2d663ab8f20542e26ab953816432caee..ffa1f0db2abf12f56f528d033ad8eb12f80b986f 100644 --- a/datachecker-check/src/main/java/org/opengauss/datachecker/check/load/StartLoadRunner.java +++ b/datachecker-check/src/main/java/org/opengauss/datachecker/check/load/StartLoadRunner.java @@ -49,7 +49,6 @@ public class StartLoadRunner implements ApplicationRunner { LogUtils.info(log, "start load runner :{}", Arrays.deepToString(args.getSourceArgs())); configManagement.init(); startRunner.initCheckEnvironment(checkEnvironment); - kafkaServiceManager.initAdminClient(); startRunner.start(); } } diff --git a/datachecker-check/src/main/java/org/opengauss/datachecker/check/modules/check/CheckDiffResult.java b/datachecker-check/src/main/java/org/opengauss/datachecker/check/modules/check/CheckDiffResult.java index 94c4fdc448d222ff2153e09cc9a979827f46d0ef..2bf8f142ed7c7c2c5d5e9921e6ea395111455598 100644 --- a/datachecker-check/src/main/java/org/opengauss/datachecker/check/modules/check/CheckDiffResult.java +++ b/datachecker-check/src/main/java/org/opengauss/datachecker/check/modules/check/CheckDiffResult.java @@ -17,6 +17,7 @@ package org.opengauss.datachecker.check.modules.check; import com.alibaba.fastjson.annotation.JSONType; import lombok.Data; +import org.apache.commons.lang3.StringUtils; import org.opengauss.datachecker.common.entry.check.Difference; import org.opengauss.datachecker.common.entry.enums.CheckMode; import org.opengauss.datachecker.common.entry.enums.Endpoint; @@ -148,8 +149,13 @@ public class CheckDiffResult { } if (CheckResultUtils.isEmptyDiff(keyDeleteSet, keyUpdateSet, keyInsertSet) && CheckResultUtils.isEmptyDiff( keyDelete, keyUpdate, keyInsert)) { - result = RESULT_SUCCESS; - message += result; + if (StringUtils.isEmpty(error)) { + result = RESULT_SUCCESS; + message += result; + } else { + result = RESULT_FAILED; + message += error; + } } else { result = RESULT_FAILED; message += String.format(FAILED_MESSAGE, keyInsertSet.size() + keyInsert.size(), @@ -157,6 +163,9 @@ public class CheckDiffResult { if (totalRepair > 0 && !isNotLargeDiffKeys) { message += CHECKED_DIFF_TOO_LARGE; } + if (StringUtils.isNotEmpty(error)) { + message += error; + } } } diff --git a/datachecker-check/src/main/java/org/opengauss/datachecker/check/modules/check/KafkaConsumerHandler.java b/datachecker-check/src/main/java/org/opengauss/datachecker/check/modules/check/KafkaConsumerHandler.java index b93f1dda4e0c97e89781097604420cdfa59a53c0..e1420403447b430cc42704e2ff14c992bdd76ed4 100644 --- a/datachecker-check/src/main/java/org/opengauss/datachecker/check/modules/check/KafkaConsumerHandler.java +++ b/datachecker-check/src/main/java/org/opengauss/datachecker/check/modules/check/KafkaConsumerHandler.java @@ -24,14 +24,11 @@ import org.apache.kafka.common.TopicPartition; import org.apache.logging.log4j.Logger; import org.opengauss.datachecker.common.entry.extract.RowDataHash; import org.opengauss.datachecker.common.entry.extract.SliceExtend; +import org.opengauss.datachecker.common.exception.CheckConsumerPollEmptyException; import org.opengauss.datachecker.common.util.LogUtils; -import org.opengauss.datachecker.common.util.ThreadUtil; import java.time.Duration; -import java.util.HashMap; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; +import java.util.*; import java.util.concurrent.atomic.AtomicLong; /** @@ -44,9 +41,9 @@ import java.util.concurrent.atomic.AtomicLong; public class 
KafkaConsumerHandler { private static final Logger log = LogUtils.getLogger(KafkaConsumerHandler.class); private static final int KAFKA_CONSUMER_POLL_DURATION = 20; + private static final int MAX_CONSUMER_POLL_TIMES = 50; - private final KafkaConsumer kafkaConsumer; - + private KafkaConsumer kafkaConsumer; /** * Constructor * @@ -57,6 +54,23 @@ public class KafkaConsumerHandler { kafkaConsumer = consumer; } + /** + * Constructor + * + * @param consumer consumer + */ + public KafkaConsumerHandler(KafkaConsumer consumer) { + kafkaConsumer = consumer; + } + + /** + * 获取kafka consumer + * + */ + public KafkaConsumer getConsumer() { + return kafkaConsumer; + } + /** * Query the Kafka partition data corresponding to the specified table * @@ -83,19 +97,35 @@ public class KafkaConsumerHandler { * * @param topicPartition topic partition * @param sExtend slice extend - * @param dataList data list + * @param attempts */ - public void pollTpSliceData(TopicPartition topicPartition, SliceExtend sExtend, List dataList) { + public void consumerAssign(TopicPartition topicPartition, SliceExtend sExtend, int attempts) { kafkaConsumer.assign(List.of(topicPartition)); - kafkaConsumer.seek(topicPartition, sExtend.getStartOffset()); + if (attempts == 0) { + kafkaConsumer.seek(topicPartition, sExtend.getStartOffset()); + } + } + + /** + * consumer poll data from the topic partition, and filter bu slice extend. then add data in the data list. + * + * @param sExtend slice extend + * @param dataList data list + */ + public synchronized void pollTpSliceData(SliceExtend sExtend, List dataList) { AtomicLong currentCount = new AtomicLong(0); + int pollEmptyCount = 0; while (currentCount.get() < sExtend.getCount()) { ConsumerRecords records = kafkaConsumer.poll(Duration.ofMillis(KAFKA_CONSUMER_POLL_DURATION)); if (records.count() <= 0) { - ThreadUtil.sleepHalfSecond(); + pollEmptyCount++; + if (pollEmptyCount > MAX_CONSUMER_POLL_TIMES) { + throw new CheckConsumerPollEmptyException(sExtend.getName()); + } continue; } + pollEmptyCount = 0; records.forEach(record -> { RowDataHash row = JSON.parseObject(record.value(), RowDataHash.class); if (row.getSNo() == sExtend.getNo() && StringUtils.equals(record.key(), sExtend.getName())) { @@ -170,8 +200,8 @@ public class KafkaConsumerHandler { */ public void closeConsumer() { if (kafkaConsumer != null) { - kafkaConsumer.unsubscribe(); kafkaConsumer.close(); + kafkaConsumer = null; } } } diff --git a/datachecker-check/src/main/java/org/opengauss/datachecker/check/modules/report/SliceCheckResultManager.java b/datachecker-check/src/main/java/org/opengauss/datachecker/check/modules/report/SliceCheckResultManager.java index 5b9adb2e814cfb82ee9c0bc4f92dc0f5fccb4847..c7cb97387c45f73c8a3805e7fc8c8c9df6477e82 100644 --- a/datachecker-check/src/main/java/org/opengauss/datachecker/check/modules/report/SliceCheckResultManager.java +++ b/datachecker-check/src/main/java/org/opengauss/datachecker/check/modules/report/SliceCheckResultManager.java @@ -241,7 +241,7 @@ public class SliceCheckResultManager { .setKeyUpdateSet(getKeyList(updateKeySet, hasMore, "update key has more;")); failed.setDiffCount(failed.getKeyInsertSize() + failed.getKeyUpdateSize() + failed.getKeyDeleteSize()); String message = String.format(FAILED_MESSAGE, failed.getKeyInsertSize(), failed.getKeyUpdateSize(), - failed.getKeyDeleteSize()); + failed.getKeyDeleteSize()) + resultCommon.getError(); if (resultCommon.isTableStructureEquals()) { failed.setMessage(message); } diff --git 
a/datachecker-check/src/main/java/org/opengauss/datachecker/check/service/ConfigManagement.java b/datachecker-check/src/main/java/org/opengauss/datachecker/check/service/ConfigManagement.java index d4fd0d185597b03d4d1d8180ee93b78a113e2a16..7c156db6d6c557b4f053e2dcd73d0e68a5c244db 100644 --- a/datachecker-check/src/main/java/org/opengauss/datachecker/check/service/ConfigManagement.java +++ b/datachecker-check/src/main/java/org/opengauss/datachecker/check/service/ConfigManagement.java @@ -60,9 +60,12 @@ public class ConfigManagement { private int maxTopicSize; @Value("${spring.check.rest-api-page-size}") private int restApiPageSize; + @Value("${data.check.auto-delete-topic}") + private int autoDeleteTopic; @Value("${data.check.sql_mode_pad_char_to_full_length}") private boolean sqlModePadCharToFullLength; - + @Value("${spring.check.maximum-pool-size}") + private int maxPoolSize = 10; /** * config management init */ @@ -76,6 +79,8 @@ public class ConfigManagement { ConfigCache.put(ConfigConstants.SQL_MODE_PAD_CHAR_TO_FULL_LENGTH, sqlModePadCharToFullLength); ConfigCache.put(ConfigConstants.ENABLE_HEART_BEAT_HEATH, enableHeartBeatHeath); ConfigCache.put(ConfigConstants.REST_API_PAGE_SIZE, restApiPageSize); + ConfigCache.put(ConfigConstants.AUTO_DELETE_TOPIC, autoDeleteTopic); + ConfigCache.put(ConfigConstants.MAXIMUM_POOL_SIZE, maxPoolSize); initKafka(); } diff --git a/datachecker-check/src/main/java/org/opengauss/datachecker/check/service/KafkaServiceManager.java b/datachecker-check/src/main/java/org/opengauss/datachecker/check/service/KafkaServiceManager.java index a4138a5e47ee4900d83a7d880a46cdfa02f076b2..9783a38dfe2a571de3a6dfc3c3106d5591f53970 100644 --- a/datachecker-check/src/main/java/org/opengauss/datachecker/check/service/KafkaServiceManager.java +++ b/datachecker-check/src/main/java/org/opengauss/datachecker/check/service/KafkaServiceManager.java @@ -64,10 +64,24 @@ public class KafkaServiceManager { @Resource private KafkaConsumerConfig kafkaConsumerConfig; + /** + * 返回一个KafkaTemplate 实例 + * + * @return kafkaTemplate + */ public KafkaTemplate getKafkaTemplate() { return kafkaTemplate; } + /** + * 销毁kafkaTemplate + */ + @PreDestroy + public void destroy() { + kafkaTemplate.destroy(); + adminClient.close(); + } + public KafkaConsumer getKafkaConsumer(boolean isNewGroup) { Consumer consumer; if (isNewGroup) { @@ -81,7 +95,7 @@ public class KafkaServiceManager { } /** - * Initialize Admin Client + * Initialize Admin Client and init consumer pool */ public void initAdminClient() { Map props = new HashMap<>(1); @@ -97,6 +111,7 @@ public class KafkaServiceManager { log.error("kafka Client link exception: ", ex); throw new KafkaException("kafka Client link exception"); } + kafkaConsumerConfig.initConsumerPool(); } /** @@ -183,4 +198,11 @@ public class KafkaServiceManager { kafkaConsumer.close(); } } + + /** + * 等待全部消费者归还后,关闭消费者池 + */ + public void closeConsumerPool() { + kafkaConsumerConfig.closeConsumerPool(); + } } diff --git a/datachecker-check/src/main/java/org/opengauss/datachecker/check/service/TaskRegisterCenter.java b/datachecker-check/src/main/java/org/opengauss/datachecker/check/service/TaskRegisterCenter.java index 9dcd6c99e0c9ded809d9ca6daf6b79948a1d0cef..832e8a0e11e9d480f28e92fd01f1e5d9fbc665b6 100644 --- a/datachecker-check/src/main/java/org/opengauss/datachecker/check/service/TaskRegisterCenter.java +++ b/datachecker-check/src/main/java/org/opengauss/datachecker/check/service/TaskRegisterCenter.java @@ -57,6 +57,7 @@ public class TaskRegisterCenter { private static final 
Logger log = LogUtils.getLogger(TaskRegisterCenter.class); private static final int STATUS_UPDATED_ALL = 3; + private static final int STATUS_FAILED = -1; private final ReentrantLock lock = new ReentrantLock(); @Resource @@ -116,8 +117,14 @@ public class TaskRegisterCenter { Endpoint endpoint = sliceExt.getEndpoint(); MapUtils.put(sliceExtendMap, sliceName, endpoint, sliceExt); LogUtils.debug(log, "{} update slice [{}] status [{}->{}]", endpoint, sliceName, oldStatus, curStatus); + SliceExtend source = MapUtils.get(sliceExtendMap, sliceName, Endpoint.SOURCE); + SliceExtend sink = MapUtils.get(sliceExtendMap, sliceName, Endpoint.SINK); if (curStatus == STATUS_UPDATED_ALL) { - notifySliceCheckHandle(slice); + sliceCheckEventHandler.handle(new SliceCheckEvent(slice, source, sink)); + remove(slice); + }else if (curStatus == STATUS_FAILED) { + sliceCheckEventHandler.handleFailed(new SliceCheckEvent(slice, source, sink)); + remove(slice); } } } finally { @@ -125,14 +132,6 @@ public class TaskRegisterCenter { } } - private void notifySliceCheckHandle(SliceVo slice) { - String sliceName = slice.getName(); - SliceExtend source = MapUtils.get(sliceExtendMap, sliceName, Endpoint.SOURCE); - SliceExtend sink = MapUtils.get(sliceExtendMap, sliceName, Endpoint.SINK); - sliceCheckEventHandler.handle(new SliceCheckEvent(slice, source, sink)); - remove(slice); - } - private void notifyIgnoreSliceCheckHandle(String table) { removeIgnoreTables(table); List> tableSliceList = center.entrySet() diff --git a/datachecker-check/src/main/java/org/opengauss/datachecker/check/service/TopicInitialize.java b/datachecker-check/src/main/java/org/opengauss/datachecker/check/service/TopicInitialize.java index 57e9d00f31ef48f7f1071a34f24b212c40816836..eb2a3d8db33e8101ed70f97a64e3f18b9c6efd83 100644 --- a/datachecker-check/src/main/java/org/opengauss/datachecker/check/service/TopicInitialize.java +++ b/datachecker-check/src/main/java/org/opengauss/datachecker/check/service/TopicInitialize.java @@ -93,13 +93,18 @@ public class TopicInitialize { * */ public void drop() { - if (StringUtils.isNotEmpty(checkPointSwapTopicName)) { - kafkaServiceManager.deleteTopic(List.of(checkPointSwapTopicName)); - LogUtils.info(log, "drop check point swap topic [{}]", checkPointSwapTopicName); - } - if (CollectionUtils.isNotEmpty(topicList)) { - topicList.forEach(topic -> kafkaServiceManager.deleteTopic(List.of(topic))); - LogUtils.info(log, "drop data check fixed topic name {}", topicList); + kafkaServiceManager.closeConsumerPool(); + if (ConfigCache.isDeleteTopic()) { + if (StringUtils.isNotEmpty(checkPointSwapTopicName)) { + kafkaServiceManager.deleteTopic(List.of(checkPointSwapTopicName)); + LogUtils.info(log, "drop check point swap topic [{}]", checkPointSwapTopicName); + } + if (CollectionUtils.isNotEmpty(topicList)) { + topicList.forEach(topic -> kafkaServiceManager.deleteTopic(List.of(topic))); + LogUtils.info(log, "drop data check fixed topic name {}", topicList); + } + } else { + LogUtils.info(log, "delete topic is disabled by config [{},{}]", checkPointSwapTopicName, topicList); } } diff --git a/datachecker-check/src/main/java/org/opengauss/datachecker/check/slice/SliceCheckContext.java b/datachecker-check/src/main/java/org/opengauss/datachecker/check/slice/SliceCheckContext.java index 459d28a71f70a375c9e72c0bf9400fc3411f4274..5b23f5571297df1fed07e0436a6a7ae65de74626 100644 --- a/datachecker-check/src/main/java/org/opengauss/datachecker/check/slice/SliceCheckContext.java +++ 
b/datachecker-check/src/main/java/org/opengauss/datachecker/check/slice/SliceCheckContext.java @@ -15,6 +15,7 @@ package org.opengauss.datachecker.check.slice; +import org.opengauss.datachecker.check.config.KafkaConsumerConfig; import org.opengauss.datachecker.check.modules.check.CheckDiffResult; import org.opengauss.datachecker.check.modules.check.KafkaConsumerHandler; import org.opengauss.datachecker.check.modules.check.KafkaConsumerService; @@ -42,6 +43,8 @@ public class SliceCheckContext { @Resource private KafkaConsumerService kafkaConsumerService; @Resource + private KafkaConsumerConfig kafkaConsumerConfig; + @Resource private SliceProgressService sliceProgressService; @Resource private SliceCheckResultManager sliceCheckResultManager; @@ -54,9 +57,18 @@ public class SliceCheckContext { * @param groupId groupId * @return KafkaConsumerHandler */ - public KafkaConsumerHandler buildKafkaHandler(String groupId) { - return new KafkaConsumerHandler(kafkaConsumerService.buildKafkaConsumer(groupId), - kafkaConsumerService.getRetryFetchRecordTimes()); + public synchronized KafkaConsumerHandler buildKafkaHandler(String groupId) { + KafkaConsumerHandler kafkaHandler = new KafkaConsumerHandler(kafkaConsumerConfig.takeConsumer()); + return new KafkaConsumerHandler(kafkaConsumerConfig.takeConsumer()); + } + + /** + * 从kafka消费者缓存池中,获取一个consumer,并创建kafka处理器对象 + * + * @return KafkaConsumerHandler + */ + public synchronized KafkaConsumerHandler createKafkaHandler() { + return new KafkaConsumerHandler(kafkaConsumerConfig.takeConsumer()); } /** @@ -109,4 +121,13 @@ public class SliceCheckContext { public void saveProcessHistoryLogging(SliceVo slice) { processLogService.saveProcessHistoryLogging(slice.getTable(), slice.getNo()); } + + /** + * 归还当前kafka consumer 到消费者池 + * + * @param consumer consumer + */ + public void returnConsumer(KafkaConsumerHandler consumer) { + kafkaConsumerConfig.returnConsumer(consumer.getConsumer()); + } } diff --git a/datachecker-check/src/main/java/org/opengauss/datachecker/check/slice/SliceCheckEventHandler.java b/datachecker-check/src/main/java/org/opengauss/datachecker/check/slice/SliceCheckEventHandler.java index 9c9973f32c061bc5eed7bac3c07bf8a1a1c21f26..0e7771b786740b219d1ef773c9c5ba954e7561c8 100644 --- a/datachecker-check/src/main/java/org/opengauss/datachecker/check/slice/SliceCheckEventHandler.java +++ b/datachecker-check/src/main/java/org/opengauss/datachecker/check/slice/SliceCheckEventHandler.java @@ -86,28 +86,51 @@ public class SliceCheckEventHandler { } } - private void handleTableStructureDiff(SliceCheckEvent checkEvent) { + /** + * 添加校验失败分片事件处理流程 + * + * @param checkEvent + */ + public void handleFailed(SliceCheckEvent checkEvent) { + LogUtils.warn(log, "slice check event , table slice has unknown error [{}][{} : {}]", checkEvent.getCheckName(), + checkEvent.getSource() + .getTableHash(), checkEvent.getSink() + .getTableHash()); + long count = getCheckSliceCount(checkEvent); + sliceCheckContext.refreshSliceCheckProgress(checkEvent.getSlice(), count); + CheckDiffResult result = buildSliceDiffResult(checkEvent.getSlice(), (int) count, true, "slice has unknown error"); + sliceCheckContext.addCheckResult(checkEvent.getSlice(), result); + registerCenter.refreshCheckedTableCompleted(checkEvent.getSlice() + .getTable()); + } + + private static long getCheckSliceCount(SliceCheckEvent checkEvent) { SliceExtend source = checkEvent.getSource(); SliceExtend sink = checkEvent.getSink(); long count = Math.max(source.getCount(), sink.getCount()); + return count; + } + + 
private void handleTableStructureDiff(SliceCheckEvent checkEvent) { + long count = getCheckSliceCount(checkEvent); sliceCheckContext.refreshSliceCheckProgress(checkEvent.getSlice(), count); - CheckDiffResult result = buildTableStructureDiffResult(checkEvent.getSlice(), (int) count); + CheckDiffResult result = buildSliceDiffResult(checkEvent.getSlice(), (int) count, false, "table structure diff"); sliceCheckContext.addTableStructureDiffResult(checkEvent.getSlice(), result); } - private CheckDiffResult buildTableStructureDiffResult(SliceVo slice, int count) { + private CheckDiffResult buildSliceDiffResult(SliceVo slice, int count, boolean isTableStructure, String message) { CheckDiffResultBuilder builder = CheckDiffResultBuilder.builder(); builder.checkMode(ConfigCache.getCheckMode()) - .process(ConfigCache.getValue(ConfigConstants.PROCESS_NO)) - .schema(slice.getSchema()) - .table(slice.getTable()) - .sno(slice.getNo()) - .startTime(LocalDateTime.now()) - .endTime(LocalDateTime.now()) - .isTableStructureEquals(false) - .isExistTableMiss(false, null) - .rowCount(count) - .error("table structure diff"); + .process(ConfigCache.getValue(ConfigConstants.PROCESS_NO)) + .schema(slice.getSchema()) + .table(slice.getTable()) + .sno(slice.getNo()) + .startTime(LocalDateTime.now()) + .endTime(LocalDateTime.now()) + .isTableStructureEquals(isTableStructure) + .isExistTableMiss(false, null) + .rowCount(count) + .error(message); return builder.build(); } diff --git a/datachecker-check/src/main/java/org/opengauss/datachecker/check/slice/SliceCheckWorker.java b/datachecker-check/src/main/java/org/opengauss/datachecker/check/slice/SliceCheckWorker.java index 9af6bc133c07c95a201ff00f0c42ba26985f9db6..29c6bd0bc931616bb8b374430dbd788eef62aaa6 100644 --- a/datachecker-check/src/main/java/org/opengauss/datachecker/check/slice/SliceCheckWorker.java +++ b/datachecker-check/src/main/java/org/opengauss/datachecker/check/slice/SliceCheckWorker.java @@ -44,6 +44,7 @@ import org.opengauss.datachecker.common.entry.extract.SliceExtend; import org.opengauss.datachecker.common.entry.extract.SliceVo; import org.opengauss.datachecker.common.entry.extract.Topic; import org.opengauss.datachecker.common.exception.BucketNumberInconsistentException; +import org.opengauss.datachecker.common.exception.CheckConsumerPollEmptyException; import org.opengauss.datachecker.common.exception.MerkleTreeDepthException; import org.opengauss.datachecker.common.util.LogUtils; import org.opengauss.datachecker.common.util.TopicUtil; @@ -71,6 +72,8 @@ import java.util.concurrent.CountDownLatch; public class SliceCheckWorker implements Runnable { private static final Logger LOGGER = LogUtils.getLogger(SliceCheckWorker.class); private static final int THRESHOLD_MIN_BUCKET_SIZE = 2; + // 设置最大尝试次数 + private static final int MAX_ATTEMPTS=5; private final SliceVo slice; @@ -313,11 +316,13 @@ public class SliceCheckWorker implements Runnable { int avgSliceCount = (int) (sourceTuple.getSlice() .getCount() + sinkTuple.getSlice() .getCount()) / 2; + KafkaConsumerHandler consumer = checkContext.createKafkaHandler(); checkTupleList.forEach(check -> { - initBucketList(check.getEndpoint(), check.getSlice(), check.getBuckets(), bucketDiff, avgSliceCount); + initBucketList(check.getEndpoint(), check.getSlice(), check.getBuckets(), bucketDiff, avgSliceCount, consumer); countDownLatch.countDown(); }); countDownLatch.await(); + checkContext.returnConsumer(consumer); LogUtils.debug(LOGGER, "fetch slice {} data from topic, cost {} millis", slice.toSimpleString(), 
costMillis(startFetch)); // Align the source and destination bucket list @@ -331,16 +336,24 @@ public class SliceCheckWorker implements Runnable { } private void initBucketList(Endpoint endpoint, SliceExtend sliceExtend, List bucketList, - Map> bucketDiff, int avgSliceCount) { + Map> bucketDiff, int avgSliceCount, KafkaConsumerHandler consumer) { // Use feign client to pull Kafka data List dataList = new LinkedList<>(); - TopicPartition topicPartition; - if (Objects.equals(Endpoint.SOURCE, endpoint)) { - topicPartition = new TopicPartition(topic.getSourceTopicName(), topic.getPtnNum()); - } else { - topicPartition = new TopicPartition(topic.getSinkTopicName(), topic.getPtnNum()); + TopicPartition topicPartition = new TopicPartition(Objects.equals(Endpoint.SOURCE, endpoint) ? + topic.getSourceTopicName() : topic.getSinkTopicName(), topic.getPtnNum()); + int attempts = 0; + while (attempts < MAX_ATTEMPTS) { + try { + consumer.consumerAssign(topicPartition, sliceExtend, attempts); + consumer.pollTpSliceData(sliceExtend, dataList); + break; // 如果成功,跳出循环 + } catch (CheckConsumerPollEmptyException ex) { + if (++attempts >= MAX_ATTEMPTS) { + checkContext.returnConsumer(consumer); + throw ex; // 如果达到最大尝试次数,重新抛出异常 + } + } } - getSliceDataFromTopicPartition(topicPartition, sliceExtend, dataList); if (CollectionUtils.isEmpty(dataList)) { return; } @@ -381,13 +394,10 @@ public class SliceCheckWorker implements Runnable { bucketList.sort(Comparator.comparingInt(Bucket::getNumber)); } - private void getSliceDataFromTopicPartition(TopicPartition topicPartition, SliceExtend sExtend, - List dataList) { - KafkaConsumerHandler consumer = checkContext.buildKafkaHandler(sExtend.getName()); - LogUtils.debug(LOGGER, "create consumer of topic, [{}] : [{}] ", slice.getTable(), topicPartition.toString()); - consumer.pollTpSliceData(topicPartition, sExtend, dataList); - consumer.closeConsumer(); - LogUtils.debug(LOGGER, "close consumer of topic, [{}] : [{}] ", slice.getTable(), topicPartition.toString()); + private void getSliceDataFromTopicPartition(KafkaConsumerHandler consumer, SliceExtend sExtend, + List dataList) throws CheckConsumerPollEmptyException { + + } /** diff --git a/datachecker-check/src/main/java/org/opengauss/datachecker/check/slice/TableCheckWorker.java b/datachecker-check/src/main/java/org/opengauss/datachecker/check/slice/TableCheckWorker.java index 686c4770ac8986ab968f5d304f9d0d2fc2d7c6c0..9d9c85b75f6e222b9a32dc422e72f2f274f60d66 100644 --- a/datachecker-check/src/main/java/org/opengauss/datachecker/check/slice/TableCheckWorker.java +++ b/datachecker-check/src/main/java/org/opengauss/datachecker/check/slice/TableCheckWorker.java @@ -38,14 +38,13 @@ import org.opengauss.datachecker.common.entry.check.DifferencePair; import org.opengauss.datachecker.common.entry.check.Pair; import org.opengauss.datachecker.common.entry.enums.CheckMode; import org.opengauss.datachecker.common.entry.enums.Endpoint; -import org.opengauss.datachecker.common.entry.extract.ConditionLimit; -import org.opengauss.datachecker.common.entry.extract.RowDataHash; -import org.opengauss.datachecker.common.entry.extract.SliceExtend; -import org.opengauss.datachecker.common.entry.extract.SliceVo; +import org.opengauss.datachecker.common.entry.extract.*; import org.opengauss.datachecker.common.exception.BucketNumberInconsistentException; +import org.opengauss.datachecker.common.exception.CheckConsumerPollEmptyException; import org.opengauss.datachecker.common.exception.MerkleTreeDepthException; import 
org.opengauss.datachecker.common.util.LogUtils; import org.opengauss.datachecker.common.util.SpringUtil; +import org.opengauss.datachecker.common.util.TopicUtil; import org.springframework.lang.NonNull; import java.time.LocalDateTime; @@ -79,7 +78,8 @@ public class TableCheckWorker implements Runnable { private final DifferencePair, List, List> difference = DifferencePair.of(new LinkedList<>(), new LinkedList<>(), new LinkedList<>()); private final LocalDateTime startTime; - + private Topic topic = new Topic(); + private final String processNo; /** * slice check worker construct * @@ -91,6 +91,7 @@ public class TableCheckWorker implements Runnable { this.checkContext = sliceCheckContext; this.startTime = LocalDateTime.now(); this.slice = checkEvent.getSlice(); + this.processNo = ConfigCache.getValue(ConfigConstants.PROCESS_NO); } @Override @@ -100,6 +101,7 @@ public class TableCheckWorker implements Runnable { SliceExtend source = checkEvent.getSource(); SliceExtend sink = checkEvent.getSink(); this.sliceRowCont = Math.max(source.getCount(), sink.getCount()); + setTableFixedTopic(); log.info("check table of {}", slice.getName()); checkedTableSliceByTopicPartition(source, sink); } catch (Exception ignore) { @@ -284,27 +286,41 @@ public class TableCheckWorker implements Runnable { Map> bucketDiff = new ConcurrentHashMap<>(); // Get the Kafka partition number corresponding to the current task // Initialize source bucket column list data + KafkaConsumerHandler consumer = checkContext.createKafkaHandler(); CountDownLatch countDownLatch = new CountDownLatch(checkTupleList.size()); checkTupleList.forEach(check -> { - String topicName = checkContext.getTopicName(slice.getTable(), check.getEndpoint()); - TopicPartition topicPartition = new TopicPartition(topicName, 0); - initBucketList(check.getEndpoint(), topicPartition, check.getBuckets(), bucketDiff); + initBucketList(check.getEndpoint(), check.getSlice(), check.getBuckets(), bucketDiff, consumer); countDownLatch.countDown(); }); countDownLatch.await(); - + checkContext.returnConsumer(consumer); // Align the source and destination bucket list alignAllBuckets(sourceTuple, sinkTuple, bucketDiff); sortBuckets(sourceTuple.getBuckets()); sortBuckets(sinkTuple.getBuckets()); } - private void initBucketList(Endpoint endpoint, TopicPartition topicPartition, List bucketList, - Map> bucketDiff) { + private void initBucketList(Endpoint endpoint, SliceExtend sliceExtend,List bucketList, + Map> bucketDiff, KafkaConsumerHandler consumer) { // Use feign client to pull Kafka data List dataList = new LinkedList<>(); - getSliceDataFromTopicPartition(topicPartition, dataList); + TopicPartition topicPartition = new TopicPartition(Objects.equals(Endpoint.SOURCE, endpoint) ? 
+ topic.getSourceTopicName() : topic.getSinkTopicName(), topic.getPtnNum()); + int maxAttempts = 5; // 设置最大尝试次数 + int attempts = 0; + while (attempts < maxAttempts) { + try { + consumer.consumerAssign(topicPartition, sliceExtend, attempts); + consumer.pollTpSliceData(sliceExtend, dataList); + break; // 如果成功,跳出循环 + } catch (CheckConsumerPollEmptyException ex) { + if (++attempts >= maxAttempts) { + checkContext.returnConsumer(consumer); + throw ex; // 如果达到最大尝试次数,重新抛出异常 + } + } + } if (CollectionUtils.isEmpty(dataList)) { return; } @@ -345,13 +361,13 @@ public class TableCheckWorker implements Runnable { bucketList.sort(Comparator.comparingInt(Bucket::getNumber)); } - private void getSliceDataFromTopicPartition(TopicPartition topicPartition, List dataList) { - KafkaConsumerHandler consumer = checkContext.buildKafkaHandler(slice.getTable()); - logKafka.debug("create consumer of topic, [{}] : [{}] ", slice.getTable(), topicPartition.toString()); - consumer.poolTopicPartitionsData(topicPartition.topic(), topicPartition.partition(), dataList); - consumer.closeConsumer(); - logKafka.debug("close consumer of topic, [{}] : [{}] ", slice.getTable(), topicPartition.toString()); - } +// private void getSliceDataFromTopicPartition(TopicPartition topicPartition, List dataList) { +// KafkaConsumerHandler consumer = checkContext.buildKafkaHandler(slice.getTable()); +// logKafka.debug("create consumer of topic, [{}] : [{}] ", slice.getTable(), topicPartition.toString()); +// consumer.poolTopicPartitionsData(topicPartition.topic(), topicPartition.partition(), dataList); +// consumer.closeConsumer(); +// logKafka.debug("close consumer of topic, [{}] : [{}] ", slice.getTable(), topicPartition.toString()); +// } /** *
@@ -374,4 +390,14 @@ public class TableCheckWorker implements Runnable {
             });
         }
     }
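+
+    /**
+     * Build the fixed source and sink topic names for the current table; the table check reads partition 0
+     */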
+    private void setTableFixedTopic() {
+        int maxTopicSize = ConfigCache.getIntValue(ConfigConstants.MAXIMUM_TOPIC_SIZE);
+        String table = slice.getTable();
+        String sourceTopicName = TopicUtil.getMoreFixedTopicName(processNo, Endpoint.SOURCE, table, maxTopicSize);
+        String sinkTopicName = TopicUtil.getMoreFixedTopicName(processNo, Endpoint.SINK, table, maxTopicSize);
+        topic.setSourceTopicName(sourceTopicName);
+        topic.setSinkTopicName(sinkTopicName);
+        topic.setPtnNum(0);
+        topic.setPartitions(1);
+    }
 }
diff --git a/datachecker-common/src/main/java/org/opengauss/datachecker/common/config/ConfigCache.java b/datachecker-common/src/main/java/org/opengauss/datachecker/common/config/ConfigCache.java
index 35f191c065e354c4a9b5bc888175654dc8345774..2c6a4818b541ebc879f71c0ab27e74149f5d4e3e 100644
--- a/datachecker-common/src/main/java/org/opengauss/datachecker/common/config/ConfigCache.java
+++ b/datachecker-common/src/main/java/org/opengauss/datachecker/common/config/ConfigCache.java
@@ -126,6 +126,14 @@ public class ConfigCache {
         return Objects.isNull(value) ? 0 : value;
     }
 
+    /**
+     * Check the configuration to determine whether check topics should be deleted automatically
+     *
+     * @return true if auto-delete-topic is enabled
+     */
+    public static boolean isDeleteTopic() {
+        return getIntValue(ConfigConstants.AUTO_DELETE_TOPIC) == 1;
+    }
+
     /**
      * get config key when value type is Boolean
      *
diff --git a/datachecker-common/src/main/java/org/opengauss/datachecker/common/constant/ConfigConstants.java b/datachecker-common/src/main/java/org/opengauss/datachecker/common/constant/ConfigConstants.java
index c75e04218916e746ec4e60144ca6b3cb3f4c0f62..f7e18728a53eee6ae25fbcf8c3c40ecdf54a706d 100644
--- a/datachecker-common/src/main/java/org/opengauss/datachecker/common/constant/ConfigConstants.java
+++ b/datachecker-common/src/main/java/org/opengauss/datachecker/common/constant/ConfigConstants.java
@@ -143,6 +143,11 @@ public interface ConfigConstants {
      */
     String EXTEND_MAXIMUM_POOL_SIZE = "spring.check.extend-maximum-pool-size";
 
+    /**
     * maximum number of threads in the thread pool used for table slice check
+     */
+    String MAXIMUM_POOL_SIZE = "spring.check.maximum-pool-size";
+
     /**
      * data.check.bucket-expect-capacity
      */
@@ -297,4 +302,9 @@ public interface ConfigConstants {
      * spring.check.rest-api-page-size
      */
     String REST_API_PAGE_SIZE = "spring.check.rest-api-page-size";
+
+    /**
+     * data.check.auto-delete-topic
+     */
+    String AUTO_DELETE_TOPIC = "data.check.auto-delete-topic";
 }
diff --git a/datachecker-common/src/main/java/org/opengauss/datachecker/common/entry/extract/RowDataHash.java b/datachecker-common/src/main/java/org/opengauss/datachecker/common/entry/extract/RowDataHash.java
index 923e65650746ccb2834b754fb2bca2d3c94cc194..e4dc22140d095836fe1ba3b394d47935e5138ac5 100644
--- a/datachecker-common/src/main/java/org/opengauss/datachecker/common/entry/extract/RowDataHash.java
+++ b/datachecker-common/src/main/java/org/opengauss/datachecker/common/entry/extract/RowDataHash.java
@@ -39,6 +39,7 @@ public class RowDataHash {
      */
     private String key;
 
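+    /**
+     * slice key of the record, i.e. the Kafka record key of the slice that produced this row
+     */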
+    private String sliceKey;
     /**
      * CSV scene for locating data in CSV files
      */
@@ -67,11 +68,12 @@ public class RowDataHash {
             return false;
         }
         RowDataHash that = (RowDataHash) o;
-        return kHash == that.kHash && vHash == that.vHash && sNo == that.sNo && getKey().equals(that.getKey());
+        return kHash == that.kHash && vHash == that.vHash && sNo == that.sNo && getKey().equals(that.getKey())
+                && getSliceKey().equals(that.getSliceKey());
     }
 
     @Override
     public int hashCode() {
-        return Objects.hash(getKey(), kHash, vHash, sNo);
+        return Objects.hash(getKey(), kHash, vHash, sNo, sliceKey);
     }
 }
\ No newline at end of file
diff --git a/datachecker-common/src/main/java/org/opengauss/datachecker/common/entry/extract/TableMetadata.java b/datachecker-common/src/main/java/org/opengauss/datachecker/common/entry/extract/TableMetadata.java
index 9b10f8da89af8e108867c4a5c0147a748bdf5a67..8842669cafa4d5d082f00e0f6e5ca82fbf62aa7c 100644
--- a/datachecker-common/src/main/java/org/opengauss/datachecker/common/entry/extract/TableMetadata.java
+++ b/datachecker-common/src/main/java/org/opengauss/datachecker/common/entry/extract/TableMetadata.java
@@ -15,16 +15,15 @@
 
 package org.opengauss.datachecker.common.entry.extract;
 
-import io.swagger.v3.oas.annotations.media.Schema;
 import lombok.Data;
 import lombok.ToString;
 import lombok.experimental.Accessors;
 import org.opengauss.datachecker.common.entry.enums.DataBaseType;
 import org.opengauss.datachecker.common.entry.enums.Endpoint;
+import org.springframework.util.CollectionUtils;
 
 import java.sql.ResultSet;
 import java.sql.SQLException;
-import java.util.Comparator;
 import java.util.List;
 import java.util.Objects;
 
@@ -75,10 +74,6 @@ public class TableMetadata {
 
     private ConditionLimit conditionLimit;
 
-    public void setColumnsMetas(List columnsMetas) {
-        this.columnsMetas = columnsMetas;
-    }
-
     /**
      * Judge if this table is auto increment, if this true and no conditionLimit configured,
      * you can use id between start and end.
@@ -93,6 +88,15 @@ public class TableMetadata {
                            .isAutoIncrementColumn();
     }
 
+    /**
+     * Whether the current table has a single primary key column
+     *
+     * @return true if the table has exactly one primary key column
+     */
+    public boolean isSinglePrimary() {
+        return !CollectionUtils.isEmpty(primaryMetas) && primaryMetas.size() == 1;
+    }
+
     public ColumnsMetaData getSinglePrimary() {
         if (hasPrimary()) {
             return primaryMetas.get(0);
diff --git a/datachecker-common/src/main/java/org/opengauss/datachecker/common/exception/CheckingPollingException.java b/datachecker-common/src/main/java/org/opengauss/datachecker/common/exception/CheckConsumerPollEmptyException.java
similarity index 81%
rename from datachecker-common/src/main/java/org/opengauss/datachecker/common/exception/CheckingPollingException.java
rename to datachecker-common/src/main/java/org/opengauss/datachecker/common/exception/CheckConsumerPollEmptyException.java
index 078efb83500d99e09ff9d965f3cc0117569a3146..23cb88b7ee957a85e4a5d29574a57e58e5394a5f 100644
--- a/datachecker-common/src/main/java/org/opengauss/datachecker/common/exception/CheckingPollingException.java
+++ b/datachecker-common/src/main/java/org/opengauss/datachecker/common/exception/CheckConsumerPollEmptyException.java
@@ -22,11 +22,11 @@ package org.opengauss.datachecker.common.exception;
  * @date :Created in 2022/5/23
  * @since :11
  */
-public class CheckingPollingException extends CheckingException {
+public class CheckConsumerPollEmptyException extends CheckingException {
     private static final long serialVersionUID = 6526279344405897976L;
 
-    public CheckingPollingException(String message) {
-        super(message);
+    public CheckConsumerPollEmptyException(String message) {
+        super("check consumer poll empty " + message);
     }
 
 }
diff --git a/datachecker-common/src/main/java/org/opengauss/datachecker/common/service/ShutdownService.java b/datachecker-common/src/main/java/org/opengauss/datachecker/common/service/ShutdownService.java
index 42379572390d49200ae7d9e7a6d6b9fcb85a5e94..7daa050b07a05f341c5be671e85deb2070dadd07 100644
--- a/datachecker-common/src/main/java/org/opengauss/datachecker/common/service/ShutdownService.java
+++ b/datachecker-common/src/main/java/org/opengauss/datachecker/common/service/ShutdownService.java
@@ -43,8 +43,6 @@ public class ShutdownService {
         ThreadUtil.sleep(ConfigCache.getIntValue(ConfigConstants.TIMEOUT_PER_SHUTDOWN_PHASE));
         LogUtils.info(log, "The check server wait 5s and will be shutdown , {} . check server exited .", message);
         isShutdown.set(true);
-        ThreadUtil.killThreadByName("kafka-producer-network-thread");
-
         dynamicThreadPoolManager.closeDynamicThreadPoolMonitor();
         while (monitor.get() > 0) {
             ThreadUtil.sleepHalfSecond();
@@ -52,7 +50,6 @@ public class ShutdownService {
         processLogService.saveStopProcessLog();
         threadExecutorList.forEach(ExecutorConfigurationSupport::shutdown);
         executorServiceList.forEach(ExecutorService::shutdownNow);
-        ThreadUtil.sleepHalfSecond();
         System.exit(SpringApplication.exit(SpringUtil.getApplicationContext()));
     }
 
diff --git a/datachecker-common/src/main/java/org/opengauss/datachecker/common/util/ThreadUtil.java b/datachecker-common/src/main/java/org/opengauss/datachecker/common/util/ThreadUtil.java
index 8208bd7da2a19e9d70785ac48e1ab0c09574f78f..8c70d83dfde27e947f743d51ccbd42dacefc17c4 100644
--- a/datachecker-common/src/main/java/org/opengauss/datachecker/common/util/ThreadUtil.java
+++ b/datachecker-common/src/main/java/org/opengauss/datachecker/common/util/ThreadUtil.java
@@ -25,6 +25,7 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.atomic.AtomicInteger;
 
 /**
  * ThreadUtil
@@ -94,17 +95,33 @@ public class ThreadUtil {
      * @param name thread name
      */
     public static void killThreadByName(String name) {
-        ThreadGroup currentGroup = Thread.currentThread()
-                                         .getThreadGroup();
-        int noThreads = currentGroup.activeCount();
-        Thread[] lstThreads = new Thread[noThreads];
-        currentGroup.enumerate(lstThreads);
-        Arrays.stream(lstThreads)
-              .filter(thread -> StringUtils.startsWith(thread.getName(), name))
-              .forEach(thread -> {
-                  thread.interrupt();
-                  LogUtils.warn(log, "thread [{}] has interrupted", thread.getName());
-              });
+        AtomicInteger threadCount = new AtomicInteger(0);
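+        // rescan the current thread group until no thread matching the given name is left waiting to be interrupted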
+        do {
+            ThreadGroup currentGroup = Thread.currentThread().getThreadGroup();
+            int noThreads = currentGroup.activeCount();
+            Thread[] lstThreads = new Thread[noThreads];
+            currentGroup.enumerate(lstThreads);
+            threadCount.set(0);
+            Arrays.stream(lstThreads)
+                    .filter(thread -> {
+                        if (StringUtils.containsIgnoreCase(thread.getName(), name)) {
+                            threadCount.incrementAndGet();
+                            return true;
+                        }
+                        return false;
+                    })
+                    .forEach(thread -> {
+                        if (thread.getState().equals(Thread.State.WAITING)) {
+                            log.warn("thread [{}] :[{} ] has interrupted", thread.getName(), thread.getState());
+                            thread.interrupt();
+                        } else {
+                            threadCount.decrementAndGet();
+                            log.warn("thread [{}] :[{} ]  has stop", thread.getName(), thread.getState());
+                            thread.stop();
+                        }
+                    });
+        } while (threadCount.get() > 0);
+
     }
 
     /**
diff --git a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/data/access/AbstractDataAccessService.java b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/data/access/AbstractDataAccessService.java
index 545810fe65201ccfe0683a410da0b108fc20c064..7f9470241d8977c879a6a9e63f6ba3b4332275ab 100644
--- a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/data/access/AbstractDataAccessService.java
+++ b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/data/access/AbstractDataAccessService.java
@@ -41,6 +41,7 @@ import java.time.LocalDateTime;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 
 /**
  * AbstractDataAccessService
@@ -106,6 +107,10 @@ public abstract class AbstractDataAccessService implements DataAccessService {
      */
     public Health health(String schema, String sql) {
         try {
+            Connection connection = ConnectionMgr.getConnection();
+            if (Objects.isNull(connection)) {
+                return Health.buildFailed("cannot connect to the current database");
+            }
             String result = adasQuerySchema(sql);
             if (StringUtils.equalsIgnoreCase(result, schema)) {
                 return Health.buildSuccess();
diff --git a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/data/access/MysqlDataAccessService.java b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/data/access/MysqlDataAccessService.java
index 3dafb4c3b8a26877efdb0c958f0a217c9bf8e82f..d22f8c10fefb51d23586bef079905153315d8c71 100644
--- a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/data/access/MysqlDataAccessService.java
+++ b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/data/access/MysqlDataAccessService.java
@@ -120,7 +120,7 @@ public class MysqlDataAccessService extends AbstractDataAccessService {
     @Override
     public List queryPointList(Connection connection, DataAccessParam param) {
         String sql = "select s.%s from (SELECT @rowno:=@rowno+1 as rn,r.%s from %s.%s r,"
-            + "  (select @rowno := 0) t ORDER BY r.%s asc) s where mod(s.rn, %s) = 0";
+            + "  (select @rowno := 0) t ORDER BY r.%s asc) s where mod(s.rn, %s) = 1";
         sql = String.format(sql, param.getColName(), param.getColName(), param.getSchema(), param.getName(),
             param.getColName(), param.getOffset());
         return adasQueryPointList(connection, sql);
diff --git a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/data/access/OpgsDataAccessService.java b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/data/access/OpgsDataAccessService.java
index f268e96e78d5fb318d20127450662bea9ddfe2ed..2b5f617d8b65ef9a3260062ffa9dee745eeeff39 100644
--- a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/data/access/OpgsDataAccessService.java
+++ b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/data/access/OpgsDataAccessService.java
@@ -138,7 +138,7 @@ public class OpgsDataAccessService extends AbstractDataAccessService {
     @Override
     public List queryPointList(Connection connection, DataAccessParam param) {
         String sql = "select s.%s from ( select row_number() over(order by r.%s  asc) as rn,r.%s  from %s.%s r) s"
-            + "  where mod(s.rn, %s ) = 0;";
+            + "  where mod(s.rn, %s ) = 1;";
         sql = String.format(sql, param.getColName(), param.getColName(), param.getColName(), param.getSchema(),
             param.getName(), param.getOffset());
         return adasQueryPointList(connection, sql);
diff --git a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/service/ConfigManagement.java b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/service/ConfigManagement.java
index 08d65520f8877391fd6ccdd5efbe01c3cf04dc57..d1c18450836f90d99a03103a8ebe9e162a3b74e6 100644
--- a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/service/ConfigManagement.java
+++ b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/service/ConfigManagement.java
@@ -56,7 +56,8 @@ public class ConfigManagement {
     private int maximumTableSliceSize;
     @Value("${spring.check.extend-maximum-pool-size}")
     private int extendMaxPoolSize = 10;
-
+    @Value("${spring.check.maximum-pool-size}")
+    private int maxPoolSize = 10;
     @Value("${spring.datasource.driver-class-name}")
     private String driverClassName;
     @Value("${spring.datasource.url}")
@@ -126,6 +127,7 @@ public class ConfigManagement {
         ConfigCache.put(ConfigConstants.FETCH_SIZE, 1000);
         ConfigCache.put(ConfigConstants.TIMEOUT_PER_SHUTDOWN_PHASE, timeoutPerShutdownPhase);
         ConfigCache.put(ConfigConstants.EXTEND_MAXIMUM_POOL_SIZE, extendMaxPoolSize);
+        ConfigCache.put(ConfigConstants.MAXIMUM_POOL_SIZE, maxPoolSize);
         ConfigCache.put(ConfigConstants.MAX_RETRY_TIMES, maxRetryTimes);
 
         loadKafkaProperties();
diff --git a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/slice/SliceDispatcher.java b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/slice/SliceDispatcher.java
index 37603377cbdbc074d51c751cd5c26ea05220ff54..b8640805585f6c67861a26f50142edeea236ef2b 100644
--- a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/slice/SliceDispatcher.java
+++ b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/slice/SliceDispatcher.java
@@ -31,12 +31,14 @@ import org.opengauss.datachecker.extract.data.BaseDataService;
 import org.opengauss.datachecker.extract.data.csv.CsvListener;
 import org.opengauss.datachecker.extract.slice.factory.SliceFactory;
 
+import java.nio.file.Path;
 import java.util.List;
 import java.util.Objects;
 import java.util.Optional;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ThreadPoolExecutor;
+import java.util.stream.Collectors;
 
 import static org.opengauss.datachecker.common.constant.DynamicTpConstant.EXTRACT_EXECUTOR;
 
@@ -116,16 +118,10 @@ public class SliceDispatcher implements Runnable {
                             continue;
                         }
                     }
-                    sliceRegister.batchRegister(tableSliceList);
-                    listener.releaseSliceCache(table);
-                    if (tableSliceList.size() <= 20) {
-                        LogUtils.debug(log, "table [{}] get main executor success", table);
-                        tableSliceList.forEach(sliceVo -> doTableSlice(executor, sliceVo));
+                    if (tableMetadata.isSinglePrimary()) {
+                        doSinglePrimarySliceDispatcher(tableSliceList, tableMetadata, executor, topicSize, extendMaxPoolSize);
                     } else {
-                        ThreadPoolExecutor extendExecutor =
-                            (ThreadPoolExecutor) dynamicThreadPoolManager.getFreeExecutor(topicSize, extendMaxPoolSize);
-                        LogUtils.debug(log, "table [{}] get extend executor success", table);
-                        tableSliceList.forEach(sliceVo -> doTableSlice(extendExecutor, sliceVo));
+                        doMultiPrimarySliceDispatcher(tableSliceList, tableMetadata, executor, topicSize, extendMaxPoolSize);
                     }
                 }
                 if (listener.isFinished()) {
@@ -143,6 +139,38 @@ public class SliceDispatcher implements Runnable {
         }
     }
 
+    private void doMultiPrimarySliceDispatcher(List<SliceVo> tableSliceList, TableMetadata tableMetadata,
+        ThreadPoolExecutor executor, int topicSize, int extendMaxPoolSize) {
+        List<Path> sliceNameList = tableSliceList.stream()
+            .map(SliceVo::getName)
+            .map(Path::of)
+            .collect(Collectors.toList());
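+        // a table with a composite primary key is registered and checked as a single whole-table slice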
+        SliceVo multiPkSlice = new SliceVo();
+        String table = tableMetadata.getTableName();
+        multiPkSlice.setName(table);
+        multiPkSlice.setTable(table);
+        multiPkSlice.setNo(0);
+        multiPkSlice.setTotal(1);
+        multiPkSlice.setSchema(tableMetadata.getSchema());
+        multiPkSlice.setEndpoint(ConfigCache.getEndPoint());
+        multiPkSlice.setStatus(0);
+        multiPkSlice.setTableHash(tableMetadata.getTableHash());
+        sliceRegister.batchRegister(List.of(multiPkSlice));
+        listener.releaseSliceCache(table);
+        executor.submit(sliceFactory.createTableProcessor(table, sliceNameList));
+    }
+
+    private void doSinglePrimarySliceDispatcher(List<SliceVo> tableSliceList, TableMetadata tableMetadata,
+        ThreadPoolExecutor executor, int topicSize, int extendMaxPoolSize) {
+        sliceRegister.batchRegister(tableSliceList);
+        String table = tableMetadata.getTableName();
+        listener.releaseSliceCache(table);
+        if (tableSliceList.size() <= 20) {
+            LogUtils.debug(log, "table [{}] get main executor success", table);
+            tableSliceList.forEach(sliceVo -> doTableSlice(executor, sliceVo));
+        } else {
+            ThreadPoolExecutor extendExecutor =
+                    (ThreadPoolExecutor) dynamicThreadPoolManager.getFreeExecutor(topicSize, extendMaxPoolSize);
+            LogUtils.debug(log, "table [{}] get extend executor success", table);
+            tableSliceList.forEach(sliceVo -> doTableSlice(extendExecutor, sliceVo));
+        }
+    }
+
     private void notifyIgnoreCsvName(Endpoint endPoint, String ignoreCsvTableName, String reason) {
         listener.notifyCheckIgnoreTable(ignoreCsvTableName, reason);
         if (Objects.equals(Endpoint.SOURCE, endPoint)) {
diff --git a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/slice/SliceProcessorContext.java b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/slice/SliceProcessorContext.java
index a667160882fa7eeab1b2887c72fc1eb74ff4a814..d8bedb576bf39e84b71b15b1307ff01cc44e5f35 100644
--- a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/slice/SliceProcessorContext.java
+++ b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/slice/SliceProcessorContext.java
@@ -34,6 +34,7 @@ import org.opengauss.datachecker.extract.task.sql.SliceQueryStatement;
 import org.springframework.kafka.core.KafkaTemplate;
 import org.springframework.stereotype.Component;
 
+import javax.annotation.PreDestroy;
 import javax.annotation.Resource;
 import java.util.Objects;
 
@@ -65,6 +66,15 @@ public class SliceProcessorContext {
         processLogService.saveProcessHistoryLogging(slice.getTable(), slice.getNo());
     }
 
+
+    /**
+     * Destroy the kafkaTemplate
+     */
+    @PreDestroy
+    public void destroy() {
+        kafkaTemplate.destroy();
+    }
+
     /**
      * 创建分片kafka代理
      *
diff --git a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/slice/common/SliceResultSetSender.java b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/slice/common/SliceResultSetSender.java
index be5579619f21c4ee59f3d1f3e28b6fcbc9c3a17d..da36dd6122be913fa1bc14b3043d46aa06262408 100644
--- a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/slice/common/SliceResultSetSender.java
+++ b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/slice/common/SliceResultSetSender.java
@@ -49,6 +49,7 @@ public class SliceResultSetSender {
     private final List columnMetas;
     private final List primary;
     private final String tableName;
+    private String sliceKey;
 
     /**
      * constructor
@@ -86,6 +87,7 @@ public class SliceResultSetSender {
      */
     public void setRecordSendKey(String key) {
         this.kafkaOperate.setRecordSendKey(key);
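+        // keep the key locally so each RowDataHash built by this sender carries its slice key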
+        this.sliceKey = key;
     }
 
     /**
@@ -140,6 +142,7 @@ public class SliceResultSetSender {
         RowDataHash hashData = new RowDataHash();
         hashData.setKey(primaryValue)
                 .setKHash(primaryHash)
+                .setSliceKey(sliceKey)
                 .setVHash(rowHash);
         return hashData;
     }
diff --git a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/slice/process/AbstractProcessor.java b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/slice/process/AbstractProcessor.java
index 2f9cc2fe40fd98988e39958809c867a87f3752f3..733fcc62ef93c24944ef8bc183bbc45c77957c1c 100644
--- a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/slice/process/AbstractProcessor.java
+++ b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/slice/process/AbstractProcessor.java
@@ -15,12 +15,19 @@
 
 package org.opengauss.datachecker.extract.slice.process;
 
+import org.apache.logging.log4j.Logger;
 import org.opengauss.datachecker.common.config.ConfigCache;
 import org.opengauss.datachecker.common.constant.ConfigConstants;
 import org.opengauss.datachecker.common.entry.extract.SliceExtend;
+import org.opengauss.datachecker.common.util.LogUtils;
 import org.opengauss.datachecker.extract.slice.SliceProcessorContext;
+import org.springframework.kafka.support.SendResult;
+import org.springframework.util.concurrent.ListenableFuture;
 
 import java.math.BigDecimal;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
 
 /**
  * AbstractProcessor
@@ -30,6 +37,16 @@ import java.math.BigDecimal;
  * @since :11
  */
 public abstract class AbstractProcessor implements SliceProcessor {
+    /**
+     * JDBC fetch size
+     */
+    protected static final int FETCH_SIZE = 10000;
+
+    /**
+     * log
+     */
+    private static final Logger log = LogUtils.getLogger(AbstractProcessor.class);
+
     protected SliceProcessorContext context;
     protected int objectSizeExpansionFactor;
 
@@ -66,4 +83,86 @@ public abstract class AbstractProcessor implements SliceProcessor {
     protected void feedbackStatus(SliceExtend sliceExtend) {
         context.feedbackStatus(sliceExtend);
     }
+
+    /**
+     * Analyze the sending result SendResult of the sharding record 
+     * and obtain the offset range written to the topic in the current set, offset (min, max)
+     *
+     * @param batchFutures batchFutures
+     * @return offset (min, max)
+     */
+    protected long[] getBatchFutureRecordOffsetScope(List>> batchFutures) {
+        Iterator>> futureIterator = batchFutures.iterator();
+        ListenableFuture> candidate = futureIterator.next();
+        long minOffset = getFutureOffset(candidate);
+        long maxOffset = minOffset;
+
+        while (futureIterator.hasNext()) {
+            long next = getFutureOffset(futureIterator.next());
+            if (next < minOffset) {
+                minOffset = next;
+            }
+            if (next > maxOffset) {
+                maxOffset = next;
+            }
+        }
+        return new long[]{minOffset, maxOffset};
+    }
+
+    private long getFutureOffset(ListenableFuture> next) {
+        try {
+            SendResult sendResult = next.get();
+            return sendResult.getRecordMetadata()
+                             .offset();
+        } catch (InterruptedException | ExecutionException e) {
+            LogUtils.warn(log, "get record offset InterruptedException or ExecutionException");
+        }
+        return 0;
+    }
+
+    /**
+     * min offset
+     *
+     * @param offsetList offsetList
+     * @return min
+     */
+    protected long getMinOffset(List offsetList) {
+        return offsetList.stream()
+                         .mapToLong(a -> a[0])
+                         .min()
+                         .orElse(0L);
+    }
+
+    /**
+     * 获取最大偏移量集合中的最大值
+     *
+     * @param maxOffsetList maxOffsetList
+     * @return 最大值
+     */
+    protected long getMaxMaxOffset(List maxOffsetList) {
+        return maxOffsetList.stream().max(Long::compareTo).get();
+    }
+
+    /**
+     * 获取最小偏移量集合中的最小值
+     *
+     * @param minOffsetList minOffsetList
+     * @return 最小值
+     */
+    protected long getMinMinOffset(List minOffsetList) {
+        return minOffsetList.stream().min(Long::compareTo).get();
+    }
+
+    /**
+     * max offset
+     *
+     * @param offsetList offsetList
+     * @return max
+     */
+    protected long getMaxOffset(List offsetList) {
+        return offsetList.stream()
+                         .mapToLong(a -> a[1])
+                         .max()
+                         .orElse(0L);
+    }
 }
diff --git a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/slice/process/AbstractSliceProcessor.java b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/slice/process/AbstractSliceProcessor.java
index 37816767c5389be22cf9aa4ecb0cbb66c99ecce0..f3978bf8e44b64c3a190347d02b2a402fc7c2e9d 100644
--- a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/slice/process/AbstractSliceProcessor.java
+++ b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/slice/process/AbstractSliceProcessor.java
@@ -41,8 +41,7 @@
  * @since :11
  */
 public abstract class AbstractSliceProcessor extends AbstractProcessor {
-    protected static final int FETCH_SIZE = 10000;
-    protected static final Logger log = LogUtils.getLogger(AbstractSliceProcessor.class);
+    private static final Logger log = LogUtils.getLogger(AbstractSliceProcessor.class);
 
     protected SliceVo slice;
     protected final String topic;
@@ -89,66 +88,4 @@ public abstract class AbstractSliceProcessor extends AbstractProcessor {
         return Duration.between(start, end)
                        .toMillis();
     }
-
-    /**
-     * min offset
-     *
-     * @param offsetList offsetList
-     * @return min
-     */
-    protected long getMinOffset(List offsetList) {
-        return offsetList.stream()
-                         .mapToLong(a -> a[0])
-                         .min()
-                         .orElse(0L);
-    }
-
-    /**
-     * max offset
-     *
-     * @param offsetList offsetList
-     * @return max
-     */
-    protected long getMaxOffset(List offsetList) {
-        return offsetList.stream()
-                         .mapToLong(a -> a[1])
-                         .max()
-                         .orElse(0L);
-    }
-
-    /**
-     * Analyze the sending result SendResult of the sharding record
-     * and obtain the offset range written to the topic in the current set, offset (min, max)
-     *
-     * @param batchFutures batchFutures
-     * @return offset (min, max)
-     */
-    protected long[] getBatchFutureRecordOffsetScope(List>> batchFutures) {
-        Iterator>> futureIterator = batchFutures.iterator();
-        ListenableFuture> candidate = futureIterator.next();
-        long minOffset = getFutureOffset(candidate);
-        long maxOffset = minOffset;
-
-        while (futureIterator.hasNext()) {
-            long next = getFutureOffset(futureIterator.next());
-            if (next < minOffset) {
-                minOffset = next;
-            }
-            if (next > maxOffset) {
-                maxOffset = next;
-            }
-        }
-        return new long[] {minOffset, maxOffset};
-    }
-
-    private long getFutureOffset(ListenableFuture> next) {
-        try {
-            SendResult sendResult = next.get();
-            return sendResult.getRecordMetadata()
-                             .offset();
-        } catch (InterruptedException | ExecutionException e) {
-            LogUtils.warn(log,"get record offset InterruptedException or ExecutionException");
-        }
-        return 0;
-    }
 }
diff --git a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/slice/process/CsvSliceProcessor.java b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/slice/process/CsvSliceProcessor.java
index c6951ad35d2583a5574f2206523ecf29b42ad94b..f6192031cc1573702ab8e8936a323410ed27b139 100644
--- a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/slice/process/CsvSliceProcessor.java
+++ b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/slice/process/CsvSliceProcessor.java
@@ -17,6 +17,7 @@ package org.opengauss.datachecker.extract.slice.process;
 
 import com.opencsv.CSVReader;
 import com.opencsv.exceptions.CsvValidationException;
+import org.apache.logging.log4j.Logger;
 import org.opengauss.datachecker.common.config.ConfigCache;
 import org.opengauss.datachecker.common.entry.extract.SliceExtend;
 import org.opengauss.datachecker.common.entry.extract.SliceVo;
@@ -48,6 +49,7 @@ import java.util.TreeMap;
  * @since :11
  */
 public class CsvSliceProcessor extends AbstractSliceProcessor {
+    private static final Logger log = LogUtils.getLogger(CsvSliceProcessor.class);
     protected MemoryOperations memoryOperations;
 
     public CsvSliceProcessor(SliceVo slice, SliceProcessorContext context) {
diff --git a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/slice/process/CsvTableProcessor.java b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/slice/process/CsvTableProcessor.java
index 4e2aeda0940b8225cda68c7017dad77e56b7470d..81bc463958eced2d8991cc62cee0df9a5c71707b 100644
--- a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/slice/process/CsvTableProcessor.java
+++ b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/slice/process/CsvTableProcessor.java
@@ -17,24 +17,25 @@ package org.opengauss.datachecker.extract.slice.process;
 
 import com.opencsv.CSVReader;
 import org.apache.commons.collections4.CollectionUtils;
+import org.apache.logging.log4j.Logger;
 import org.opengauss.datachecker.common.config.ConfigCache;
 import org.opengauss.datachecker.common.entry.extract.SliceExtend;
 import org.opengauss.datachecker.common.entry.extract.TableMetadata;
 import org.opengauss.datachecker.common.exception.ExtractDataAccessException;
+import org.opengauss.datachecker.common.util.LogUtils;
 import org.opengauss.datachecker.extract.resource.MemoryOperations;
 import org.opengauss.datachecker.extract.slice.SliceProcessorContext;
 import org.opengauss.datachecker.extract.slice.common.SliceKafkaAgents;
 import org.opengauss.datachecker.extract.slice.common.SliceResultSetSender;
+import org.springframework.kafka.support.SendResult;
+import org.springframework.util.concurrent.ListenableFuture;
 
 import java.io.FileReader;
 import java.io.IOException;
 import java.nio.file.Path;
 import java.time.Duration;
 import java.time.LocalDateTime;
-import java.util.List;
-import java.util.Map;
-import java.util.Objects;
-import java.util.TreeMap;
+import java.util.*;
 
 /**
  * CsvTableProcessor
@@ -44,6 +45,7 @@ import java.util.TreeMap;
  * @since :11
  */
 public class CsvTableProcessor extends AbstractTableProcessor {
+    private static final Logger log = LogUtils.getLogger(CsvTableProcessor.class);
     private static final int DEFAULT_CSV_FILE_ROW_FETCH_SIZE = 1000;
 
     protected MemoryOperations memoryOperations;
@@ -64,7 +66,7 @@ public class CsvTableProcessor extends AbstractTableProcessor {
                 Objects.isNull(tableFilePaths) ? 0 : tableFilePaths.size());
         TableMetadata tableMetadata = context.getTableMetaData(table);
         if (CollectionUtils.isNotEmpty(tableFilePaths)) {
-            tableRowCount = executeQueryStatement(tableMetadata, tableFilePaths);
+            tableRowCount = executeQueryStatement(tableMetadata, tableFilePaths, sliceExtend);
         } else {
             log.info("table [{}] is empty ", table);
         }
@@ -86,15 +88,18 @@ public class CsvTableProcessor extends AbstractTableProcessor {
         return tableSliceExtend;
     }
 
-    private long executeQueryStatement(TableMetadata tableMetadata, List tablePaths) throws IOException {
+    private long executeQueryStatement(TableMetadata tableMetadata, List tablePaths, SliceExtend tableSliceExtend) throws IOException {
         final LocalDateTime start = LocalDateTime.now();
         long tableRowCount = 0;
         SliceKafkaAgents kafkaAgents = context.createSliceFixedKafkaAgents(topic, table);
         SliceResultSetSender sliceSender = new SliceResultSetSender(tableMetadata, kafkaAgents);
+        sliceSender.setRecordSendKey(table);
         try {
             String csvDataRootPath = ConfigCache.getCsvData();
             int tableFileCount = tablePaths.size();
             long fetchSize = DEFAULT_CSV_FILE_ROW_FETCH_SIZE;
+            List minOffsetList = new LinkedList<>();
+            List maxOffsetList = new LinkedList<>();
             for (int i = 1; i <= tableFileCount; i++) {
                 Path slicePath = tablePaths.get(i - 1);
                 log.info("start [{}-{}] - {} ", tableFileCount, i, slicePath);
@@ -102,6 +107,8 @@ public class CsvTableProcessor extends AbstractTableProcessor {
                 fetchSize = Math.max(fetchSize, tableRowCount / i);
                 long estimatedSize = estimatedMemorySize(tableMetadata.getAvgRowLength(), fetchSize);
                 memoryOperations.takeMemory(estimatedSize);
+                List offsetList = new LinkedList<>();
+                List>> batchFutures = new LinkedList<>();
                 try (CSVReader reader = new CSVReader(new FileReader(sliceFilePath.toString()))) {
                     String[] nextLine;
                     int rowCount = 0;
@@ -109,8 +116,18 @@ public class CsvTableProcessor extends AbstractTableProcessor {
                     try {
                         while ((nextLine = reader.readNext()) != null) {
                             rowCount++;
-                            sliceSender.csvTranslateAndSendSync(nextLine, result, rowCount,i);
+                            batchFutures.add(sliceSender.csvTranslateAndSendSync(nextLine, result, rowCount, i));
+                            if (batchFutures.size() == FETCH_SIZE) {
+                                offsetList.add(getBatchFutureRecordOffsetScope(batchFutures));
+                                batchFutures.clear();
+                            }
                         }
+                        if (batchFutures.size() > 0) {
+                            offsetList.add(getBatchFutureRecordOffsetScope(batchFutures));
+                            batchFutures.clear();
+                        }
+                        minOffsetList.add(getMinOffset(offsetList));
+                        maxOffsetList.add(getMaxOffset(offsetList));
                     } catch (Exception ex) {
                         log.error("csvTranslateAndSend error: ", ex);
                     }
@@ -121,6 +138,9 @@ public class CsvTableProcessor extends AbstractTableProcessor {
                 }
                 memoryOperations.release();
             }
+            tableSliceExtend.setStartOffset(getMinMinOffset(minOffsetList));
+            tableSliceExtend.setEndOffset(getMaxMaxOffset(maxOffsetList));
+            tableSliceExtend.setCount(tableRowCount);
         } catch (Exception ex) {
             log.error("jdbc query {} error : {}", table, ex.getMessage());
             throw new ExtractDataAccessException();
diff --git a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/slice/process/JdbcSliceProcessor.java b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/slice/process/JdbcSliceProcessor.java
index 01bf4b1d6b639c60e37804256196da63f96acd38..b90ad5d4d51f7eec8d3cc42db65ee4862893f221 100644
--- a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/slice/process/JdbcSliceProcessor.java
+++ b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/slice/process/JdbcSliceProcessor.java
@@ -16,6 +16,7 @@ package org.opengauss.datachecker.extract.slice.process;
 
 import com.alibaba.druid.pool.DruidDataSource;
+import org.apache.logging.log4j.Logger;
 import org.opengauss.datachecker.common.entry.extract.SliceExtend;
 import org.opengauss.datachecker.common.entry.extract.SliceVo;
 import org.opengauss.datachecker.common.entry.extract.TableMetadata;
@@ -51,6 +52,7 @@ import java.util.concurrent.atomic.AtomicInteger;
  * @since :11
 */
 public class JdbcSliceProcessor extends AbstractSliceProcessor {
+    private static final Logger log = LogUtils.getLogger(JdbcSliceProcessor.class);
    private final JdbcDataOperations jdbcOperation;
    private final AtomicInteger rowCount = new AtomicInteger(0);
    private final DruidDataSource dataSource;
diff --git a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/slice/process/JdbcTableProcessor.java b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/slice/process/JdbcTableProcessor.java
index 64259adeb7f6a66508fc3c63184df48140756775..fa8e9c80a07803b2b398deff0b9784f1dfa78c2a 100644
--- a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/slice/process/JdbcTableProcessor.java
+++ b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/slice/process/JdbcTableProcessor.java
@@ -15,14 +15,18 @@
 
 package org.opengauss.datachecker.extract.slice.process;
 
+import org.apache.logging.log4j.Logger;
 import org.opengauss.datachecker.common.entry.extract.SliceExtend;
 import org.opengauss.datachecker.common.exception.ExtractDataAccessException;
+import org.opengauss.datachecker.common.util.LogUtils;
 import org.opengauss.datachecker.extract.resource.JdbcDataOperations;
 import org.opengauss.datachecker.extract.slice.SliceProcessorContext;
 import org.opengauss.datachecker.extract.slice.common.SliceResultSetSender;
 import org.opengauss.datachecker.extract.task.sql.AutoSliceQueryStatement;
 import org.opengauss.datachecker.extract.task.sql.FullQueryStatement;
 import org.opengauss.datachecker.extract.task.sql.QuerySqlEntry;
+import org.springframework.kafka.support.SendResult;
+import org.springframework.util.concurrent.ListenableFuture;
 
 import java.sql.Connection;
 import java.sql.PreparedStatement;
@@ -31,6 +35,7 @@ import java.sql.ResultSetMetaData;
 import java.sql.SQLException;
 import java.time.Duration;
 import java.time.LocalDateTime;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.TreeMap;
@@ -43,6 +48,7 @@ import java.util.TreeMap;
  * @since :11
 */
 public class JdbcTableProcessor extends AbstractTableProcessor {
+    private static final Logger log = LogUtils.getLogger(JdbcTableProcessor.class);
    private final JdbcDataOperations jdbcOperation;
 
    private SliceResultSetSender sliceSender;
@@ -63,11 +69,12 @@ public class JdbcTableProcessor extends AbstractTableProcessor {
         SliceExtend tableSliceExtend = createTableSliceExtend();
         try {
             sliceSender = new SliceResultSetSender(tableMetadata, context.createSliceFixedKafkaAgents(topic, table));
+            sliceSender.setRecordSendKey(table);
             long tableRowCount;
             if (noTableSlice()) {
-                tableRowCount = executeFullTable();
+                tableRowCount = executeFullTable(tableSliceExtend);
             } else {
-                tableRowCount = executeMultiSliceTable();
+                tableRowCount = executeMultiSliceTable(tableSliceExtend);
             }
             tableSliceExtend.setCount(tableRowCount);
             feedbackStatus(tableSliceExtend);
@@ -81,7 +88,7 @@ public class JdbcTableProcessor extends AbstractTableProcessor {
         }
     }
 
-    private long executeMultiSliceTable() {
+    private long executeMultiSliceTable(SliceExtend tableSliceExtend) {
         final LocalDateTime start = LocalDateTime.now();
         Connection connection = null;
         List querySqlList = getAutoSliceQuerySqlList();
@@ -90,8 +97,12 @@ public class JdbcTableProcessor extends AbstractTableProcessor {
         try {
             long estimatedSize = estimatedMemorySize(tableMetadata.getAvgRowLength(), fetchSize);
             connection = jdbcOperation.tryConnectionAndClosedAutoCommit(estimatedSize);
+            List minOffsetList = new LinkedList<>();
+            List maxOffsetList = new LinkedList<>();
             for (int i = 0; i < querySqlList.size(); i++) {
                 QuerySqlEntry sqlEntry = querySqlList.get(i);
+                List offsetList = new LinkedList<>();
+                List>> batchFutures = new LinkedList<>();
                 log.info(" {} , {}", table, sqlEntry.toString());
                 try (PreparedStatement ps = connection.prepareStatement(sqlEntry.getSql());
                     ResultSet resultSet = ps.executeQuery()) {
@@ -101,13 +112,26 @@ public class JdbcTableProcessor extends AbstractTableProcessor {
                     int rowCount = 0;
                     while (resultSet.next()) {
                         rowCount++;
-                        sliceSender.resultSetTranslateAndSendSync(rsmd, resultSet, result, i);
+                        batchFutures.add(sliceSender.resultSetTranslateAndSendSync(rsmd, resultSet, result, i));
+                        if (batchFutures.size() == FETCH_SIZE) {
+                            offsetList.add(getBatchFutureRecordOffsetScope(batchFutures));
+                            batchFutures.clear();
+                        }
                     }
+                    if (batchFutures.size() > 0) {
+                        offsetList.add(getBatchFutureRecordOffsetScope(batchFutures));
+                        batchFutures.clear();
+                    }
+                    minOffsetList.add(getMinOffset(offsetList));
+                    maxOffsetList.add(getMaxOffset(offsetList));
                     sliceSender.resultFlush();
                     tableRowCount += rowCount;
                     log.info("finish {} - {} - {}, {}", table, i, rowCount, tableRowCount);
                 }
             }
+            tableSliceExtend.setStartOffset(getMinMinOffset(minOffsetList));
+            tableSliceExtend.setEndOffset(getMaxMaxOffset(maxOffsetList));
+            tableSliceExtend.setCount(tableRowCount);
         } catch (SQLException ex) {
             log.error("jdbc query {} error : {}", table, ex.getMessage());
             throw new ExtractDataAccessException();
@@ -120,7 +144,7 @@ public class JdbcTableProcessor extends AbstractTableProcessor {
         return tableRowCount;
     }
 
-    private long executeFullTable() {
+    private long executeFullTable(SliceExtend tableSliceExtend) {
         final LocalDateTime start = LocalDateTime.now();
         Connection connection = null;
         long tableRowCount = 0;
@@ -130,18 +154,29 @@ public class JdbcTableProcessor extends AbstractTableProcessor {
             connection = jdbcOperation.tryConnectionAndClosedAutoCommit(estimatedSize);
             QuerySqlEntry sqlEntry = getFullQuerySqlEntry();
             log.info(" {} , {}", table, sqlEntry.toString());
+            List offsetList = new LinkedList<>();
+            List>> batchFutures = new LinkedList<>();
             try (PreparedStatement ps = connection.prepareStatement(sqlEntry.getSql());
                 ResultSet resultSet = ps.executeQuery()) {
                 resultSet.setFetchSize(fetchSize);
                 ResultSetMetaData rsmd = resultSet.getMetaData();
                 Map result = new TreeMap<>();
-                int rowCount = 0;
                 while (resultSet.next()) {
-                    rowCount++;
-                    sliceSender.resultSetTranslateAndSendSync(rsmd, resultSet, result, 0);
+                    tableRowCount++;
+                    batchFutures.add(sliceSender.resultSetTranslateAndSendSync(rsmd, resultSet, result, 0));
+                    if (batchFutures.size() == FETCH_SIZE) {
+                        offsetList.add(getBatchFutureRecordOffsetScope(batchFutures));
+                        batchFutures.clear();
+                    }
+                }
+                if (batchFutures.size() > 0) {
+                    offsetList.add(getBatchFutureRecordOffsetScope(batchFutures));
+                    batchFutures.clear();
                 }
-                tableRowCount += rowCount;
-                log.info("finish {} , {}: {}", table, rowCount, tableRowCount);
+                tableSliceExtend.setStartOffset(getMinOffset(offsetList));
+                tableSliceExtend.setEndOffset(getMaxOffset(offsetList));
+                tableSliceExtend.setCount(tableRowCount);
+                log.info("finish {} , {}", table, tableRowCount);
             }
         } catch (SQLException ex) {
             log.error("jdbc query {} error : {}", table, ex.getMessage());
@@ -161,6 +196,7 @@ public class JdbcTableProcessor extends AbstractTableProcessor {
     }
 
     private List getAutoSliceQuerySqlList() {
+        // 单主键根据主键进行SQL分片,联合主键根据第一主键值进行SQL分片
         AutoSliceQueryStatement statement = context.createAutoSliceQueryStatement(tableMetadata);
         return statement.builderByTaskOffset(tableMetadata, getMaximumTableSliceSize());
     }
diff --git a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/task/CheckPoint.java b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/task/CheckPoint.java
index 104370fcf34d45ea32a752f9992054ab8ca6bb74..7f83ae0a79ce8fa6ec88ff911c02860e127f959d 100644
--- a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/task/CheckPoint.java
+++ b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/task/CheckPoint.java
@@ -58,6 +58,7 @@ public class CheckPoint {
      * @param dataAccessService dataAccessService
      */
     public CheckPoint(DataAccessService dataAccessService) {
+        this.dataSource = (DruidDataSource) dataAccessService.getDataSource();
         this.dataAccessService = dataAccessService;
     }
 
@@ -93,11 +94,9 @@ public class CheckPoint {
             .setName(SqlUtil.escape(tableName, dataBaseType))
             .setColName(SqlUtil.escape(pkName, dataBaseType));
         Connection connection = Objects.nonNull(dataSource) ? getConnection() : ConnectionMgr.getConnection();
-        String minCheckPoint = dataAccessService.min(connection, param);
         param.setOffset(slice);
         Object maxPoint = dataAccessService.max(connection, param);
         List checkPointList = dataAccessService.queryPointList(connection, param);
-        checkPointList.add(minCheckPoint);
         checkPointList.add(maxPoint);
         checkPointList = checkPointList.stream()
             .distinct()
diff --git a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/task/functional/SimpleTypeHandlerFactory.java b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/task/functional/SimpleTypeHandlerFactory.java
index 3bec0220e72664baf911656f738425602ac0e6ae..c18503b2aa621a9dd4079e28874a1402cab8a6e7 100644
--- a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/task/functional/SimpleTypeHandlerFactory.java
+++ b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/task/functional/SimpleTypeHandlerFactory.java
@@ -35,6 +35,7 @@ import java.util.Locale;
 import java.util.Map;
 import java.util.Objects;
 import java.util.TimeZone;
+import java.util.concurrent.ConcurrentHashMap;
 
 /**
  * 类型处理器工厂
@@ -50,7 +51,7 @@ public class SimpleTypeHandlerFactory {
     private static final DateTimeFormatter YEAR = DateTimeFormatter.ofPattern("yyyy");
     private static final DateTimeFormatter TIME = DateTimeFormatter.ofPattern("HH:mm:ss");
     private static final DateTimeFormatterMap TIMESTAMP_MAPPER = new DateTimeFormatterMap();
-    private static final Map FLOAT_FORMAT_CACHE = new HashMap<>();
+    private static final Map FLOAT_FORMAT_CACHE = new ConcurrentHashMap<>();
     private static final int O_NUMERIC_SCALE_F84 = -84;
     private static final int O_NUMERIC_SCALE_0 = 0;
diff --git a/datachecker-extract/src/main/resources/mapper/MysqlMetaDataMapper.xml b/datachecker-extract/src/main/resources/mapper/MysqlMetaDataMapper.xml
index f26f3e54f610a234e8fde858b4e9212e1faeb4c5..a22026bfb1da99061911cf1077aae675a7b76460 100644
--- a/datachecker-extract/src/main/resources/mapper/MysqlMetaDataMapper.xml
+++ b/datachecker-extract/src/main/resources/mapper/MysqlMetaDataMapper.xml
@@ -48,20 +48,20 @@
@@ -88,7 +88,7 @@
             from ${param.schema}.${param.name} r, (select @rowno := 0) t
             ORDER BY r.${param.colName} asc
         ) s
-        where mod(s.rn, #{param.offset}) = 0
+        where mod(s.rn, #{param.offset}) = 1
         select s.${param.colName} from (
             select row_number() over(order by r.${param.colName} asc) as rn,r.${param.colName}
             from ${param.schema}.${param.name} r
         ) s
-        where mod(s.rn, #{param.offset}) = 0
+        where mod(s.rn, #{param.offset}) = 1
         select * from (
             select rownum rn ,${param.colName} from ${param.schema}.${param.name} order by ${param.colName} asc
-        ) where mod(rn,#{param.offset})=0
+        ) where mod(rn,#{param.offset})=1
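
Note on the offset bookkeeping introduced above: the patch moves getBatchFutureRecordOffsetScope, getMinOffset/getMaxOffset and getMinMinOffset/getMaxMaxOffset into AbstractProcessor, and the table processors now use them to fill SliceExtend.startOffset/endOffset from the Kafka send futures, presumably so the check side only has to read that window of the topic. A minimal standalone sketch of the aggregation logic follows; the class and method names (OffsetScopeDemo, batchScope, tableScope) are illustrative and not part of the patch, and plain longs stand in for the SendResult record metadata.

```java
import java.util.Arrays;
import java.util.List;

/**
 * Illustrative only: mirrors the min/max offset aggregation the patch adds to
 * AbstractProcessor. Each batch of sent records yields a {min, max} scope; the
 * per-table window is the min of the mins and the max of the maxes.
 */
public class OffsetScopeDemo {
    /** Computes the {min, max} offset scope of one batch of record offsets. */
    static long[] batchScope(List<Long> recordOffsets) {
        long min = recordOffsets.get(0);
        long max = min;
        for (long offset : recordOffsets) {
            if (offset < min) {
                min = offset;
            }
            if (offset > max) {
                max = offset;
            }
        }
        return new long[]{min, max};
    }

    /** Folds per-batch scopes into the overall [startOffset, endOffset] window. */
    static long[] tableScope(List<long[]> batchScopes) {
        long start = batchScopes.stream().mapToLong(s -> s[0]).min().orElse(0L);
        long end = batchScopes.stream().mapToLong(s -> s[1]).max().orElse(0L);
        return new long[]{start, end};
    }

    public static void main(String[] args) {
        // Two batches of record offsets, e.g. taken from SendResult metadata.
        long[] scope1 = batchScope(Arrays.asList(100L, 101L, 102L));
        long[] scope2 = batchScope(Arrays.asList(103L, 104L, 105L));
        long[] table = tableScope(Arrays.asList(scope1, scope2));
        System.out.println("startOffset=" + table[0] + ", endOffset=" + table[1]); // 100, 105
    }
}
```

The batching at FETCH_SIZE keeps only one batch of futures in memory at a time, which is why the processors collapse each batch to a scope before clearing the future list.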