diff --git a/config/log4j2.xml b/config/log4j2.xml
index e22d91228eec78ec756ea8f05191de699330d67b..f8cd66388ef617794a2a4af181f5aaa3dad08009 100644
--- a/config/log4j2.xml
+++ b/config/log4j2.xml
@@ -30,19 +30,13 @@
-
+
-
-
-
-
-
-
+
diff --git a/datachecker-check/src/main/java/org/opengauss/datachecker/check/modules/check/KafkaConsumerHandler.java b/datachecker-check/src/main/java/org/opengauss/datachecker/check/modules/check/KafkaConsumerHandler.java
index e1420403447b430cc42704e2ff14c992bdd76ed4..3a83c2af8fcb58985ab04609f5b1f8d62a05f997 100644
--- a/datachecker-check/src/main/java/org/opengauss/datachecker/check/modules/check/KafkaConsumerHandler.java
+++ b/datachecker-check/src/main/java/org/opengauss/datachecker/check/modules/check/KafkaConsumerHandler.java
@@ -16,6 +16,7 @@
 package org.opengauss.datachecker.check.modules.check;
 
 import com.alibaba.fastjson.JSON;
+
 import org.apache.commons.lang3.StringUtils;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
@@ -26,6 +27,7 @@ import org.opengauss.datachecker.common.entry.extract.RowDataHash;
 import org.opengauss.datachecker.common.entry.extract.SliceExtend;
 import org.opengauss.datachecker.common.exception.CheckConsumerPollEmptyException;
 import org.opengauss.datachecker.common.util.LogUtils;
+import org.opengauss.datachecker.common.util.ThreadUtil;
 
 import java.time.Duration;
 import java.util.*;
@@ -44,10 +46,11 @@ public class KafkaConsumerHandler {
     private static final int MAX_CONSUMER_POLL_TIMES = 50;
 
     private KafkaConsumer<String, String> kafkaConsumer;
+
     /**
      * Constructor
      *
-     * @param consumer   consumer
+     * @param consumer consumer
      * @param retryTimes retryTimes
      */
     public KafkaConsumerHandler(KafkaConsumer<String, String> consumer, int retryTimes) {
@@ -66,6 +69,7 @@ public class KafkaConsumerHandler {
     /**
      * 获取kafka consumer
      *
+     * @return consumer
      */
     public KafkaConsumer<String, String> getConsumer() {
         return kafkaConsumer;
@@ -74,7 +78,7 @@
     /**
      * Query the Kafka partition data corresponding to the specified table
      *
-     * @param topic      Kafka topic
+     * @param topic Kafka topic
      * @param partitions Kafka partitions
      * @return kafka partitions data
      */
@@ -96,8 +100,8 @@
      * consumer poll data from the topic partition, and filter by slice extend. then add data in the data list.
      *
      * @param topicPartition topic partition
-     * @param sExtend        slice extend
-     * @param attempts
+     * @param sExtend slice extend
+     * @param attempts attempts
      */
     public void consumerAssign(TopicPartition topicPartition, SliceExtend sExtend, int attempts) {
         kafkaConsumer.assign(List.of(topicPartition));
@@ -109,20 +113,21 @@
     /**
      * consumer poll data from the topic partition, and filter by slice extend. then add data in the data list.
      *
-     * @param sExtend  slice extend
+     * @param sExtend slice extend
      * @param dataList data list
      */
     public synchronized void pollTpSliceData(SliceExtend sExtend, List<RowDataHash> dataList) {
         AtomicLong currentCount = new AtomicLong(0);
         int pollEmptyCount = 0;
         while (currentCount.get() < sExtend.getCount()) {
-            ConsumerRecords<String, String> records =
-                kafkaConsumer.poll(Duration.ofMillis(KAFKA_CONSUMER_POLL_DURATION));
+            ConsumerRecords<String, String> records = kafkaConsumer.poll(
+                Duration.ofMillis(KAFKA_CONSUMER_POLL_DURATION));
             if (records.count() <= 0) {
                 pollEmptyCount++;
                 if (pollEmptyCount > MAX_CONSUMER_POLL_TIMES) {
                     throw new CheckConsumerPollEmptyException(sExtend.getName());
                 }
+                ThreadUtil.sleep(KAFKA_CONSUMER_POLL_DURATION);
                 continue;
             }
             pollEmptyCount = 0;
@@ -139,8 +144,8 @@
     /**
      * Query the Kafka partition data corresponding to the specified table
      *
-     * @param topic                     Kafka topic
-     * @param partitions                Kafka partitions
+     * @param topic Kafka topic
+     * @param partitions Kafka partitions
      * @param shouldChangeConsumerGroup if true change consumer Group random
      * @return kafka partitions data
      */
@@ -188,8 +193,8 @@
     }
 
     private void getTopicRecords(List<RowDataHash> dataList, KafkaConsumer<String, String> kafkaConsumer) {
-        ConsumerRecords<String, String> consumerRecords =
-            kafkaConsumer.poll(Duration.ofMillis(KAFKA_CONSUMER_POLL_DURATION));
+        ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(
+            Duration.ofMillis(KAFKA_CONSUMER_POLL_DURATION));
         consumerRecords.forEach(record -> {
             dataList.add(JSON.parseObject(record.value(), RowDataHash.class));
         });
diff --git a/datachecker-check/src/main/java/org/opengauss/datachecker/check/slice/SliceCheckContext.java b/datachecker-check/src/main/java/org/opengauss/datachecker/check/slice/SliceCheckContext.java
index 5b23f5571297df1fed07e0436a6a7ae65de74626..9f0bc4ffba682abcd1219eef8cf64ddedd26c0ec 100644
--- a/datachecker-check/src/main/java/org/opengauss/datachecker/check/slice/SliceCheckContext.java
+++ b/datachecker-check/src/main/java/org/opengauss/datachecker/check/slice/SliceCheckContext.java
@@ -81,10 +81,19 @@
             kafkaConsumerService.getRetryFetchRecordTimes());
     }
 
+    /**
+     * get consumer retry fetch record times
+     *
+     * @return retry fetch record times
+     */
+    public int getRetryFetchRecordTimes() {
+        return kafkaConsumerService.getRetryFetchRecordTimes();
+    }
+
     /**
      * get source or sink table topic
      *
-     * @param table    table
+     * @param table table
      * @param endpoint source or sink
      * @return topic name
      */
@@ -97,7 +106,7 @@
     /**
      * refresh slice check progress
      *
-     * @param slice    slice
+     * @param slice slice
      * @param rowCount slice of row count
      */
     public void refreshSliceCheckProgress(SliceVo slice, long rowCount) {
@@ -107,7 +116,7 @@
     /**
      * add slice check Result
      *
-     * @param slice  slice
+     * @param slice slice
      * @param result check result
      */
     public void addCheckResult(SliceVo slice, CheckDiffResult result) {
diff --git a/datachecker-check/src/main/java/org/opengauss/datachecker/check/slice/SliceCheckEventHandler.java b/datachecker-check/src/main/java/org/opengauss/datachecker/check/slice/SliceCheckEventHandler.java
index 0e7771b786740b219d1ef773c9c5ba954e7561c8..36d7218a03e68c0a71140cde2551e74adeefaf16 100644
--- a/datachecker-check/src/main/java/org/opengauss/datachecker/check/slice/SliceCheckEventHandler.java
+++ b/datachecker-check/src/main/java/org/opengauss/datachecker/check/slice/SliceCheckEventHandler.java
@@ -93,9 +93,7 @@
      */
    public void handleFailed(SliceCheckEvent checkEvent) {
        LogUtils.warn(log, "slice check event , table slice has unknown error [{}][{} : {}]", checkEvent.getCheckName(),
-            checkEvent.getSource()
-                      .getTableHash(), checkEvent.getSink()
-                                                 .getTableHash());
+            checkEvent.getSource(), checkEvent.getSink());
         long count = getCheckSliceCount(checkEvent);
         sliceCheckContext.refreshSliceCheckProgress(checkEvent.getSlice(), count);
         CheckDiffResult result = buildSliceDiffResult(checkEvent.getSlice(), (int) count, true, "slice has unknown error");
diff --git a/datachecker-check/src/main/java/org/opengauss/datachecker/check/slice/SliceCheckWorker.java b/datachecker-check/src/main/java/org/opengauss/datachecker/check/slice/SliceCheckWorker.java
index 29c6bd0bc931616bb8b374430dbd788eef62aaa6..26b42af02527f2a60a46d198ddf7e91fc8f77003 100644
--- a/datachecker-check/src/main/java/org/opengauss/datachecker/check/slice/SliceCheckWorker.java
+++ b/datachecker-check/src/main/java/org/opengauss/datachecker/check/slice/SliceCheckWorker.java
@@ -17,6 +17,7 @@ package org.opengauss.datachecker.check.slice;
 
 import com.google.common.collect.MapDifference;
 import com.google.common.collect.Maps;
+
 import org.apache.commons.collections4.CollectionUtils;
 import org.apache.commons.collections4.MapUtils;
 import org.apache.kafka.common.TopicPartition;
@@ -47,6 +48,7 @@ import org.opengauss.datachecker.common.exception.BucketNumberInconsistentExcept
 import org.opengauss.datachecker.common.exception.CheckConsumerPollEmptyException;
 import org.opengauss.datachecker.common.exception.MerkleTreeDepthException;
 import org.opengauss.datachecker.common.util.LogUtils;
+import org.opengauss.datachecker.common.util.ThreadUtil;
 import org.opengauss.datachecker.common.util.TopicUtil;
 import org.springframework.lang.NonNull;
 
@@ -72,8 +74,6 @@
 public class SliceCheckWorker implements Runnable {
     private static final Logger LOGGER = LogUtils.getLogger(SliceCheckWorker.class);
     private static final int THRESHOLD_MIN_BUCKET_SIZE = 2;
-    // 设置最大尝试次数
-    private static final int MAX_ATTEMPTS=5;
 
     private final SliceVo slice;
 
@@ -81,17 +81,19 @@
     private final SliceCheckEvent checkEvent;
     private final SliceCheckContext checkContext;
     private final TaskRegisterCenter registerCenter;
-    private final DifferencePair<List<RowDataHash>, List<RowDataHash>, List<RowDataHash>> difference =
-        DifferencePair.of(new LinkedList<>(), new LinkedList<>(), new LinkedList<>());
+    private final DifferencePair<List<RowDataHash>, List<RowDataHash>, List<RowDataHash>> difference = DifferencePair.of(
+        new LinkedList<>(), new LinkedList<>(), new LinkedList<>());
     private final LocalDateTime startTime;
 
     private long sliceRowCount;
+    // maximum number of retry attempts
+    private int maxAttemptsTimes;
     private Topic topic = new Topic();
 
     /**
      * slice check worker construct
      *
-     * @param checkEvent        check event
+     * @param checkEvent check event
      * @param sliceCheckContext slice check context
      */
     public SliceCheckWorker(SliceCheckEvent checkEvent, SliceCheckContext sliceCheckContext,
@@ -102,6 +104,7 @@
         this.slice = checkEvent.getSlice();
         this.registerCenter = registerCenter;
         this.processNo = ConfigCache.getValue(ConfigConstants.PROCESS_NO);
+        this.maxAttemptsTimes = sliceCheckContext.getRetryFetchRecordTimes();
     }
 
     @Override
@@ -168,22 +171,17 @@
                 LogUtils.debug(LOGGER, "slice {} fetch empty", slice.getName());
             } else {
                 // sourceSize is less than thresholdMinBucketSize, that is, there is only one bucket. Compare
-                DifferencePair<List<RowDataHash>, List<RowDataHash>, List<RowDataHash>> subDifference =
-                    compareBucketCommon(sourceTuple.getBuckets()
-                                                   .get(0), sinkTuple.getBuckets()
-                                                                     .get(0));
-                difference.getDiffering()
-                          .addAll(subDifference.getDiffering());
-                difference.getOnlyOnLeft()
-                          .addAll(subDifference.getOnlyOnLeft());
-                difference.getOnlyOnRight()
-                          .addAll(subDifference.getOnlyOnRight());
+                DifferencePair<List<RowDataHash>, List<RowDataHash>, List<RowDataHash>> subDifference =
+                    compareBucketCommon(sourceTuple.getBuckets().get(0), sinkTuple.getBuckets().get(0));
+                difference.getDiffering().addAll(subDifference.getDiffering());
+                difference.getOnlyOnLeft().addAll(subDifference.getOnlyOnLeft());
+                difference.getOnlyOnRight().addAll(subDifference.getOnlyOnRight());
             }
         } else {
             throw new BucketNumberInconsistentException(String.format(
-                "table[%s] slice[%s] build the bucket number is inconsistent, source-bucket-count=[%s] sink-bucket-count=[%s]"
-                    + " Please synchronize data again! ", slice.getTable(), slice.getNo(), sourceTuple.getBucketSize(),
-                sinkTuple.getBucketSize()));
+                "table[%s] slice[%s] build the bucket number is inconsistent, source-bucket-count=[%s] "
+                    + "sink-bucket-count=[%s] Please synchronize data again! ", slice.getTable(), slice.getNo(),
+                sourceTuple.getBucketSize(), sinkTuple.getBucketSize()));
         }
     }
 
@@ -193,24 +191,23 @@
     private void checkResult(String resultMsg) {
         CheckDiffResultBuilder builder = CheckDiffResultBuilder.builder();
-        builder.process(ConfigCache.getValue(ConfigConstants.PROCESS_NO))
-               .table(slice.getTable())
-               .sno(slice.getNo())
-               .error(resultMsg)
-               .topic(getConcatTableTopics())
-               .schema(slice.getSchema())
-               .fileName(slice.getName())
-               .conditionLimit(getConditionLimit())
-               .partitions(slice.getPtnNum())
-               .isTableStructureEquals(true)
-               .startTime(startTime)
-               .endTime(LocalDateTime.now())
-               .isExistTableMiss(false, null)
-               .rowCount((int) sliceRowCount)
-               .errorRate(20)
-               .checkMode(ConfigCache.getValue(ConfigConstants.CHECK_MODE, CheckMode.class))
-               .keyDiff(difference.getOnlyOnLeft(), difference.getDiffering(), difference.getOnlyOnRight());
+        builder.process(ConfigCache.getValue(ConfigConstants.PROCESS_NO))
+            .table(slice.getTable())
+            .sno(slice.getNo())
+            .error(resultMsg)
+            .topic(getConcatTableTopics())
+            .schema(slice.getSchema())
+            .fileName(slice.getName())
+            .conditionLimit(getConditionLimit())
+            .partitions(slice.getPtnNum())
+            .isTableStructureEquals(true)
+            .startTime(startTime)
+            .endTime(LocalDateTime.now())
+            .isExistTableMiss(false, null)
+            .rowCount((int) sliceRowCount)
+            .errorRate(20)
+            .checkMode(ConfigCache.getValue(ConfigConstants.CHECK_MODE, CheckMode.class))
+            .keyDiff(difference.getOnlyOnLeft(), difference.getDiffering(), difference.getOnlyOnRight());
         CheckDiffResult result = builder.build();
         LogUtils.debug(LOGGER, "result {}", result);
         checkContext.addCheckResult(slice, result);
@@ -233,18 +230,13 @@
             return;
         }
         diffNodeList.forEach(diffNode -> {
-            Bucket sourceBucket = diffNode.getSource()
-                                          .getBucket();
-            Bucket sinkBucket = diffNode.getSink()
-                                        .getBucket();
-            DifferencePair<List<RowDataHash>, List<RowDataHash>, List<RowDataHash>> subDifference =
-                compareBucketCommon(sourceBucket, sinkBucket);
-            difference.getDiffering()
-                      .addAll(subDifference.getDiffering());
-            difference.getOnlyOnLeft()
-                      .addAll(subDifference.getOnlyOnLeft());
-            difference.getOnlyOnRight()
-                      .addAll(subDifference.getOnlyOnRight());
+            Bucket sourceBucket = diffNode.getSource().getBucket();
+            Bucket sinkBucket = diffNode.getSink().getBucket();
+            DifferencePair<List<RowDataHash>, List<RowDataHash>, List<RowDataHash>> subDifference = compareBucketCommon(
+                sourceBucket, sinkBucket);
+            difference.getDiffering().addAll(subDifference.getDiffering());
+            difference.getOnlyOnLeft().addAll(subDifference.getOnlyOnLeft());
+            difference.getOnlyOnRight().addAll(subDifference.getOnlyOnRight());
         });
         diffNodeList.clear();
     }
@@ -257,13 +249,10 @@
         List<RowDataHash> entriesOnlyOnLeft = collectorDeleteOrInsert(bucketDifference.entriesOnlyOnLeft());
         List<RowDataHash> entriesOnlyOnRight = collectorDeleteOrInsert(bucketDifference.entriesOnlyOnRight());
         List<RowDataHash> differing = collectorUpdate(bucketDifference.entriesDiffering());
-
-        LogUtils.debug(LOGGER, "diff slice {} insert {}", slice.getName(), bucketDifference.entriesOnlyOnLeft()
-                                                                                           .size());
-        LogUtils.debug(LOGGER, "diff slice {} delete {}", slice.getName(), bucketDifference.entriesOnlyOnRight()
-                                                                                           .size());
-        LogUtils.debug(LOGGER, "diff slice {} update {}", slice.getName(), bucketDifference.entriesDiffering()
-                                                                                           .size());
+        LogUtils.debug(LOGGER, "diff slice {} insert {}", slice.getName(), bucketDifference.entriesOnlyOnLeft().size());
+        LogUtils.debug(LOGGER, "diff slice {} delete {}", slice.getName(),
+            bucketDifference.entriesOnlyOnRight().size());
+        LogUtils.debug(LOGGER, "diff slice {} update {}", slice.getName(), bucketDifference.entriesDiffering().size());
         return DifferencePair.of(entriesOnlyOnLeft, entriesOnlyOnRight, differing);
     }
@@ -313,12 +302,11 @@
             // Initialize source bucket column list data
             long startFetch = System.currentTimeMillis();
             CountDownLatch countDownLatch = new CountDownLatch(checkTupleList.size());
-            int avgSliceCount = (int) (sourceTuple.getSlice()
-                                                  .getCount() + sinkTuple.getSlice()
-                                                                         .getCount()) / 2;
+            int avgSliceCount = (int) (sourceTuple.getSlice().getCount() + sinkTuple.getSlice().getCount()) / 2;
             KafkaConsumerHandler consumer = checkContext.createKafkaHandler();
             checkTupleList.forEach(check -> {
-                initBucketList(check.getEndpoint(), check.getSlice(), check.getBuckets(), bucketDiff, avgSliceCount, consumer);
+                initBucketList(check.getEndpoint(), check.getSlice(), check.getBuckets(), bucketDiff, avgSliceCount,
+                    consumer);
                 countDownLatch.countDown();
             });
             countDownLatch.await();
@@ -336,30 +324,33 @@
     }
 
     private void initBucketList(Endpoint endpoint, SliceExtend sliceExtend, List<Bucket> bucketList,
-                                Map<Integer, Pair<Integer, Integer>> bucketDiff, int avgSliceCount, KafkaConsumerHandler consumer) {
+        Map<Integer, Pair<Integer, Integer>> bucketDiff, int avgSliceCount, KafkaConsumerHandler consumer) {
         // Use feign client to pull Kafka data
         List<RowDataHash> dataList = new LinkedList<>();
-        TopicPartition topicPartition = new TopicPartition(Objects.equals(Endpoint.SOURCE, endpoint) ?
-            topic.getSourceTopicName() : topic.getSinkTopicName(), topic.getPtnNum());
+        TopicPartition topicPartition = new TopicPartition(
+            Objects.equals(Endpoint.SOURCE, endpoint) ? topic.getSourceTopicName() : topic.getSinkTopicName(),
+            topic.getPtnNum());
         int attempts = 0;
-        while (attempts < MAX_ATTEMPTS) {
+        while (attempts < maxAttemptsTimes) {
             try {
                 consumer.consumerAssign(topicPartition, sliceExtend, attempts);
                 consumer.pollTpSliceData(sliceExtend, dataList);
                 break; // 如果成功,跳出循环
             } catch (CheckConsumerPollEmptyException ex) {
-                if (++attempts >= MAX_ATTEMPTS) {
+                if (++attempts >= maxAttemptsTimes) {
                     checkContext.returnConsumer(consumer);
                     throw ex; // 如果达到最大尝试次数,重新抛出异常
                 }
+                ThreadUtil.sleepOneSecond();
+                LogUtils.warn(LOGGER, "poll slice data {} {} , retry ({})", sliceExtend.getName(), sliceExtend.getNo(),
+                    attempts);
             }
         }
         if (CollectionUtils.isEmpty(dataList)) {
             return;
         }
-        BuilderBucketHandler bucketBuilder =
-            new BuilderBucketHandler(ConfigCache.getIntValue(ConfigConstants.BUCKET_CAPACITY));
-
+        BuilderBucketHandler bucketBuilder = new BuilderBucketHandler(
+            ConfigCache.getIntValue(ConfigConstants.BUCKET_CAPACITY));
         Map<Integer, Bucket> bucketMap = new ConcurrentHashMap<>(InitialCapacity.CAPACITY_128);
         // Use the pulled data to build the bucket list
         bucketBuilder.builder(dataList, avgSliceCount, bucketMap);
@@ -394,12 +385,6 @@
         bucketList.sort(Comparator.comparingInt(Bucket::getNumber));
     }
 
-    private void getSliceDataFromTopicPartition(KafkaConsumerHandler consumer, SliceExtend sExtend,
-        List<RowDataHash> dataList) throws CheckConsumerPollEmptyException {
-
-
-    }
-
     /**
      * Align the bucket list data according to the statistical results of source
@@ -411,12 +396,10 @@ public class SliceCheckWorker implements Runnable {
         if (MapUtils.isNotEmpty(bucketDiff)) {
             bucketDiff.forEach((number, pair) -> {
                 if (pair.getSource() == -1) {
-                    sourceTuple.getBuckets()
-                               .add(BuilderBucketHandler.builderEmpty(number));
+                    sourceTuple.getBuckets().add(BuilderBucketHandler.builderEmpty(number));
                 }
                 if (pair.getSink() == -1) {
-                    sinkTuple.getBuckets()
-                             .add(BuilderBucketHandler.builderEmpty(number));
+                    sinkTuple.getBuckets().add(BuilderBucketHandler.builderEmpty(number));
                 }
             });
         }
diff --git a/datachecker-common/src/main/java/org/opengauss/datachecker/common/entry/extract/PrimaryColumnBean.java b/datachecker-common/src/main/java/org/opengauss/datachecker/common/entry/extract/PrimaryColumnBean.java
index 651e26e27bfb2e4a8acdd6299951b18fb4caba05..5f85c51609f5d9fbe231a06536e874629c601be2 100644
--- a/datachecker-common/src/main/java/org/opengauss/datachecker/common/entry/extract/PrimaryColumnBean.java
+++ b/datachecker-common/src/main/java/org/opengauss/datachecker/common/entry/extract/PrimaryColumnBean.java
@@ -15,7 +15,9 @@
 
 package org.opengauss.datachecker.common.entry.extract;
 
+import lombok.AllArgsConstructor;
 import lombok.Data;
+import lombok.NoArgsConstructor;
 
 /**
  * PrimaryColumnBean
@@ -25,6 +27,8 @@ import lombok.Data;
  * @since :11
  */
 @Data
+@NoArgsConstructor
+@AllArgsConstructor
 public class PrimaryColumnBean {
     /**
      * Table
diff --git a/datachecker-common/src/main/java/org/opengauss/datachecker/common/entry/extract/UniqueColumnBean.java b/datachecker-common/src/main/java/org/opengauss/datachecker/common/entry/extract/UniqueColumnBean.java
new file mode 100644
index 0000000000000000000000000000000000000000..158f14723fd3246532768c66c9ed295c7e3f7ee8
--- /dev/null
+++ b/datachecker-common/src/main/java/org/opengauss/datachecker/common/entry/extract/UniqueColumnBean.java
@@ -0,0 +1,48 @@
+/*
+ * Copyright (c) Huawei Technologies Co., Ltd. 2024-2024. All rights reserved.
+ *
+ * openGauss is licensed under Mulan PSL v2.
+ * You can use this software according to the terms and conditions of the Mulan PSL v2.
+ * You may obtain a copy of Mulan PSL v2 at:
+ *
+ *           http://license.coscl.org.cn/MulanPSL2
+ *
+ * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
+ * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
+ * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
+ * See the Mulan PSL v2 for more details.
+ */
+
+package org.opengauss.datachecker.common.entry.extract;
+
+import lombok.Data;
+
+/**
+ * UniqueColumnBean
+ *
+ * @author :wangchao
+ * @date :Created in 2023/12/23
+ * @since :11
+ */
+@Data
+public class UniqueColumnBean {
+    /**
+     * Table
+     */
+    private String tableName;
+
+    /**
+     * Unique index column name
+     */
+    private String columnName;
+
+    /**
+     * Index identifier
+     */
+    private String indexIdentifier;
+
+    /**
+     * Column index
+     */
+    private Integer colIdx;
+}
\ No newline at end of file
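For context: the Lombok annotations added to PrimaryColumnBean above (@NoArgsConstructor/@AllArgsConstructor, on top of the existing @Data) are what the new unique-column path relies on. @AllArgsConstructor supplies the two-argument constructor used by the translateUniqueToPrimaryColumns helper further down, and @Data supplies the value-based equals/hashCode that lets its distinct() call drop repeated (table, column) pairs. Below is a minimal sketch of that intended behavior, not part of the patch: the class name, the bean(...) helper, and the table/index names are hypothetical, and it assumes PrimaryColumnBean declares exactly the two fields tableName and columnName.

    import java.util.List;
    import java.util.stream.Collectors;

    import org.opengauss.datachecker.common.entry.extract.PrimaryColumnBean;
    import org.opengauss.datachecker.common.entry.extract.UniqueColumnBean;

    public class UniqueToPrimarySketch {
        public static void main(String[] args) {
            // two columns of one composite unique index, plus the first column
            // reported again by a second single-column unique index
            UniqueColumnBean id = bean("t_user", "id", "uk_user_id_email", 1);
            UniqueColumnBean email = bean("t_user", "email", "uk_user_id_email", 2);
            UniqueColumnBean idAgain = bean("t_user", "id", "uk_user_id", 1);

            // same mapping as translateUniqueToPrimaryColumns: distinct() relies on
            // the value equality Lombok @Data generates for PrimaryColumnBean
            List<PrimaryColumnBean> primaries = List.of(id, email, idAgain).stream()
                .map(u -> new PrimaryColumnBean(u.getTableName(), u.getColumnName()))
                .distinct()
                .collect(Collectors.toList());
            System.out.println(primaries.size()); // prints 2: (t_user, id) and (t_user, email)
        }

        private static UniqueColumnBean bean(String table, String column, String index, int colIdx) {
            UniqueColumnBean u = new UniqueColumnBean();
            u.setTableName(table);
            u.setColumnName(column);
            u.setIndexIdentifier(index);
            u.setColIdx(colIdx);
            return u;
        }
    }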
diff --git a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/data/BaseDataService.java b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/data/BaseDataService.java
index 45d3c7ceb391166618bc96037975e5446eaed18e..54e94514887f51aa8dd5b9189cbf0303b5d50f74 100644
--- a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/data/BaseDataService.java
+++ b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/data/BaseDataService.java
@@ -16,6 +16,7 @@
 package org.opengauss.datachecker.extract.data;
 
 import com.alibaba.druid.pool.DruidDataSource;
+
 import org.apache.commons.collections4.CollectionUtils;
 import org.apache.commons.collections4.MapUtils;
 import org.apache.logging.log4j.Logger;
@@ -33,6 +34,7 @@ import org.opengauss.datachecker.extract.service.RuleAdapterService;
 import org.springframework.stereotype.Service;
 
 import javax.annotation.Resource;
+
 import java.util.Arrays;
 import java.util.Comparator;
 import java.util.HashMap;
@@ -103,15 +105,13 @@ public class BaseDataService {
      */
     public List<TableMetadata> bdsQueryTableMetadataList() {
         List<TableMetadata> metadataList = dataAccessService.dasQueryTableMetadataList();
-        return metadataList.stream()
-                           .filter(meta -> {
-                               boolean isChecking = ruleAdapterService.filterTableByRule(meta.getTableName());
-                               if (isChecking) {
-                                   tableNameList.add(meta.getTableName());
-                               }
-                               return isChecking;
-                           })
-                           .collect(Collectors.toList());
+        return metadataList.stream().filter(meta -> {
+            boolean isChecking = ruleAdapterService.filterTableByRule(meta.getTableName());
+            if (isChecking) {
+                tableNameList.add(meta.getTableName());
+            }
+            return isChecking;
+        }).collect(Collectors.toList());
     }
 
     /**
@@ -124,8 +124,7 @@ public class BaseDataService {
         if (CollectionUtils.isEmpty(columnBeanList)) {
             return new HashMap<>();
         }
-        return columnBeanList.stream()
-                             .collect(Collectors.groupingBy(PrimaryColumnBean::getTableName));
+        return columnBeanList.stream().collect(Collectors.groupingBy(PrimaryColumnBean::getTableName));
     }
 
     private List<String> filterByTableRules(List<String> tableNameList) {
@@ -189,28 +188,31 @@ public class BaseDataService {
     /**
      * update table metadata, and filter column rules
      *
-     * @param tableMetadata      table metadata
+     * @param tableMetadata table metadata
      * @param primaryColumnBeans primary column
      */
     public void updateTableColumnMetaData(TableMetadata tableMetadata, List<PrimaryColumnBean> primaryColumnBeans) {
         String tableName = tableMetadata.getTableName();
         final List<ColumnsMetaData> columns = dataAccessService.queryTableColumnsMetaData(tableName);
-        if (Objects.isNull(columns)) {
+        if (CollectionUtils.isEmpty(columns)) {
             LogUtils.error(log, "table columns metadata is null ,{}", tableName);
             return;
         }
-        if (Objects.isNull(primaryColumnBeans)) {
-            primaryColumnBeans = dataAccessService.queryTablePrimaryColumns(tableName);
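+        // fall back to queried primary-key columns, then to unique-index columns, when none were supplied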
+        List<PrimaryColumnBean> tempPrimaryColumnBeans = primaryColumnBeans;
+        if (CollectionUtils.isEmpty(primaryColumnBeans)) {
+            tempPrimaryColumnBeans = dataAccessService.queryTablePrimaryColumns(tableName);
         }
-        if (Objects.nonNull(primaryColumnBeans)) {
-            List<String> primaryColumnNameList = getPrimaryColumnNames(primaryColumnBeans);
+        if (CollectionUtils.isEmpty(tempPrimaryColumnBeans)) {
+            tempPrimaryColumnBeans = dataAccessService.queryTableUniqueColumns(tableName);
+        }
+        if (CollectionUtils.isNotEmpty(tempPrimaryColumnBeans)) {
+            List<String> primaryColumnNameList = getPrimaryColumnNames(tempPrimaryColumnBeans);
             for (ColumnsMetaData column : columns) {
                 if (primaryColumnNameList.contains(column.getLowerCaseColumnName())) {
                     column.setColumnKey(ColumnKey.PRI);
                 }
             }
         }
-
         tableMetadata.setColumnsMetas(ruleAdapterService.executeColumnRule(columns));
         tableMetadata.setPrimaryMetas(getTablePrimaryColumn(columns));
         tableMetadata.setTableHash(calcTableHash(columns));
@@ -218,16 +220,17 @@ public class BaseDataService {
 
     private List<String> getPrimaryColumnNames(List<PrimaryColumnBean> primaryColumnBeans) {
         return primaryColumnBeans.stream()
-                                 .map(PrimaryColumnBean::getColumnName)
-                                 .map(String::toLowerCase)
-                                 .collect(Collectors.toList());
+            .map(PrimaryColumnBean::getColumnName)
+            .map(String::toLowerCase)
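+            // a column can be reported once per unique index that contains it; keep each name once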
+            .distinct()
+            .collect(Collectors.toList());
     }
 
     private List<ColumnsMetaData> getTablePrimaryColumn(List<ColumnsMetaData> columnsMetaData) {
         return columnsMetaData.stream()
-                              .filter(meta -> ColumnKey.PRI.equals(meta.getColumnKey()))
-                              .sorted(Comparator.comparing(ColumnsMetaData::getOrdinalPosition))
-                              .collect(Collectors.toList());
+            .filter(meta -> ColumnKey.PRI.equals(meta.getColumnKey()))
+            .sorted(Comparator.comparing(ColumnsMetaData::getOrdinalPosition))
+            .collect(Collectors.toList());
     }
 
     /**
@@ -255,9 +258,8 @@ public class BaseDataService {
     private long calcTableHash(List<ColumnsMetaData> columnsMetas) {
         StringBuilder buffer = new StringBuilder();
         columnsMetas.sort(Comparator.comparing(ColumnsMetaData::getOrdinalPosition));
-        columnsMetas.forEach(column -> buffer.append(column.getColumnName()
-                                                           .toLowerCase(Locale.ENGLISH))
-                                             .append(column.getOrdinalPosition()));
+        columnsMetas.forEach(column -> buffer.append(column.getColumnName().toLowerCase(Locale.ENGLISH))
+            .append(column.getOrdinalPosition()));
         return HASH_UTIL.hashBytes(buffer.toString());
     }
 
@@ -289,9 +291,8 @@ public class BaseDataService {
         } else {
             String[] sqlModeArray = sqlMode.split(",");
             String newSqlMode = Arrays.stream(sqlModeArray)
-                                      .filter(mode -> !mode.equalsIgnoreCase(
-                                          ConfigConstants.SQL_MODE_NAME_PAD_CHAR_TO_FULL_LENGTH))
-                                      .collect(Collectors.joining(","));
+                .filter(mode -> !mode.equalsIgnoreCase(ConfigConstants.SQL_MODE_NAME_PAD_CHAR_TO_FULL_LENGTH))
+                .collect(Collectors.joining(","));
             boolean isPadCharFull = ConfigCache.getBooleanValue(ConfigConstants.SQL_MODE_PAD_CHAR_TO_FULL_LENGTH);
             if (isPadCharFull) {
                 newSqlMode += ConfigConstants.SQL_MODE_NAME_PAD_CHAR_TO_FULL_LENGTH;
diff --git a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/data/access/AbstractDataAccessService.java b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/data/access/AbstractDataAccessService.java
index a83d49811f0befc52ae145cf1aef556d7b4c4c9f..93b19b87bd7e11fc9da9aab42a3a3be1e4a55f37 100644
--- a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/data/access/AbstractDataAccessService.java
+++ b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/data/access/AbstractDataAccessService.java
@@ -16,6 +16,9 @@
 package org.opengauss.datachecker.extract.data.access;
 
 import com.alibaba.druid.pool.DruidDataSource;
+
+import cn.hutool.core.collection.CollUtil;
+
 import org.apache.commons.lang3.StringUtils;
 import org.apache.logging.log4j.Logger;
 import org.opengauss.datachecker.common.config.ConfigCache;
@@ -25,6 +28,7 @@ import org.opengauss.datachecker.common.entry.common.Health;
 import org.opengauss.datachecker.common.entry.enums.LowerCaseTableNames;
 import org.opengauss.datachecker.common.entry.extract.PrimaryColumnBean;
 import org.opengauss.datachecker.common.entry.extract.TableMetadata;
+import org.opengauss.datachecker.common.entry.extract.UniqueColumnBean;
 import org.opengauss.datachecker.common.exception.ExtractDataAccessException;
 import org.opengauss.datachecker.common.util.DurationUtils;
 import org.opengauss.datachecker.common.util.LogUtils;
@@ -36,16 +40,19 @@ import org.springframework.jdbc.core.namedparam.NamedParameterJdbcTemplate;
 
 import javax.annotation.Resource;
 import javax.sql.DataSource;
+
 import java.sql.Connection;
 import java.sql.PreparedStatement;
 import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.time.Duration;
 import java.time.LocalDateTime;
+import java.util.ArrayList;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
+import java.util.stream.Collectors;
 
 /**
  * AbstractDataAccessService
@@ -113,7 +120,7 @@ public abstract class AbstractDataAccessService implements DataAccessService {
     public String adasQuerySchema(Connection connection, String executeQueryStatement) {
         String schema = "";
         try (PreparedStatement ps = connection.prepareStatement(executeQueryStatement);
-             ResultSet resultSet = ps.executeQuery()) {
+            ResultSet resultSet = ps.executeQuery()) {
             if (resultSet.next()) {
                 schema = resultSet.getString(RS_COL_SCHEMA);
             }
@@ -129,7 +136,7 @@ public abstract class AbstractDataAccessService implements DataAccessService {
      * 数据库schema是否合法
      *
      * @param schema schema
-     * @param sql    sql
+     * @param sql sql
      * @return result
      */
     public Health health(String schema, String sql) {
@@ -160,7 +167,7 @@ public abstract class AbstractDataAccessService implements DataAccessService {
         Connection connection = getConnection();
         List<String> list = new LinkedList<>();
         try (PreparedStatement ps = connection.prepareStatement(executeQueryStatement);
-             ResultSet resultSet = ps.executeQuery()) {
+            ResultSet resultSet = ps.executeQuery()) {
             while (resultSet.next()) {
                 list.add(resultSet.getString(RS_COL_TABLE_NAME));
             }
@@ -185,7 +192,7 @@ public abstract class AbstractDataAccessService implements DataAccessService {
         Connection connection = getConnection();
         List<PrimaryColumnBean> list = new LinkedList<>();
         try (PreparedStatement ps = connection.prepareStatement(executeQueryStatement);
-             ResultSet resultSet = ps.executeQuery()) {
+            ResultSet resultSet = ps.executeQuery()) {
             PrimaryColumnBean metadata;
             while (resultSet.next()) {
                 metadata = new PrimaryColumnBean();
@@ -203,6 +210,50 @@ public abstract class AbstractDataAccessService implements DataAccessService {
         return list;
     }
 
+    /**
+     * adas: query the unique-constraint column information of a table
+     *
+     * @param executeQueryStatement executeQueryStatement
+     * @return List<UniqueColumnBean>
+     */
+    public List<UniqueColumnBean> adasQueryTableUniqueColumns(String executeQueryStatement) {
+        Connection connection = getConnection();
+        List<UniqueColumnBean> list = new LinkedList<>();
+        try (PreparedStatement ps = connection.prepareStatement(executeQueryStatement);
+            ResultSet resultSet = ps.executeQuery()) {
+            UniqueColumnBean metadata;
+            while (resultSet.next()) {
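+                // one row per unique-index column: table, column, index identifier, and column position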
+                metadata = new UniqueColumnBean();
+                metadata.setTableName(resultSet.getString("tableName"));
+                metadata.setColumnName(resultSet.getString("columnName"));
+                metadata.setIndexIdentifier(resultSet.getString("indexIdentifier"));
+                metadata.setColIdx(resultSet.getInt("colIdx"));
+                list.add(metadata);
+            }
+        } catch (SQLException esql) {
+            LogUtils.error(log, "adasQueryTablePrimaryColumns error:", esql);
+        } finally {
+            closeConnection(connection);
+        }
+        return list;
+    }
+
+    /**
+     * Convert a list of UniqueColumnBean into a list of PrimaryColumnBean
+     *
+     * @param uniqueColumns input UniqueColumnBean list, may be empty
+     * @return PrimaryColumnBean list, never null, containing distinct elements
+     */
+    public List<PrimaryColumnBean> translateUniqueToPrimaryColumns(List<UniqueColumnBean> uniqueColumns) {
+        if (CollUtil.isEmpty(uniqueColumns)) {
+            return new ArrayList<>();
+        }
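+        // distinct() relies on the value equality Lombok @Data generates for PrimaryColumnBean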
+        return uniqueColumns.stream()
+            .map(u -> new PrimaryColumnBean(u.getTableName(), u.getColumnName()))
+            .distinct()
+            .collect(Collectors.toList());
+    }
+
     /**
      * adasQueryTableMetadataList
      *
@@ -214,7 +265,7 @@ public abstract class AbstractDataAccessService implements DataAccessService {
         Connection connection = getConnection();
         List<TableMetadata> list = new LinkedList<>();
         try (PreparedStatement ps = connection.prepareStatement(executeQueryStatement);
-             ResultSet resultSet = ps.executeQuery()) {
+            ResultSet resultSet = ps.executeQuery()) {
             TableMetadata metadata;
             while (resultSet.next()) {
                 metadata = new TableMetadata();
@@ -238,7 +289,7 @@ public abstract class AbstractDataAccessService implements DataAccessService {
      * 查询表数据抽样检查点清单
      *
      * @param connection connection
-     * @param sql        检查点查询SQL
+     * @param sql 检查点查询SQL
      * @return 检查点列表
      */
     protected List<String> adasQueryPointList(Connection connection, String sql) {
@@ -259,7 +310,7 @@ public abstract class AbstractDataAccessService implements DataAccessService {
      * 查询表数据抽样检查点清单
      *
      * @param connection connection
-     * @param sql        检查点查询SQL
+     * @param sql 检查点查询SQL
      * @return 检查点列表
      */
     protected String adasQueryOnePoint(Connection connection, String sql) {
@@ -277,8 +328,7 @@ public abstract class AbstractDataAccessService implements DataAccessService {
     }
 
     private long durationBetweenToMillis(LocalDateTime start, LocalDateTime end) {
-        return Duration.between(start, end)
-                .toMillis();
+        return Duration.between(start, end).toMillis();
     }
 
     /**
@@ -292,15 +342,15 @@ public abstract class AbstractDataAccessService implements DataAccessService {
             return null;
         }
         return tableMetadata.setDataBaseType(properties.getDatabaseType())
-                .setEndpoint(properties.getEndpoint())
-                .setOgCompatibilityB(isOgCompatibilityB);
+            .setEndpoint(properties.getEndpoint())
+            .setOgCompatibilityB(isOgCompatibilityB);
     }
 
     /**
      * jdbc mode does not use it
      *
-     * @param table          table
-     * @param fileName       fileName
+     * @param table table
+     * @param fileName fileName
      * @param differenceList differenceList
      * @return result
      */
@@ -317,8 +367,8 @@ public abstract class AbstractDataAccessService implements DataAccessService {
      */
     protected List<TableMetadata> wrapperTableMetadata(List<TableMetadata> list) {
         list.forEach(meta -> meta.setDataBaseType(properties.getDatabaseType())
-                .setEndpoint(properties.getEndpoint())
-                .setOgCompatibilityB(isOgCompatibilityB));
+            .setEndpoint(properties.getEndpoint())
+            .setOgCompatibilityB(isOgCompatibilityB));
         return list;
     }
 
diff --git a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/data/access/CsvDataAccessService.java b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/data/access/CsvDataAccessService.java
index f8eb60ac7fa7822cf283ffabf10482cf56c8c484..322796bbd00a0004d5ebed5cdd7651482e37eb05 100644
--- a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/data/access/CsvDataAccessService.java
+++ b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/data/access/CsvDataAccessService.java
@@ -269,4 +269,9 @@ public class CsvDataAccessService implements DataAccessService {
     public LowerCaseTableNames queryLowerCaseTableNames() {
         return LowerCaseTableNames.INSENSITIVE;
     }
+
+    @Override
+    public List<PrimaryColumnBean> queryTableUniqueColumns(String tableName) {
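+        // CSV extraction keeps no unique-index metadata; callers treat a null result as empty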
+        return null;
+    }
 }
diff --git a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/data/access/DataAccessService.java b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/data/access/DataAccessService.java
index 5d2e84d1847b8fcb59b91fa845dda9a0f7058fc7..6eaef3875e047addeba0a532d739732a8783d80e 100644
--- a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/data/access/DataAccessService.java
+++ b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/data/access/DataAccessService.java
@@ -25,6 +25,7 @@ import org.opengauss.datachecker.common.entry.extract.TableMetadata;
 import org.springframework.jdbc.core.RowMapper;
 
 import javax.sql.DataSource;
+
 import java.sql.Connection;
 import java.util.List;
 import java.util.Map;
@@ -111,7 +112,7 @@ public interface DataAccessService {
      * query table column min value
      *
      * @param connection connection
-     * @param param      param
+     * @param param param
      * @return min value of string
      */
     String min(Connection connection, DataAccessParam param);
@@ -120,7 +121,7 @@ public interface DataAccessService {
      * query table column max value
      *
      * @param connection connection
-     * @param param      param
+     * @param param param
      * @return max value of string
      */
     String max(Connection connection, DataAccessParam param);
@@ -136,10 +137,10 @@ public interface DataAccessService {
     /**
      * query row data by sql
      *
-     * @param sql       sql
-     * @param param     sql param
+     * @param sql sql
+     * @param param sql param
      * @param rowMapper row mapper
-     * @param        data type
+     * @param <T> data type
      * @return data
      */
     <T> List<T> query(String sql, Map<String, Object> param, RowMapper<T> rowMapper);
@@ -147,10 +148,10 @@ public interface DataAccessService {
     /**
      * query data from csv file
      *
-     * @param table          table
-     * @param fileName       fileName
+     * @param table table
+     * @param fileName fileName
      * @param differenceList differenceList
-     * @return
+     * @return data
      */
     List<Map<String, String>> query(String table, String fileName, List<Difference> differenceList);
 
@@ -165,7 +166,7 @@ public interface DataAccessService {
      * query table check point list
      *
      * @param connection connection
-     * @param param      param
+     * @param param param
      * @return point list
      */
     List<String> queryPointList(Connection connection, DataAccessParam param);
@@ -187,4 +188,15 @@ public interface DataAccessService {
      * @return value
      */
     LowerCaseTableNames queryLowerCaseTableNames();
+
+    /**
+     * query table unique columns
+     * <pre>
+     *     Unique constraints and unique indexes
+     * </pre>
+     *
+     * @param tableName table
+     * @return unique columns
+     */
+    List<PrimaryColumnBean> queryTableUniqueColumns(String tableName);
 }
diff --git a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/data/access/MysqlDataAccessService.java b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/data/access/MysqlDataAccessService.java
index 40319a2872ca28d42b2a0d5903fea28d9d106aaf..afee7093c45a40b82e9f3cd6c64e27c7e2b5ed2f 100644
--- a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/data/access/MysqlDataAccessService.java
+++ b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/data/access/MysqlDataAccessService.java
@@ -21,6 +21,7 @@ import org.opengauss.datachecker.common.entry.enums.LowerCaseTableNames;
 import org.opengauss.datachecker.common.entry.extract.ColumnsMetaData;
 import org.opengauss.datachecker.common.entry.extract.PrimaryColumnBean;
 import org.opengauss.datachecker.common.entry.extract.TableMetadata;
+import org.opengauss.datachecker.common.entry.extract.UniqueColumnBean;
 import org.opengauss.datachecker.extract.data.mapper.MysqlMetaDataMapper;
 
 import java.sql.Connection;
@@ -52,8 +53,8 @@ public class MysqlDataAccessService extends AbstractDataAccessService {
     @Override
     public Health health() {
         String schema = properties.getSchema();
-        String sql = "SELECT SCHEMA_NAME tableSchema FROM information_schema.SCHEMATA info WHERE SCHEMA_NAME='"
-            + schema + "' limit 1";
+        String sql = "SELECT SCHEMA_NAME tableSchema FROM information_schema.SCHEMATA info WHERE SCHEMA_NAME='" + schema
+            + "' limit 1";
         return health(schema, sql);
     }
 
@@ -65,8 +66,8 @@ public class MysqlDataAccessService extends AbstractDataAccessService {
     @Override
     public List<String> dasQueryTableNameList() {
         String schema = properties.getSchema();
-        String sql = "SELECT info.table_name tableName FROM information_schema.tables info WHERE table_schema='"
-            + schema + "'";
+        String sql = "select info.table_name tableName from information_schema.tables info where table_schema='"
+            + schema + "' and table_type='BASE TABLE'";
         return adasQueryTableNameList(sql);
     }
 
@@ -83,11 +84,22 @@ public class MysqlDataAccessService extends AbstractDataAccessService {
     @Override
     public List<PrimaryColumnBean> queryTablePrimaryColumns() {
         String sql = "select table_name tableName ,lower(column_name) columnName from information_schema.columns "
-            + "where table_schema='" + properties.getSchema()
-            + "' and column_key='PRI' order by ordinal_position asc ";
+            + "where table_schema='" + properties.getSchema() + "' and column_key='PRI' order by ordinal_position asc ";
         return adasQueryTablePrimaryColumns(sql);
     }
 
+    @Override
+    public List<PrimaryColumnBean> queryTableUniqueColumns(String tableName) {
+        String schema = properties.getSchema();
+        String sql = "select s.table_schema,s.table_name tableName,s.column_name columnName,c.ordinal_position colIdx,"
+            + " s.index_name indexIdentifier from information_schema.statistics s "
+            + " left join information_schema.columns c on s.table_schema=c.table_schema "
+            + " and s.table_schema=c.table_schema and s.table_name=c.table_name and s.column_name=c.column_name "
+            + " where s.table_schema='" + schema + "' and s.table_name='" + tableName + "'" + " and s.non_unique=0;";
+        List<UniqueColumnBean> uniqueColumns = adasQueryTableUniqueColumns(sql);
+        return translateUniqueToPrimaryColumns(uniqueColumns);
+    }
+
     @Override
     public List<PrimaryColumnBean> queryTablePrimaryColumns(String tableName) {
         return mysqlMetaDataMapper.queryTablePrimaryColumnsByTableName(properties.getSchema(), tableName);
@@ -97,11 +109,11 @@ public class MysqlDataAccessService extends AbstractDataAccessService {
     public List<TableMetadata> dasQueryTableMetadataList() {
         LowerCaseTableNames lowerCaseTableNames = getLowerCaseTableNames();
         String colTableName = Objects.equals(LowerCaseTableNames.SENSITIVE, lowerCaseTableNames)
-                              ? "info.table_name tableName"
-                              : "lower(info.table_name) tableName";
+            ? "info.table_name tableName"
+            : "lower(info.table_name) tableName";
         String sql = " SELECT info.TABLE_SCHEMA tableSchema," + colTableName + ",info.table_rows tableRows , "
-            + "info.avg_row_length avgRowLength FROM information_schema.tables info WHERE TABLE_SCHEMA='"
-            + properties.getSchema() + "'";
+            + "info.avg_row_length avgRowLength FROM information_schema.tables info WHERE TABLE_SCHEMA='"
+            + properties.getSchema() + "'";
         return wrapperTableMetadata(adasQueryTableMetadataList(sql));
     }
 
@@ -130,9 +142,9 @@ public class MysqlDataAccessService extends AbstractDataAccessService {
     @Override
     public List<String> queryPointList(Connection connection, DataAccessParam param) {
         String sql = "select s.%s from (SELECT @rowno:=@rowno+1 as rn,r.%s from %s.%s r,"
-                     + " (select @rowno := 0) t ORDER BY r.%s asc) s where mod(s.rn, %s) = 1";
+            + " (select @rowno := 0) t ORDER BY r.%s asc) s where mod(s.rn, %s) = 1";
         sql = String.format(sql, param.getColName(), param.getColName(), param.getSchema(), param.getName(),
-                            param.getColName(), param.getOffset());
+            param.getColName(), param.getOffset());
         return adasQueryPointList(connection, sql);
     }
 
diff --git a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/data/access/OpgsDataAccessService.java b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/data/access/OpgsDataAccessService.java
index 6b5dca7e6a359b1015d8e016ff17d556ce985050..86a0843112937fe99c9c0fa7f7a4283a0c057d40 100644
--- a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/data/access/OpgsDataAccessService.java
+++ b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/data/access/OpgsDataAccessService.java
@@ -24,9 +24,11 @@ import org.opengauss.datachecker.common.entry.enums.OgCompatibility;
 import org.opengauss.datachecker.common.entry.extract.ColumnsMetaData;
 import org.opengauss.datachecker.common.entry.extract.PrimaryColumnBean;
 import org.opengauss.datachecker.common.entry.extract.TableMetadata;
+import org.opengauss.datachecker.common.entry.extract.UniqueColumnBean;
 import org.opengauss.datachecker.extract.data.mapper.OpgsMetaDataMapper;
 
 import javax.annotation.PostConstruct;
+
 import java.sql.Connection;
 import java.sql.PreparedStatement;
 import java.sql.ResultSet;
@@ -75,22 +77,43 @@ public class OpgsDataAccessService extends AbstractDataAccessService {
         return health(schema, sql);
     }
 
+    /**
+     * <pre>
+     * DAS: query the table name list
+     *  select c.relname tableName from pg_class c  LEFT JOIN pg_namespace n on n.oid = c.relnamespace
+     *  where n.nspname=? and c.relkind ='r';
+     * </pre>
+     *
+     * @return tableNameList
+     */
     @Override
     public List<String> dasQueryTableNameList() {
         String schema = properties.getSchema();
         String sql = "select c.relname tableName from pg_class c  LEFT JOIN pg_namespace n on n.oid = c.relnamespace "
-                     + " where n.nspname='" + schema + "' and c.relkind ='r';";
+            + " where n.nspname='" + schema + "' and c.relkind ='r';";
         return adasQueryTableNameList(sql);
     }
 
+    /**
+     * <pre>
+     *     Query the primary key column information of tables
+     *      select c.relname tableName,ns.nspname,ns.oid,a.attname columnName from pg_class c
+     *      left join pg_namespace ns on c.relnamespace=ns.oid
+     *      left join pg_attribute a on c.oid=a.attrelid and a.attnum>0 and not a.attisdropped
+     *      inner join pg_constraint cs on a.attrelid=cs.conrelid and a.attnum=any(cs.conkey)
+     *      where ns.nspname='test' and cs.contype='p';
+     * </pre>
+     *
+     * @return primaryColumnList primary key column information list
+     */
     @Override
     public List<PrimaryColumnBean> queryTablePrimaryColumns() {
         String schema = properties.getSchema();
         String sql = "select c.relname tableName,ns.nspname,ns.oid,a.attname columnName from pg_class c "
-                     + "left join pg_namespace ns on c.relnamespace=ns.oid "
-                     + "left join pg_attribute a on c.oid=a.attrelid and a.attnum>0 and not a.attisdropped "
-                     + "inner join pg_constraint cs on a.attrelid=cs.conrelid and a.attnum=any(cs.conkey) "
-                     + "where ns.nspname='" + schema + "' and cs.contype='p';";
+            + "left join pg_namespace ns on c.relnamespace=ns.oid "
+            + "left join pg_attribute a on c.oid=a.attrelid and a.attnum>0 and not a.attisdropped "
+            + "inner join pg_constraint cs on a.attrelid=cs.conrelid and a.attnum=any(cs.conkey) "
+            + "where ns.nspname='" + schema + "' and cs.contype='p';";
         return adasQueryTablePrimaryColumns(sql);
     }
 
@@ -98,13 +121,25 @@
     public List<PrimaryColumnBean> queryTablePrimaryColumns(String tableName) {
         String schema = properties.getSchema();
         String sql = "select c.relname tableName,ns.nspname,ns.oid,a.attname columnName from pg_class c "
-                     + "left join pg_namespace ns on c.relnamespace=ns.oid "
-                     + "left join pg_attribute a on c.oid=a.attrelid and a.attnum>0 and not a.attisdropped "
-                     + "inner join pg_constraint cs on a.attrelid=cs.conrelid and a.attnum=any(cs.conkey) "
-                     + "where ns.nspname='" + schema + "' and c.relname='" + tableName + "' and cs.contype='p';";
+            + "left join pg_namespace ns on c.relnamespace=ns.oid "
+            + "left join pg_attribute a on c.oid=a.attrelid and a.attnum>0 and not a.attisdropped "
+            + "inner join pg_constraint cs on a.attrelid=cs.conrelid and a.attnum=any(cs.conkey) "
+            + "where ns.nspname='" + schema + "' and c.relname='" + tableName + "' and cs.contype='p';";
         return adasQueryTablePrimaryColumns(sql);
     }
 
+    @Override
+    public List<PrimaryColumnBean> queryTableUniqueColumns(String tableName) {
+        String schema = properties.getSchema();
+        String sql = "SELECT c.relname AS tableName, ns.nspname, i.indexrelid indexIdentifier, "
+            + " a.attname AS columnName, a.attnum colIdx FROM pg_index i"
+            + " JOIN pg_class c ON i.indrelid = c.oid join pg_namespace ns on c.relnamespace=ns.oid"
+            + " JOIN pg_attribute a ON i.indrelid = a.attrelid AND a.attnum = ANY(i.indkey) "
+            + " where ns.nspname='" + schema + "' and c.relname='" + tableName + "' and i.indisunique = true;";
+        List<UniqueColumnBean> uniqueColumns = adasQueryTableUniqueColumns(sql);
+        return translateUniqueToPrimaryColumns(uniqueColumns);
+    }
+
     @Override
     public List<ColumnsMetaData> queryTableColumnsMetaData(String tableName) {
         return opgsMetaDataMapper.queryTableColumnsMetaData(properties.getSchema(), tableName);
@@ -119,12 +154,12 @@
     public List<TableMetadata> dasQueryTableMetadataList() {
         LowerCaseTableNames lowerCaseTableNames = getLowerCaseTableNames();
         String colTableName = Objects.equals(LowerCaseTableNames.SENSITIVE, lowerCaseTableNames)
-                              ? "c.relname tableName"
-                              : "lower(c.relname) tableName";
+            ? "c.relname tableName"
+            : "lower(c.relname) tableName";
         String sql = " select n.nspname tableSchema, " + colTableName + ",c.reltuples tableRows, "
-            + "case when c.reltuples>0 then pg_table_size(c.oid)/c.reltuples else 0 end as avgRowLength "
-            + "from pg_class c LEFT JOIN pg_namespace n on n.oid = c.relnamespace " + "where n.nspname='"
-            + properties.getSchema() + "' and c.relkind ='r';";
+            + "case when c.reltuples>0 then pg_table_size(c.oid)/c.reltuples else 0 end as avgRowLength "
+            + "from pg_class c LEFT JOIN pg_namespace n on n.oid = c.relnamespace " + "where n.nspname='"
+            + properties.getSchema() + "' and c.relkind ='r';";
         return wrapperTableMetadata(adasQueryTableMetadataList(sql));
     }
 
@@ -153,9 +188,9 @@
     @Override
     public List<String> queryPointList(Connection connection, DataAccessParam param) {
         String sql = "select s.%s from ( select row_number() over(order by r.%s asc) as rn,r.%s from %s.%s r) s"
-                     + " where mod(s.rn, %s ) = 1;";
+            + " where mod(s.rn, %s ) = 1;";
         sql = String.format(sql, param.getColName(), param.getColName(), param.getColName(), param.getSchema(),
-                            param.getName(), param.getOffset());
+            param.getName(), param.getOffset());
         return adasQueryPointList(connection, sql);
     }
 
@@ -164,7 +199,6 @@
         return opgsMetaDataMapper.checkDatabaseNotEmpty(properties.getSchema());
     }
 
-
     @Override
     public LowerCaseTableNames queryLowerCaseTableNames() {
         String sql = "SHOW VARIABLES LIKE \"lower_case_table_names\";";
@@ -181,7 +215,8 @@
         } finally {
             closeConnection(connection);
         }
-        return isOgCompatibilityB() ? result.getOrDefault(DOLPHIN_LOWER_CASE_TABLE_NAMES, LowerCaseTableNames.UNKNOWN)
-            : result.getOrDefault(LOWER_CASE_TABLE_NAMES, LowerCaseTableNames.UNKNOWN);
+        return isOgCompatibilityB()
+            ? result.getOrDefault(DOLPHIN_LOWER_CASE_TABLE_NAMES, LowerCaseTableNames.UNKNOWN)
+            : result.getOrDefault(LOWER_CASE_TABLE_NAMES, LowerCaseTableNames.UNKNOWN);
     }
 }
diff --git a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/data/access/OracleDataAccessService.java b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/data/access/OracleDataAccessService.java
index b2426c17f563ee28d57f59e9e4be03489a14da63..bfaef4565f44cfc4eae433bd9fd544614aa93095 100644
--- a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/data/access/OracleDataAccessService.java
+++ b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/data/access/OracleDataAccessService.java
@@ -21,6 +21,7 @@ import org.opengauss.datachecker.common.entry.enums.LowerCaseTableNames;
 import org.opengauss.datachecker.common.entry.extract.ColumnsMetaData;
 import org.opengauss.datachecker.common.entry.extract.PrimaryColumnBean;
 import org.opengauss.datachecker.common.entry.extract.TableMetadata;
+import org.opengauss.datachecker.common.entry.extract.UniqueColumnBean;
 import org.opengauss.datachecker.extract.data.mapper.OracleMetaDataMapper;
 
 import java.sql.Connection;
@@ -76,11 +77,24 @@
     @Override
     public List<PrimaryColumnBean> queryTablePrimaryColumns() {
         String sql = "SELECT A.TABLE_NAME tableName, A.COLUMN_NAME columnName FROM ALL_CONS_COLUMNS A,ALL_CONSTRAINTS B"
-                     + " WHERE A.constraint_name = B.constraint_name AND B.constraint_type = 'P' AND A.OWNER = '"
-                     + properties.getSchema() + "'";
+            + " WHERE A.constraint_name = B.constraint_name AND B.constraint_type = 'P' AND A.OWNER = '"
+            + properties.getSchema() + "'";
         return adasQueryTablePrimaryColumns(sql);
     }
 
+    @Override
+    public List<PrimaryColumnBean> queryTableUniqueColumns(String tableName) {
+        String schema = properties.getSchema();
+        String sql = " SELECT ui.index_name indexIdentifier,ui.table_owner,ui.table_name tableName,"
+            + " utc.column_name columnName, utc.column_id colIdx"
+            + " from user_indexes ui left join user_ind_columns uic on ui.index_name=uic.index_name "
+            + " and ui.table_name=uic.table_name "
+            + " left join user_tab_columns utc on ui.table_name =utc.table_name and uic.column_name=utc.column_name"
+            + " where ui.uniqueness='UNIQUE' and ui.table_owner='" + schema + "' and ui.table_name='" + tableName + "'";
+        List<UniqueColumnBean> uniqueColumns = adasQueryTableUniqueColumns(sql);
+        return translateUniqueToPrimaryColumns(uniqueColumns);
+    }
+
     @Override
     public List<PrimaryColumnBean> queryTablePrimaryColumns(String tableName) {
         return oracleMetaDataMapper.queryTablePrimaryColumnsByTableName(properties.getSchema(), tableName);
@@ -91,12 +105,11 @@
         String schema = properties.getSchema();
         LowerCaseTableNames lowerCaseTableNames = getLowerCaseTableNames();
         String colTableName = Objects.equals(LowerCaseTableNames.SENSITIVE, lowerCaseTableNames)
-                              ? "t.table_name tableName"
-                              : "lower(t.table_name) tableName";
+            ? "t.table_name tableName"
+            : "lower(t.table_name) tableName";
         String sql = "SELECT t.owner tableSchema," + colTableName + ",t.num_rows tableRows,avg_row_len avgRowLength"
-            + " FROM ALL_TABLES t LEFT JOIN (SELECT DISTINCT table_name from ALL_CONSTRAINTS where OWNER = '"
-            + schema + "' AND constraint_type='P') pc on t.table_name=pc.table_name WHERE t.OWNER = '"
-            + schema + "'";
+            + " FROM ALL_TABLES t LEFT JOIN (SELECT DISTINCT table_name from ALL_CONSTRAINTS where OWNER = '" + schema
+            + "' AND constraint_type='P') pc on t.table_name=pc.table_name WHERE t.OWNER = '" + schema + "'";
         return wrapperTableMetadata(adasQueryTableMetadataList(sql));
     }