From db62822faa2f7da16b4a6f4d653e113c28f9d77c Mon Sep 17 00:00:00 2001 From: mystarry-sky Date: Mon, 11 Nov 2024 16:12:14 +0800 Subject: [PATCH] =?UTF-8?q?=E5=A2=9E=E5=8A=A0=E8=A1=A8=E5=94=AF=E4=B8=80?= =?UTF-8?q?=E6=80=A7=E7=B4=A2=E5=BC=95=E5=9C=BA=E6=99=AF=E8=A1=A8=E6=A0=A1?= =?UTF-8?q?=E9=AA=8C=E3=80=82=20=E4=BF=AE=E5=A4=8Dconsumer=E6=8B=89?= =?UTF-8?q?=E5=8F=96=E5=BC=82=E5=B8=B8=E9=97=AE=E9=A2=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../modules/check/KafkaConsumerHandler.java | 27 ++-- .../check/slice/SliceCheckContext.java | 15 +- .../check/slice/SliceCheckWorker.java | 135 ++++++++---------- .../data/access/MysqlDataAccessService.java | 11 +- .../data/access/OracleDataAccessService.java | 10 +- 5 files changed, 98 insertions(+), 100 deletions(-) diff --git a/datachecker-check/src/main/java/org/opengauss/datachecker/check/modules/check/KafkaConsumerHandler.java b/datachecker-check/src/main/java/org/opengauss/datachecker/check/modules/check/KafkaConsumerHandler.java index e142040..3a83c2a 100644 --- a/datachecker-check/src/main/java/org/opengauss/datachecker/check/modules/check/KafkaConsumerHandler.java +++ b/datachecker-check/src/main/java/org/opengauss/datachecker/check/modules/check/KafkaConsumerHandler.java @@ -16,6 +16,7 @@ package org.opengauss.datachecker.check.modules.check; import com.alibaba.fastjson.JSON; + import org.apache.commons.lang3.StringUtils; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; @@ -26,6 +27,7 @@ import org.opengauss.datachecker.common.entry.extract.RowDataHash; import org.opengauss.datachecker.common.entry.extract.SliceExtend; import org.opengauss.datachecker.common.exception.CheckConsumerPollEmptyException; import org.opengauss.datachecker.common.util.LogUtils; +import org.opengauss.datachecker.common.util.ThreadUtil; import java.time.Duration; import java.util.*; @@ -44,10 +46,11 @@ public class KafkaConsumerHandler { private static final int MAX_CONSUMER_POLL_TIMES = 50; private KafkaConsumer kafkaConsumer; + /** * Constructor * - * @param consumer consumer + * @param consumer consumer * @param retryTimes retryTimes */ public KafkaConsumerHandler(KafkaConsumer consumer, int retryTimes) { @@ -66,6 +69,7 @@ public class KafkaConsumerHandler { /** * 获取kafka consumer * + * @return consumer */ public KafkaConsumer getConsumer() { return kafkaConsumer; @@ -74,7 +78,7 @@ public class KafkaConsumerHandler { /** * Query the Kafka partition data corresponding to the specified table * - * @param topic Kafka topic + * @param topic Kafka topic * @param partitions Kafka partitions * @return kafka partitions data */ @@ -96,8 +100,8 @@ public class KafkaConsumerHandler { * consumer poll data from the topic partition, and filter bu slice extend. then add data in the data list. * * @param topicPartition topic partition - * @param sExtend slice extend - * @param attempts + * @param sExtend slice extend + * @param attempts attempts */ public void consumerAssign(TopicPartition topicPartition, SliceExtend sExtend, int attempts) { kafkaConsumer.assign(List.of(topicPartition)); @@ -109,20 +113,21 @@ public class KafkaConsumerHandler { /** * consumer poll data from the topic partition, and filter bu slice extend. then add data in the data list. 
* - * @param sExtend slice extend + * @param sExtend slice extend * @param dataList data list */ public synchronized void pollTpSliceData(SliceExtend sExtend, List dataList) { AtomicLong currentCount = new AtomicLong(0); int pollEmptyCount = 0; while (currentCount.get() < sExtend.getCount()) { - ConsumerRecords records = - kafkaConsumer.poll(Duration.ofMillis(KAFKA_CONSUMER_POLL_DURATION)); + ConsumerRecords records = kafkaConsumer.poll( + Duration.ofMillis(KAFKA_CONSUMER_POLL_DURATION)); if (records.count() <= 0) { pollEmptyCount++; if (pollEmptyCount > MAX_CONSUMER_POLL_TIMES) { throw new CheckConsumerPollEmptyException(sExtend.getName()); } + ThreadUtil.sleep(KAFKA_CONSUMER_POLL_DURATION); continue; } pollEmptyCount = 0; @@ -139,8 +144,8 @@ public class KafkaConsumerHandler { /** * Query the Kafka partition data corresponding to the specified table * - * @param topic Kafka topic - * @param partitions Kafka partitions + * @param topic Kafka topic + * @param partitions Kafka partitions * @param shouldChangeConsumerGroup if true change consumer Group random * @return kafka partitions data */ @@ -188,8 +193,8 @@ public class KafkaConsumerHandler { } private void getTopicRecords(List dataList, KafkaConsumer kafkaConsumer) { - ConsumerRecords consumerRecords = - kafkaConsumer.poll(Duration.ofMillis(KAFKA_CONSUMER_POLL_DURATION)); + ConsumerRecords consumerRecords = kafkaConsumer.poll( + Duration.ofMillis(KAFKA_CONSUMER_POLL_DURATION)); consumerRecords.forEach(record -> { dataList.add(JSON.parseObject(record.value(), RowDataHash.class)); }); diff --git a/datachecker-check/src/main/java/org/opengauss/datachecker/check/slice/SliceCheckContext.java b/datachecker-check/src/main/java/org/opengauss/datachecker/check/slice/SliceCheckContext.java index 5b23f55..9f0bc4f 100644 --- a/datachecker-check/src/main/java/org/opengauss/datachecker/check/slice/SliceCheckContext.java +++ b/datachecker-check/src/main/java/org/opengauss/datachecker/check/slice/SliceCheckContext.java @@ -81,10 +81,19 @@ public class SliceCheckContext { kafkaConsumerService.getRetryFetchRecordTimes()); } + /** + * get consumer retry fetch record times + * + * @return duration times + */ + public int getRetryFetchRecordTimes() { + return kafkaConsumerService.getRetryFetchRecordTimes(); + } + /** * get source or sink table topic * - * @param table table + * @param table table * @param endpoint source or sink * @return topic name */ @@ -97,7 +106,7 @@ public class SliceCheckContext { /** * refresh slice check progress * - * @param slice slice + * @param slice slice * @param rowCount slice of row count */ public void refreshSliceCheckProgress(SliceVo slice, long rowCount) { @@ -107,7 +116,7 @@ public class SliceCheckContext { /** * add slice check Result * - * @param slice slice + * @param slice slice * @param result check result */ public void addCheckResult(SliceVo slice, CheckDiffResult result) { diff --git a/datachecker-check/src/main/java/org/opengauss/datachecker/check/slice/SliceCheckWorker.java b/datachecker-check/src/main/java/org/opengauss/datachecker/check/slice/SliceCheckWorker.java index 29c6bd0..26b42af 100644 --- a/datachecker-check/src/main/java/org/opengauss/datachecker/check/slice/SliceCheckWorker.java +++ b/datachecker-check/src/main/java/org/opengauss/datachecker/check/slice/SliceCheckWorker.java @@ -17,6 +17,7 @@ package org.opengauss.datachecker.check.slice; import com.google.common.collect.MapDifference; import com.google.common.collect.Maps; + import org.apache.commons.collections4.CollectionUtils; import 
org.apache.commons.collections4.MapUtils; import org.apache.kafka.common.TopicPartition; @@ -47,6 +48,7 @@ import org.opengauss.datachecker.common.exception.BucketNumberInconsistentExcept import org.opengauss.datachecker.common.exception.CheckConsumerPollEmptyException; import org.opengauss.datachecker.common.exception.MerkleTreeDepthException; import org.opengauss.datachecker.common.util.LogUtils; +import org.opengauss.datachecker.common.util.ThreadUtil; import org.opengauss.datachecker.common.util.TopicUtil; import org.springframework.lang.NonNull; @@ -72,8 +74,6 @@ import java.util.concurrent.CountDownLatch; public class SliceCheckWorker implements Runnable { private static final Logger LOGGER = LogUtils.getLogger(SliceCheckWorker.class); private static final int THRESHOLD_MIN_BUCKET_SIZE = 2; - // 设置最大尝试次数 - private static final int MAX_ATTEMPTS=5; private final SliceVo slice; @@ -81,17 +81,19 @@ public class SliceCheckWorker implements Runnable { private final SliceCheckEvent checkEvent; private final SliceCheckContext checkContext; private final TaskRegisterCenter registerCenter; - private final DifferencePair, List, List> difference = - DifferencePair.of(new LinkedList<>(), new LinkedList<>(), new LinkedList<>()); + private final DifferencePair, List, List> difference = DifferencePair.of( + new LinkedList<>(), new LinkedList<>(), new LinkedList<>()); private final LocalDateTime startTime; private long sliceRowCount; + // 设置最大尝试次数 + private int maxAttemptsTimes; private Topic topic = new Topic(); /** * slice check worker construct * - * @param checkEvent check event + * @param checkEvent check event * @param sliceCheckContext slice check context */ public SliceCheckWorker(SliceCheckEvent checkEvent, SliceCheckContext sliceCheckContext, @@ -102,6 +104,7 @@ public class SliceCheckWorker implements Runnable { this.slice = checkEvent.getSlice(); this.registerCenter = registerCenter; this.processNo = ConfigCache.getValue(ConfigConstants.PROCESS_NO); + this.maxAttemptsTimes = sliceCheckContext.getRetryFetchRecordTimes(); } @Override @@ -168,22 +171,17 @@ public class SliceCheckWorker implements Runnable { LogUtils.debug(LOGGER, "slice {} fetch empty", slice.getName()); } else { // sourceSize is less than thresholdMinBucketSize, that is, there is only one bucket. Compare - DifferencePair, List, List> subDifference = - compareBucketCommon(sourceTuple.getBuckets() - .get(0), sinkTuple.getBuckets() - .get(0)); - difference.getDiffering() - .addAll(subDifference.getDiffering()); - difference.getOnlyOnLeft() - .addAll(subDifference.getOnlyOnLeft()); - difference.getOnlyOnRight() - .addAll(subDifference.getOnlyOnRight()); + DifferencePair, List, List> subDifference = null; + subDifference = compareBucketCommon(sourceTuple.getBuckets().get(0), sinkTuple.getBuckets().get(0)); + difference.getDiffering().addAll(subDifference.getDiffering()); + difference.getOnlyOnLeft().addAll(subDifference.getOnlyOnLeft()); + difference.getOnlyOnRight().addAll(subDifference.getOnlyOnRight()); } } else { throw new BucketNumberInconsistentException(String.format( - "table[%s] slice[%s] build the bucket number is inconsistent, source-bucket-count=[%s] sink-bucket-count=[%s]" - + " Please synchronize data again! ", slice.getTable(), slice.getNo(), sourceTuple.getBucketSize(), - sinkTuple.getBucketSize())); + "table[%s] slice[%s] build the bucket number is inconsistent, source-bucket-count=[%s] " + + "sink-bucket-count=[%s] Please synchronize data again! 
", slice.getTable(), slice.getNo(), + sourceTuple.getBucketSize(), sinkTuple.getBucketSize())); } } @@ -193,24 +191,23 @@ public class SliceCheckWorker implements Runnable { private void checkResult(String resultMsg) { CheckDiffResultBuilder builder = CheckDiffResultBuilder.builder(); - builder.process(ConfigCache.getValue(ConfigConstants.PROCESS_NO)) - .table(slice.getTable()) - .sno(slice.getNo()) - .error(resultMsg) - .topic(getConcatTableTopics()) - .schema(slice.getSchema()) - .fileName(slice.getName()) - .conditionLimit(getConditionLimit()) - .partitions(slice.getPtnNum()) - .isTableStructureEquals(true) - .startTime(startTime) - .endTime(LocalDateTime.now()) - .isExistTableMiss(false, null) - .rowCount((int) sliceRowCount) - .errorRate(20) - .checkMode(ConfigCache.getValue(ConfigConstants.CHECK_MODE, CheckMode.class)) - .keyDiff(difference.getOnlyOnLeft(), difference.getDiffering(), difference.getOnlyOnRight()); + .table(slice.getTable()) + .sno(slice.getNo()) + .error(resultMsg) + .topic(getConcatTableTopics()) + .schema(slice.getSchema()) + .fileName(slice.getName()) + .conditionLimit(getConditionLimit()) + .partitions(slice.getPtnNum()) + .isTableStructureEquals(true) + .startTime(startTime) + .endTime(LocalDateTime.now()) + .isExistTableMiss(false, null) + .rowCount((int) sliceRowCount) + .errorRate(20) + .checkMode(ConfigCache.getValue(ConfigConstants.CHECK_MODE, CheckMode.class)) + .keyDiff(difference.getOnlyOnLeft(), difference.getDiffering(), difference.getOnlyOnRight()); CheckDiffResult result = builder.build(); LogUtils.debug(LOGGER, "result {}", result); checkContext.addCheckResult(slice, result); @@ -233,18 +230,13 @@ public class SliceCheckWorker implements Runnable { return; } diffNodeList.forEach(diffNode -> { - Bucket sourceBucket = diffNode.getSource() - .getBucket(); - Bucket sinkBucket = diffNode.getSink() - .getBucket(); - DifferencePair, List, List> subDifference = - compareBucketCommon(sourceBucket, sinkBucket); - difference.getDiffering() - .addAll(subDifference.getDiffering()); - difference.getOnlyOnLeft() - .addAll(subDifference.getOnlyOnLeft()); - difference.getOnlyOnRight() - .addAll(subDifference.getOnlyOnRight()); + Bucket sourceBucket = diffNode.getSource().getBucket(); + Bucket sinkBucket = diffNode.getSink().getBucket(); + DifferencePair, List, List> subDifference = compareBucketCommon( + sourceBucket, sinkBucket); + difference.getDiffering().addAll(subDifference.getDiffering()); + difference.getOnlyOnLeft().addAll(subDifference.getOnlyOnLeft()); + difference.getOnlyOnRight().addAll(subDifference.getOnlyOnRight()); }); diffNodeList.clear(); } @@ -257,13 +249,10 @@ public class SliceCheckWorker implements Runnable { List entriesOnlyOnLeft = collectorDeleteOrInsert(bucketDifference.entriesOnlyOnLeft()); List entriesOnlyOnRight = collectorDeleteOrInsert(bucketDifference.entriesOnlyOnRight()); List differing = collectorUpdate(bucketDifference.entriesDiffering()); - - LogUtils.debug(LOGGER, "diff slice {} insert {}", slice.getName(), bucketDifference.entriesOnlyOnLeft() - .size()); - LogUtils.debug(LOGGER, "diff slice {} delete {}", slice.getName(), bucketDifference.entriesOnlyOnRight() - .size()); - LogUtils.debug(LOGGER, "diff slice {} update {}", slice.getName(), bucketDifference.entriesDiffering() - .size()); + LogUtils.debug(LOGGER, "diff slice {} insert {}", slice.getName(), bucketDifference.entriesOnlyOnLeft().size()); + LogUtils.debug(LOGGER, "diff slice {} delete {}", slice.getName(), + bucketDifference.entriesOnlyOnRight().size()); + 
LogUtils.debug(LOGGER, "diff slice {} update {}", slice.getName(), bucketDifference.entriesDiffering().size()); return DifferencePair.of(entriesOnlyOnLeft, entriesOnlyOnRight, differing); } @@ -313,12 +302,11 @@ public class SliceCheckWorker implements Runnable { // Initialize source bucket column list data long startFetch = System.currentTimeMillis(); CountDownLatch countDownLatch = new CountDownLatch(checkTupleList.size()); - int avgSliceCount = (int) (sourceTuple.getSlice() - .getCount() + sinkTuple.getSlice() - .getCount()) / 2; + int avgSliceCount = (int) (sourceTuple.getSlice().getCount() + sinkTuple.getSlice().getCount()) / 2; KafkaConsumerHandler consumer = checkContext.createKafkaHandler(); checkTupleList.forEach(check -> { - initBucketList(check.getEndpoint(), check.getSlice(), check.getBuckets(), bucketDiff, avgSliceCount, consumer); + initBucketList(check.getEndpoint(), check.getSlice(), check.getBuckets(), bucketDiff, avgSliceCount, + consumer); countDownLatch.countDown(); }); countDownLatch.await(); @@ -336,30 +324,33 @@ public class SliceCheckWorker implements Runnable { } private void initBucketList(Endpoint endpoint, SliceExtend sliceExtend, List bucketList, - Map> bucketDiff, int avgSliceCount, KafkaConsumerHandler consumer) { + Map> bucketDiff, int avgSliceCount, KafkaConsumerHandler consumer) { // Use feign client to pull Kafka data List dataList = new LinkedList<>(); - TopicPartition topicPartition = new TopicPartition(Objects.equals(Endpoint.SOURCE, endpoint) ? - topic.getSourceTopicName() : topic.getSinkTopicName(), topic.getPtnNum()); + TopicPartition topicPartition = new TopicPartition( + Objects.equals(Endpoint.SOURCE, endpoint) ? topic.getSourceTopicName() : topic.getSinkTopicName(), + topic.getPtnNum()); int attempts = 0; - while (attempts < MAX_ATTEMPTS) { + while (attempts < maxAttemptsTimes) { try { consumer.consumerAssign(topicPartition, sliceExtend, attempts); consumer.pollTpSliceData(sliceExtend, dataList); break; // 如果成功,跳出循环 } catch (CheckConsumerPollEmptyException ex) { - if (++attempts >= MAX_ATTEMPTS) { + if (++attempts >= maxAttemptsTimes) { checkContext.returnConsumer(consumer); throw ex; // 如果达到最大尝试次数,重新抛出异常 } + ThreadUtil.sleepOneSecond(); + LogUtils.warn(LOGGER, "poll slice data {} {} , retry ({})", sliceExtend.getName(), sliceExtend.getNo(), + attempts); } } if (CollectionUtils.isEmpty(dataList)) { return; } - BuilderBucketHandler bucketBuilder = - new BuilderBucketHandler(ConfigCache.getIntValue(ConfigConstants.BUCKET_CAPACITY)); - + BuilderBucketHandler bucketBuilder = new BuilderBucketHandler( + ConfigCache.getIntValue(ConfigConstants.BUCKET_CAPACITY)); Map bucketMap = new ConcurrentHashMap<>(InitialCapacity.CAPACITY_128); // Use the pulled data to build the bucket list bucketBuilder.builder(dataList, avgSliceCount, bucketMap); @@ -394,12 +385,6 @@ public class SliceCheckWorker implements Runnable { bucketList.sort(Comparator.comparingInt(Bucket::getNumber)); } - private void getSliceDataFromTopicPartition(KafkaConsumerHandler consumer, SliceExtend sExtend, - List dataList) throws CheckConsumerPollEmptyException { - - - } - /** *
      * Align the bucket list data according to the statistical results of source
@@ -411,12 +396,10 @@ public class SliceCheckWorker implements Runnable {
         if (MapUtils.isNotEmpty(bucketDiff)) {
             bucketDiff.forEach((number, pair) -> {
                 if (pair.getSource() == -1) {
-                    sourceTuple.getBuckets()
-                               .add(BuilderBucketHandler.builderEmpty(number));
+                    sourceTuple.getBuckets().add(BuilderBucketHandler.builderEmpty(number));
                 }
                 if (pair.getSink() == -1) {
-                    sinkTuple.getBuckets()
-                             .add(BuilderBucketHandler.builderEmpty(number));
+                    sinkTuple.getBuckets().add(BuilderBucketHandler.builderEmpty(number));
                 }
             });
         }
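The SliceCheckWorker and KafkaConsumerHandler changes above replace the hard-coded MAX_ATTEMPTS with a retry count taken from the check context (getRetryFetchRecordTimes) and pause between empty polls before CheckConsumerPollEmptyException is rethrown. The standalone sketch below shows that bounded-retry shape; BoundedSlicePoll, PollEmptyException and the numeric limits are illustrative stand-ins, not the project's actual classes.

    import java.time.Duration;
    import java.util.ArrayList;
    import java.util.Collections;
    import java.util.List;

    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.TopicPartition;

    /** Hypothetical stand-in for the project's CheckConsumerPollEmptyException. */
    class PollEmptyException extends RuntimeException {
        PollEmptyException(String message) {
            super(message);
        }
    }

    public final class BoundedSlicePoll {
        /**
         * Poll one topic partition until the expected record count is reached.
         * Consecutive empty polls are counted; once the limit is exceeded the
         * method throws instead of spinning forever on a lagging partition.
         */
        static List<String> pollSlice(KafkaConsumer<String, String> consumer, TopicPartition partition,
                long expectedCount, int maxEmptyPolls, long pollMillis) {
            consumer.assign(Collections.singletonList(partition));
            List<String> values = new ArrayList<>();
            int emptyPolls = 0;
            while (values.size() < expectedCount) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(pollMillis));
                if (records.isEmpty()) {
                    if (++emptyPolls > maxEmptyPolls) {
                        throw new PollEmptyException("no data after " + emptyPolls + " empty polls on " + partition);
                    }
                    continue; // poll() already waited pollMillis, so the loop does not spin hot
                }
                emptyPolls = 0;
                for (ConsumerRecord<String, String> record : records) {
                    values.add(record.value());
                }
            }
            return values;
        }

        /** Caller-side attempt loop, mirroring the worker's retry-then-rethrow handling. */
        static List<String> pollWithRetry(KafkaConsumer<String, String> consumer, TopicPartition partition,
                long expectedCount, int maxAttempts) {
            for (int attempt = 1; ; attempt++) {
                try {
                    return pollSlice(consumer, partition, expectedCount, 50, 200L);
                } catch (PollEmptyException ex) {
                    if (attempt >= maxAttempts) {
                        throw ex; // attempts exhausted: surface the failure to the check worker
                    }
                    try {
                        Thread.sleep(1000L); // back off briefly before re-assigning and polling again
                    } catch (InterruptedException ie) {
                        Thread.currentThread().interrupt();
                        throw ex;
                    }
                }
            }
        }
    }

In the patch itself the empty-poll limit is MAX_CONSUMER_POLL_TIMES and the attempt count comes from getRetryFetchRecordTimes(); the values used in this sketch are placeholders.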
diff --git a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/data/access/MysqlDataAccessService.java b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/data/access/MysqlDataAccessService.java
index d388461..afee709 100644
--- a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/data/access/MysqlDataAccessService.java
+++ b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/data/access/MysqlDataAccessService.java
@@ -91,12 +91,11 @@ public class MysqlDataAccessService extends AbstractDataAccessService {
     @Override
     public List queryTableUniqueColumns(String tableName) {
         String schema = properties.getSchema();
-        String sql = "select kcu.table_name tableName, kcu.column_name columnName,kcu.ordinal_position colIdx,"
-            + " kcu.constraint_name indexIdentifier from  information_schema.table_constraints tc "
-            + " left join information_schema.KEY_COLUMN_USAGE kcu on tc.table_schema =kcu.table_schema"
-            + " and tc.constraint_name=kcu.constraint_name and tc.table_name = kcu.table_name"
-            + " where tc.table_schema='" + schema + "' and tc.table_name='" + tableName + "'"
-            + " and tc.constraint_type='UNIQUE' ;";
+        String sql = "select s.table_schema,s.table_name tableName,s.column_name columnName,c.ordinal_position colIdx,"
+            + " s.index_name indexIdentifier from information_schema.statistics s "
+            + " left join information_schema.columns c on s.table_schema=c.table_schema  "
+            + " and s.table_name=c.table_name and s.column_name=c.column_name "
+            + " where s.table_schema='" + schema + "' and s.table_name='" + tableName + "'" + " and s.non_unique=0;";
         List uniqueColumns = adasQueryTableUniqueColumns(sql);
         return translateUniqueToPrimaryColumns(uniqueColumns);
     }
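The MysqlDataAccessService change above reworks unique-column discovery to read information_schema.statistics with non_unique = 0 (unique indexes, including the primary-key index) instead of information_schema.table_constraints. A minimal JDBC sketch of the same lookup follows; it binds schema and table as parameters rather than concatenating them into the SQL, orders columns by statistics.seq_in_index, and uses hypothetical class and method names rather than the project's DAO.

    import java.sql.Connection;
    import java.sql.PreparedStatement;
    import java.sql.ResultSet;
    import java.sql.SQLException;
    import java.util.ArrayList;
    import java.util.List;

    public final class UniqueIndexColumnsQuery {
        /**
         * Returns "indexName:position:columnName" entries for every unique index
         * (non_unique = 0) on the given table, primary key included.
         */
        static List<String> queryUniqueIndexColumns(Connection connection, String schema, String table)
                throws SQLException {
            String sql = "select s.index_name, s.seq_in_index, s.column_name"
                + " from information_schema.statistics s"
                + " where s.table_schema = ? and s.table_name = ? and s.non_unique = 0"
                + " order by s.index_name, s.seq_in_index";
            List<String> columns = new ArrayList<>();
            try (PreparedStatement statement = connection.prepareStatement(sql)) {
                statement.setString(1, schema);
                statement.setString(2, table);
                try (ResultSet rs = statement.executeQuery()) {
                    while (rs.next()) {
                        columns.add(rs.getString("index_name") + ":" + rs.getInt("seq_in_index")
                            + ":" + rs.getString("column_name"));
                    }
                }
            }
            return columns;
        }
    }

Binding the identifiers keeps the lookup safe when schema or table names come from configuration.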
diff --git a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/data/access/OracleDataAccessService.java b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/data/access/OracleDataAccessService.java
index e29bafb..bfaef45 100644
--- a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/data/access/OracleDataAccessService.java
+++ b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/data/access/OracleDataAccessService.java
@@ -85,10 +85,12 @@ public class OracleDataAccessService extends AbstractDataAccessService {
     @Override
     public List queryTableUniqueColumns(String tableName) {
         String schema = properties.getSchema();
-        String sql = "SELECT uc.table_name tableName,uc.constraint_name indexIdentifier,ucc.column_name columnName,"
-            + " uc.constraint_type,ucc.position colIdx FROM USER_CONSTRAINTS uc "
-            + " JOIN USER_CONS_COLUMNS ucc ON uc.constraint_name=ucc.constraint_name "
-            + " WHERE uc.constraint_type='U' and uc.owner='" + schema + "'and uc.table_name='" + tableName + "'";
+        String sql = " SELECT ui.index_name indexIdentifier,ui.table_owner,ui.table_name tableName,"
+            + " utc.column_name columnName, utc.column_id colIdx"
+            + " from user_indexes ui left join user_ind_columns uic on ui.index_name=uic.index_name "
+            + " and ui.table_name=uic.table_name  "
+            + " left join user_tab_columns utc on ui.table_name=utc.table_name and uic.column_name=utc.column_name"
+            + " where ui.uniqueness='UNIQUE' and ui.table_owner='" + schema + "' and ui.table_name='" + tableName + "'";
         List uniqueColumns = adasQueryTableUniqueColumns(sql);
         return translateUniqueToPrimaryColumns(uniqueColumns);
     }
-- 
Gitee