diff --git a/datachecker-check/src/main/java/org/opengauss/datachecker/check/modules/check/IncrementCheckThread.java b/datachecker-check/src/main/java/org/opengauss/datachecker/check/modules/check/IncrementCheckThread.java
index 073aed26d5fac1b12949522aee798f73ccd5d797..e6645e6f2b4b40db6e85559afc7bcba75df9bd1a 100644
--- a/datachecker-check/src/main/java/org/opengauss/datachecker/check/modules/check/IncrementCheckThread.java
+++ b/datachecker-check/src/main/java/org/opengauss/datachecker/check/modules/check/IncrementCheckThread.java
@@ -18,6 +18,10 @@
 package org.opengauss.datachecker.check.modules.check;
 
 import com.google.common.collect.MapDifference;
 import com.google.common.collect.Maps;
+import cn.hutool.core.collection.CollUtil;
+import cn.hutool.core.thread.ThreadUtil;
+import feign.RetryableException;
+
 import org.apache.commons.collections4.CollectionUtils;
 import org.apache.logging.log4j.Logger;
 import org.opengauss.datachecker.check.cache.CheckRateCache;
@@ -41,6 +45,7 @@
 import org.opengauss.datachecker.common.entry.extract.RowDataHash;
 import org.opengauss.datachecker.common.entry.extract.SourceDataLog;
 import org.opengauss.datachecker.common.entry.extract.TableMetadata;
 import org.opengauss.datachecker.common.entry.extract.TableMetadataHash;
+import org.opengauss.datachecker.common.exception.CheckingException;
 import org.opengauss.datachecker.common.exception.DispatchClientException;
 import org.opengauss.datachecker.common.exception.MerkleTreeDepthException;
 import org.opengauss.datachecker.common.util.LogUtils;
@@ -71,6 +76,7 @@
 import java.util.Set;
 
 public class IncrementCheckThread extends Thread {
     private static final Logger log = LogUtils.getLogger();
     private static final int THRESHOLD_MIN_BUCKET_SIZE = 2;
+    private static final int MAX_RETRY_TIME = 30;
     private static final String THREAD_NAME_PRIFEX = "increment-data-check-";
 
     private final String tableName;
@@ -151,7 +157,7 @@
                 secondaryCheckCompare(diffIdList);
                 stopWatch.stop();
             } else {
-                log.error("check table {} metadata error", tableName);
+                log.warn("the table structure of {} is inconsistent.", tableName);
             }
             // Verification result verification repair report
             stopWatch.start("checkResult");
@@ -159,7 +165,9 @@
             checkRateCache.add(buildCheckTable());
             stopWatch.stop();
         } catch (Exception ex) {
-            log.error("{}check error", ErrorCode.INCREMENT_CHECK_ERROR, ex);
+            stopWatch.stop();
+            log.error("check table[{}] error{}", tableName, stopWatch.prettyPrint());
+            log.error("{}check table[{}] error", ErrorCode.INCREMENT_CHECK_ERROR, tableName, ex);
         } finally {
             log.info(" {} check {} ", process, stopWatch.shortSummary());
         }
@@ -204,9 +212,9 @@
     private void initFirstCheckBucketList() {
         // Get the Kafka partition number corresponding to the current task
         // Initialize source bucket column list data
-        initFirstCheckBucketList(Endpoint.SOURCE, sourceBucketList);
+        initFirstCheckBucketList(Endpoint.SOURCE, sourceBucketList, 0);
         // Initialize destination bucket column list data
-        initFirstCheckBucketList(Endpoint.SINK, sinkBucketList);
+        initFirstCheckBucketList(Endpoint.SINK, sinkBucketList, 0);
        // Align the source and destination bucket list
         alignAllBuckets();
         sortBuckets(sourceBucketList);
@@ -294,13 +302,15 @@
         TableMetadataHash sinkTableHash = querySinkTableMetadataHash(tableName);
         boolean isEqual = Objects.equals(sourceTableHash, sinkTableHash);
         if (!isEqual) {
-            isExistTableMiss = true;
             if (sourceTableHash.getTableHash() == -1) {
                 onlyExistEndpoint = Endpoint.SINK;
+                isExistTableMiss = true;
             } else if (sinkTableHash.getTableHash() == -1) {
                 onlyExistEndpoint = Endpoint.SOURCE;
+                isExistTableMiss = true;
             } else {
                 onlyExistEndpoint = null;
+                isExistTableMiss = false;
             }
         } else {
             isExistTableMiss = false;
@@ -388,15 +398,26 @@
      * And assemble Kafka data into the specified bucket list {@code bucketList}
      *
-     * @param endpoint endpoint
+     * @param endpoint   endpoint
      * @param bucketList bucket list
+     * @param retry      retry
      */
-    private void initFirstCheckBucketList(Endpoint endpoint, List<Bucket> bucketList) {
+    private void initFirstCheckBucketList(Endpoint endpoint, List<Bucket> bucketList, int retry) {
+        List<RowDataHash> dataList = null;
         StopWatch rowDataQueryWatch = new StopWatch("first query row data " + endpoint);
         rowDataQueryWatch.start(dataLog.getTableName() + " " + dataLog.getCompositePrimaryValues().size());
-        List<RowDataHash> dataList = queryRowDataWapper.queryCheckRowData(endpoint, dataLog);
-        rowDataQueryWatch.stop();
-        LogUtils.debug(log, "query row data cost: {}", rowDataQueryWatch.shortSummary());
+        try {
+            dataList = queryRowDataWapper.queryCheckRowData(endpoint, dataLog);
+            rowDataQueryWatch.stop();
+            LogUtils.debug(log, "query row data cost: {}", rowDataQueryWatch.shortSummary());
+        } catch (RetryableException ex) {
+            if (retry >= MAX_RETRY_TIME) {
+                throw new CheckingException("query row data failed, retry time is " + MAX_RETRY_TIME);
+            }
+            ThreadUtil.safeSleep(1000);
+            LogUtils.warn(log, "retry to query row data cost: {}", rowDataQueryWatch.shortSummary());
+            initFirstCheckBucketList(endpoint, bucketList, retry + 1);
+            // the recursive call has already built the bucket list; return so the
+            // fall-through below does not invoke buildBucket with a null dataList
+            return;
+        }
         buildBucket(dataList, endpoint, bucketList);
     }
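Note on the retry introduced above: queryCheckRowData is a Feign call, so a transient RetryableException is now retried up to MAX_RETRY_TIME times with a fixed one-second pause before the check fails. A minimal, self-contained sketch of the same bounded-retry pattern, written as an iterative loop; the BoundedRetry class and fetchWithRetry helper are illustrative, not part of this patch:

```java
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

final class BoundedRetry {
    // Retry a transient failure up to maxRetry times with a fixed 1s pause,
    // mirroring the MAX_RETRY_TIME / ThreadUtil.safeSleep(1000) logic above.
    static <T> T fetchWithRetry(Supplier<T> query, int maxRetry) {
        for (int attempt = 0; ; attempt++) {
            try {
                return query.get(); // success: hand the result back immediately
            } catch (RuntimeException ex) { // stand-in for feign.RetryableException
                if (attempt >= maxRetry) {
                    throw new IllegalStateException("query failed after " + maxRetry + " retries", ex);
                }
                try {
                    TimeUnit.SECONDS.sleep(1); // fixed pause between attempts
                } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt();
                    throw new IllegalStateException("interrupted while retrying", ie);
                }
            }
        }
    }
}
```

An iterative loop returns the fetched result directly, which avoids the fall-through that the recursive variant in the patch has to guard with an explicit return.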
@@ -512,15 +533,27 @@
     private void checkResult() {
         final AbstractCheckDiffResultBuilder<?, ?> builder = AbstractCheckDiffResultBuilder.builder();
-        CheckDiffResult result =
-            builder.table(tableName).process(process).beginOffset(dataLog.getBeginOffset()).schema(sinkSchema)
-                .partitions(0).rowCount(rowCount).startTime(startTime).endTime(LocalDateTime.now())
-                .isExistTableMiss(isExistTableMiss, onlyExistEndpoint).checkMode(CheckMode.INCREMENT)
-                .isTableStructureEquals(isTableStructureEquals)
-                .keyUpdateSet(difference.getDiffering().keySet())
-                .keyInsertSet(difference.getOnlyOnLeft().keySet())
-                .keyDeleteSet(difference.getOnlyOnRight().keySet())
-                .build();
+        builder.table(tableName)
+            .process(process)
+            .beginOffset(dataLog.getBeginOffset())
+            .schema(sinkSchema)
+            .partitions(0)
+            .rowCount(rowCount)
+            .startTime(startTime)
+            .endTime(LocalDateTime.now())
+            .isExistTableMiss(isExistTableMiss, onlyExistEndpoint)
+            .checkMode(CheckMode.INCREMENT)
+            .isTableStructureEquals(isTableStructureEquals);
+        if (CollUtil.isNotEmpty(difference.getDiffering())) {
+            builder.keyUpdateSet(difference.getDiffering().keySet());
+        }
+        if (CollUtil.isNotEmpty(difference.getOnlyOnLeft())) {
+            builder.keyInsertSet(difference.getOnlyOnLeft().keySet());
+        }
+        if (CollUtil.isNotEmpty(difference.getOnlyOnRight())) {
+            builder.keyDeleteSet(difference.getOnlyOnRight().keySet());
+        }
+        CheckDiffResult result = builder.build();
         checkResultManagerService.addResult(new CheckPartition(tableName, 0), result);
     }
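For context on the guarded key sets in checkResult above: the difference views hold the keys that must be replayed at the sink — differing keys map to updates, source-only keys to inserts, sink-only keys to deletes — and the CollUtil.isNotEmpty guards skip recording a view that is empty. The project's difference object exposes getDiffering()/getOnlyOnLeft()/getOnlyOnRight(); the sketch below shows the equivalent views on Guava's Maps.difference (the maps and key values are made up for illustration):

```java
import com.google.common.collect.MapDifference;
import com.google.common.collect.Maps;

import java.util.Map;

public class DiffViewDemo {
    public static void main(String[] args) {
        // Row hashes keyed by primary key: id-2 differs, id-3 exists only at the sink.
        Map<String, Long> source = Map.of("id-1", 11L, "id-2", 22L);
        Map<String, Long> sink = Map.of("id-1", 11L, "id-2", 99L, "id-3", 33L);

        MapDifference<String, Long> diff = Maps.difference(source, sink);
        System.out.println(diff.entriesDiffering().keySet());   // [id-2] -> keyUpdateSet
        System.out.println(diff.entriesOnlyOnLeft().keySet());  // []     -> keyInsertSet
        System.out.println(diff.entriesOnlyOnRight().keySet()); // [id-3] -> keyDeleteSet
    }
}
```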
diff --git a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/task/DataManipulationService.java b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/task/DataManipulationService.java
index 278ccb26b94df3a13abada642a950ac022f63f33..3670bee355648bbc4b65584d9680818b1765da2a 100644
--- a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/task/DataManipulationService.java
+++ b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/task/DataManipulationService.java
@@ -36,7 +36,6 @@
 import org.opengauss.datachecker.extract.util.MetaDataUtil;
 import org.springframework.beans.factory.annotation.Value;
 import org.springframework.stereotype.Service;
 import org.springframework.util.Assert;
-import org.springframework.util.StopWatch;
 import javax.annotation.Resource;
 import java.util.Comparator;
@@ -236,27 +235,20 @@
      * @return Table structure hash
      */
     public TableMetadataHash queryTableMetadataHash(String tableName) {
-        StopWatch stopWatch = new StopWatch("TableMetadataHash");
-        stopWatch.start(tableName);
         final TableMetadataHash tableMetadataHash = new TableMetadataHash();
-        final List<String> allTableNames = metaDataService.queryAllTableNames();
         tableMetadataHash.setTableName(tableName);
-        allTableNames.stream()
-            .filter(table -> table.equalsIgnoreCase(tableName))
-            .findFirst()
-            .ifPresentOrElse(table -> {
-                List<ColumnsMetaData> columnsMetaData = metaDataService.queryTableColumnMetaDataOfSchema(tableName);
-                StringBuffer buffer = new StringBuffer();
-                columnsMetaData.sort(Comparator.comparing(ColumnsMetaData::getOrdinalPosition));
-                columnsMetaData.forEach(column -> {
-                    buffer.append(column.getColumnName()).append(column.getOrdinalPosition());
-                });
-                tableMetadataHash.setTableHash(HASH_UTIL.hashBytes(buffer.toString()));
-            }, () -> {
-                tableMetadataHash.setTableHash(-1L);
-            });
-        stopWatch.stop();
-        LogUtils.debug(log, stopWatch.prettyPrint());
+        TableMetadata tableMetadata = dataAccessService.queryTableMetadata(tableName);
+        if (Objects.nonNull(tableMetadata)) {
+            List<ColumnsMetaData> columnsMetaData = metaDataService.queryTableColumnMetaDataOfSchema(tableName);
+            StringBuffer buffer = new StringBuffer();
+            columnsMetaData.sort(Comparator.comparing(ColumnsMetaData::getOrdinalPosition));
+            columnsMetaData.forEach(column -> {
+                buffer.append(column.getColumnName()).append(column.getOrdinalPosition());
+            });
+            tableMetadataHash.setTableHash(HASH_UTIL.hashBytes(buffer.toString()));
+        } else {
+            tableMetadataHash.setTableHash(-1L);
+        }
         return tableMetadataHash;
     }
 }
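The rewritten queryTableMetadataHash above replaces the scan over all table names with a direct metadata lookup: if the table exists, its hash is computed from the column names concatenated with their ordinal positions (sorted by position, so the result does not depend on retrieval order); if not, -1 is returned as the "table missing" sentinel that IncrementCheckThread compares against getTableHash(). A standalone sketch of that hashing scheme; the Column record is hypothetical and CRC32 stands in for the project's HASH_UTIL:

```java
import java.nio.charset.StandardCharsets;
import java.util.Comparator;
import java.util.List;
import java.util.zip.CRC32;

public class TableHashDemo {
    record Column(String name, int ordinalPosition) {}

    // Hash "name + ordinalPosition" over position-sorted columns; -1 marks a missing table.
    static long tableHash(List<Column> columns) {
        if (columns == null) {
            return -1L; // sentinel tested as getTableHash() == -1 by the checker
        }
        StringBuilder buffer = new StringBuilder();
        columns.stream()
            .sorted(Comparator.comparingInt(Column::ordinalPosition))
            .forEach(c -> buffer.append(c.name()).append(c.ordinalPosition()));
        CRC32 crc = new CRC32();
        crc.update(buffer.toString().getBytes(StandardCharsets.UTF_8));
        return crc.getValue();
    }

    public static void main(String[] args) {
        System.out.println(tableHash(List.of(new Column("name", 2), new Column("id", 1))));
        System.out.println(tableHash(null)); // -1: table missing on this endpoint
    }
}
```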