diff --git a/datachecker-check/src/main/java/org/opengauss/datachecker/check/client/ExtractFallbackFactory.java b/datachecker-check/src/main/java/org/opengauss/datachecker/check/client/ExtractFallbackFactory.java index 582f64450504a48a5ec4eec5dabf2b6673f0f437..3d43385fb9853f5835806d2066788f2f1a094fe9 100644 --- a/datachecker-check/src/main/java/org/opengauss/datachecker/check/client/ExtractFallbackFactory.java +++ b/datachecker-check/src/main/java/org/opengauss/datachecker/check/client/ExtractFallbackFactory.java @@ -147,14 +147,70 @@ public class ExtractFallbackFactory implements FallbackFactory querySourceTableMetadataHash(String tableName) { + return Result.error("Remote call failed"); + } + + /** + * querySinkTableMetadataHash + * + * @param tableName tableName + * @return error + */ + @Override + public Result querySinkTableMetadataHash(String tableName) { + return Result.error("Remote call failed"); + } + + /** + * querySourceCheckRowData + * + * @param dataLog data Log + * @return error + */ @Override - public Result queryTableMetadataHash(String tableName) { - return Result.error("Remote call, query table metadata hash information exception"); + public Result> querySourceCheckRowData(SourceDataLog dataLog) { + return Result.error("Remote call failed"); } + /** + * querySinkCheckRowData + * + * @param dataLog data Log + * @return error + */ + @Override + public Result> querySinkCheckRowData(SourceDataLog dataLog) { + return Result.error("Remote call failed"); + } + + /** + * querySourceSecondaryCheckRowData + * + * @param dataLog data Log + * @return error + */ + @Override + public Result> querySourceSecondaryCheckRowData(SourceDataLog dataLog) { + return Result.error("Remote call failed"); + } + + /** + * querySinkSecondaryCheckRowData + * + * @param dataLog data Log + * @return error + */ @Override - public Result> querySecondaryCheckRowData(SourceDataLog dataLog) { - return Result.error("Remote call, query secondary verification increment log data exception"); + public Result> querySinkSecondaryCheckRowData(SourceDataLog dataLog) { + return Result.error("Remote call failed"); } @Override diff --git a/datachecker-check/src/main/java/org/opengauss/datachecker/check/client/ExtractFeignClient.java b/datachecker-check/src/main/java/org/opengauss/datachecker/check/client/ExtractFeignClient.java index 750fb2f6dc1f0d0542ff1110a5bda7a102375209..31abbbd36072382828347a8f7cd93af44bb3a441 100644 --- a/datachecker-check/src/main/java/org/opengauss/datachecker/check/client/ExtractFeignClient.java +++ b/datachecker-check/src/main/java/org/opengauss/datachecker/check/client/ExtractFeignClient.java @@ -111,7 +111,7 @@ public interface ExtractFeignClient { */ @GetMapping("/extract/query/topic/data") Result> queryTopicData(@RequestParam("tableName") String tableName, - @RequestParam("partitions") int partitions); + @RequestParam("partitions") int partitions); /** * Query the specified incremental topic data @@ -172,8 +172,44 @@ public interface ExtractFeignClient { * @param tableName tableName * @return Table metadata hash */ - @PostMapping("/extract/query/table/metadata/hash") - Result queryTableMetadataHash(@RequestParam(name = "tableName") String tableName); + @PostMapping("/extract/source/table/metadata/hash") + Result querySourceTableMetadataHash(@RequestParam(name = "tableName") String tableName); + + /** + * Query table metadata hash information + * + * @param tableName tableName + * @return Table metadata hash + */ + @PostMapping("/extract/sink/table/metadata/hash") + Result 
querySinkTableMetadataHash(@RequestParam(name = "tableName") String tableName); + + /** + * Extract incremental log data records + * + * @param dataLog data Log + * @return Return extraction results + */ + @PostMapping("/extract/source/data/row/hash") + Result> querySourceCheckRowData(@RequestBody SourceDataLog dataLog); + + /** + * Extract incremental log data records + * + * @param dataLog data Log + * @return Return extraction results + */ + @PostMapping("/extract/sink/data/row/hash") + Result> querySinkCheckRowData(@RequestBody SourceDataLog dataLog); + + /** + * Extract incremental log data records + * + * @param dataLog data Log + * @return Return extraction results + */ + @PostMapping("/extract/source/secondary/data/row/hash") + Result> querySourceSecondaryCheckRowData(@RequestBody SourceDataLog dataLog); /** * Extract incremental log data records @@ -181,8 +217,8 @@ public interface ExtractFeignClient { * @param dataLog data Log * @return Return extraction results */ - @PostMapping("/extract/query/secondary/data/row/hash") - Result> querySecondaryCheckRowData(@RequestBody SourceDataLog dataLog); + @PostMapping("/extract/sink/secondary/data/row/hash") + Result> querySinkSecondaryCheckRowData(@RequestBody SourceDataLog dataLog); /** * Get the current endpoint configuration information diff --git a/datachecker-check/src/main/java/org/opengauss/datachecker/check/config/AsyncConfig.java b/datachecker-check/src/main/java/org/opengauss/datachecker/check/config/AsyncConfig.java index d840d7bb74ba27abb19e95231477e7761f17d4d0..47d3742c67aaa2c66583f9a0f1abfeeaa56ccc6b 100644 --- a/datachecker-check/src/main/java/org/opengauss/datachecker/check/config/AsyncConfig.java +++ b/datachecker-check/src/main/java/org/opengauss/datachecker/check/config/AsyncConfig.java @@ -44,8 +44,8 @@ public class AsyncConfig { @Bean public ThreadPoolTaskExecutor getAsyncExecutor() { executor = new ThreadPoolTaskExecutor(); - executor.setCorePoolSize(1); - executor.setMaxPoolSize(5); + executor.setCorePoolSize(10); + executor.setMaxPoolSize(10); executor.setQueueCapacity(1000); executor.setThreadNamePrefix("check-executor-"); executor.initialize(); diff --git a/datachecker-check/src/main/java/org/opengauss/datachecker/check/modules/check/IncrementCheckThread.java b/datachecker-check/src/main/java/org/opengauss/datachecker/check/modules/check/IncrementCheckThread.java index 4bfa284f0d6527b50c2f5eb031fc41ba24c99c05..2e8200c4f8b21abc9bf667742eb6ee61895cdcc4 100644 --- a/datachecker-check/src/main/java/org/opengauss/datachecker/check/modules/check/IncrementCheckThread.java +++ b/datachecker-check/src/main/java/org/opengauss/datachecker/check/modules/check/IncrementCheckThread.java @@ -45,6 +45,7 @@ import org.opengauss.datachecker.common.util.LogUtils; import org.opengauss.datachecker.common.util.SpringUtil; import org.opengauss.datachecker.common.web.Result; import org.springframework.lang.NonNull; +import org.springframework.util.StopWatch; import java.time.LocalDateTime; import java.util.ArrayList; @@ -77,7 +78,7 @@ public class IncrementCheckThread extends Thread { private final List sourceBucketList = new ArrayList<>(); private final List sinkBucketList = new ArrayList<>(); private final DifferencePair, Map, Map>> - difference = DifferencePair.of(new HashMap<>(), new HashMap<>(), new HashMap<>()); + difference = DifferencePair.of(new HashMap<>(), new HashMap<>(), new HashMap<>()); private final Map> bucketNumberDiffMap = new HashMap<>(); private final QueryRowDataWapper queryRowDataWapper; private final 
CheckResultManagerService checkResultManagerService; @@ -91,6 +92,7 @@ public class IncrementCheckThread extends Thread { private CheckRateCache checkRateCache; private EndpointMetaDataManager endpointMetaDataManager; private int maxRowSize; + private StopWatch stopWatch; /** * IncrementCheckThread constructor method @@ -99,7 +101,7 @@ public class IncrementCheckThread extends Thread { * @param support Data Check Runnable Support */ public IncrementCheckThread(@NonNull IncrementDataCheckParam checkParam, - @NonNull DataCheckRunnableSupport support) { + @NonNull DataCheckRunnableSupport support) { startTime = LocalDateTime.now(); dataLog = checkParam.getDataLog(); process = checkParam.getProcess(); @@ -112,6 +114,7 @@ public class IncrementCheckThread extends Thread { checkRateCache = SpringUtil.getBean(CheckRateCache.class); endpointMetaDataManager = SpringUtil.getBean(EndpointMetaDataManager.class); queryRowDataWapper = new QueryRowDataWapper(feignClient); + stopWatch = new StopWatch("inc " + sinkSchema + "." + tableName); } /** @@ -128,34 +131,43 @@ public class IncrementCheckThread extends Thread { @Override public void run() { try { + stopWatch.start("checkTableMetadata"); setName(buildThreadName()); // Metadata verification isTableStructureEquals = checkTableMetadata(); + stopWatch.stop(); if (isTableStructureEquals) { maxRowSize = dataLog.getCompositePrimaryValues().size(); + stopWatch.start("firstCheckCompare " + maxRowSize); // Initial verification firstCheckCompare(); + stopWatch.stop(); // Analyze the initial verification results + stopWatch.start("secondaryCheckCompare"); List diffIdList = parseDiffResult(); // Conduct secondary verification according to the initial verification results secondaryCheckCompare(diffIdList); + stopWatch.stop(); } else { log.error("check table {} metadata error", tableName); } // Verification result verification repair report + stopWatch.start("checkResult"); checkResult(); checkRateCache.add(buildCheckTable()); - log.info(" {} check table {} end", process, tableName); + stopWatch.stop(); } catch (Exception ex) { log.error("check error", ex); + } finally { + log.info(" {} check {} ", process, stopWatch.shortSummary()); } } private CheckTable buildCheckTable() { TableMetadata tableMetadata = endpointMetaDataManager.queryIncrementMetaData(Endpoint.SINK, tableName); return CheckTable.builder().tableName(tableName).rowCount(rowCount) - .completeTimestamp(System.currentTimeMillis()).avgRowLength(tableMetadata.getAvgRowLength()) - .build(); + .completeTimestamp(System.currentTimeMillis()).avgRowLength(tableMetadata.getAvgRowLength()) + .build(); } /** @@ -233,7 +245,7 @@ public class IncrementCheckThread extends Thread { } else { // sourceSize is less than thresholdMinBucketSize, that is, there is only one bucket. 
Compare DifferencePair subDifference = - compareBucket(sourceBucketList.get(0), sinkBucketList.get(0)); + compareBucket(sourceBucketList.get(0), sinkBucketList.get(0)); difference.getDiffering().putAll(subDifference.getDiffering()); difference.getOnlyOnLeft().putAll(subDifference.getOnlyOnLeft()); difference.getOnlyOnRight().putAll(subDifference.getOnlyOnRight()); @@ -276,8 +288,8 @@ public class IncrementCheckThread extends Thread { * @return Return metadata verification results */ private boolean checkTableMetadata() { - TableMetadataHash sourceTableHash = queryTableMetadataHash(Endpoint.SOURCE, tableName); - TableMetadataHash sinkTableHash = queryTableMetadataHash(Endpoint.SINK, tableName); + TableMetadataHash sourceTableHash = querySourceTableMetadataHash(tableName); + TableMetadataHash sinkTableHash = querySinkTableMetadataHash(tableName); boolean isEqual = Objects.equals(sourceTableHash, sinkTableHash); if (!isEqual) { isExistTableMiss = true; @@ -294,13 +306,26 @@ public class IncrementCheckThread extends Thread { return isEqual; } - private TableMetadataHash queryTableMetadataHash(Endpoint endpoint, String tableName) { - Result result = feignClient.getClient(endpoint).queryTableMetadataHash(tableName); + private TableMetadataHash querySourceTableMetadataHash(String tableName) { + Result result = feignClient.getClient(Endpoint.SOURCE) + .querySourceTableMetadataHash(tableName); if (result.isSuccess()) { return result.getData(); } else { - throw new DispatchClientException(endpoint, - "query table metadata hash " + tableName + " error, " + result.getMessage()); + throw new DispatchClientException( + Endpoint.SOURCE, + "query table metadata hash " + tableName + " error, " + result.getMessage()); + } + } + + private TableMetadataHash querySinkTableMetadataHash(String tableName) { + Result result = feignClient.getClient(Endpoint.SINK) + .querySinkTableMetadataHash(tableName); + if (result.isSuccess()) { + return result.getData(); + } else { + throw new DispatchClientException(Endpoint.SINK, + "query table metadata hash " + tableName + " error, " + result.getMessage()); } } @@ -325,9 +350,9 @@ public class IncrementCheckThread extends Thread { // Merkel tree comparison if (sourceTree.getDepth() != sinkTree.getDepth()) { throw new MerkleTreeDepthException(String.format(Locale.ROOT, - "source & sink data have large different, Please synchronize data again! " - + "merkel tree depth different,source depth=[%d],sink depth=[%d]", sourceTree.getDepth(), - sinkTree.getDepth())); + "source & sink data have large different, Please synchronize data again! 
" + + "merkel tree depth different,source depth=[%d],sink depth=[%d]", sourceTree.getDepth(), + sinkTree.getDepth())); } Node source = sourceTree.getRoot(); Node sink = sinkTree.getRoot(); @@ -365,7 +390,11 @@ public class IncrementCheckThread extends Thread { * @param bucketList bucket list */ private void initFirstCheckBucketList(Endpoint endpoint, List bucketList) { - List dataList = queryRowDataWapper.queryRowData(endpoint, dataLog); + StopWatch rowDataQueryWatch = new StopWatch("first query row data " + endpoint); + rowDataQueryWatch.start(dataLog.getTableName() + " " + dataLog.getCompositePrimaryValues().size()); + List dataList = queryRowDataWapper.queryCheckRowData(endpoint, dataLog); + rowDataQueryWatch.stop(); + LogUtils.debug(log, "query row data cost: {}", rowDataQueryWatch.shortSummary()); buildBucket(dataList, endpoint, bucketList); } @@ -384,7 +413,11 @@ public class IncrementCheckThread extends Thread { } private void buildSecondaryCheckBucket(Endpoint endpoint, SourceDataLog dataLog, List bucketList) { - final List dataList = getSecondaryCheckRowData(endpoint, dataLog); + StopWatch rowDataSecQueryWatch = new StopWatch("check sec query row data " + endpoint); + rowDataSecQueryWatch.start(dataLog.getTableName() + " " + dataLog.getCompositePrimaryValues().size()); + List dataList = queryRowDataWapper.querySecondaryCheckRowData(endpoint, dataLog); + rowDataSecQueryWatch.stop(); + LogUtils.debug(log, "query sec row data cost: {}", rowDataSecQueryWatch.shortSummary()); buildBucket(dataList, endpoint, bucketList); } @@ -419,13 +452,6 @@ public class IncrementCheckThread extends Thread { }); } - private List getSecondaryCheckRowData(Endpoint endpoint, SourceDataLog dataLog) { - if (dataLog == null || CollectionUtils.isEmpty(dataLog.getCompositePrimaryValues())) { - return new ArrayList<>(); - } - return queryRowDataWapper.queryRowData(endpoint, dataLog); - } - /** * Compare the difference data recorded inside the two barrels * @@ -436,7 +462,7 @@ public class IncrementCheckThread extends Thread { private DifferencePair compareBucket(Bucket sourceBucket, Bucket sinkBucket) { if (sourceBucket == null || sinkBucket == null) { return DifferencePair.of(sourceBucket == null ? sinkBucket.getBucket() : new HashMap<>(), - sinkBucket == null ? sourceBucket.getBucket() : new HashMap<>(), new HashMap()); + sinkBucket == null ? 
sourceBucket.getBucket() : new HashMap<>(), new HashMap()); } Map sourceMap = sourceBucket.getBucket(); Map sinkMap = sinkBucket.getBucket(); @@ -485,12 +511,14 @@ public class IncrementCheckThread extends Thread { private void checkResult() { final AbstractCheckDiffResultBuilder builder = AbstractCheckDiffResultBuilder.builder(); CheckDiffResult result = - builder.table(tableName).process(process).beginOffset(dataLog.getBeginOffset()).schema(sinkSchema) - .partitions(0).rowCount(rowCount).startTime(startTime).endTime(LocalDateTime.now()) - .isExistTableMiss(isExistTableMiss, onlyExistEndpoint).checkMode(CheckMode.INCREMENT) - .isTableStructureEquals(isTableStructureEquals).keyUpdateSet(difference.getDiffering().keySet()) - .keyInsertSet(difference.getOnlyOnLeft().keySet()).keyDeleteSet(difference.getOnlyOnRight().keySet()) - .build(); + builder.table(tableName).process(process).beginOffset(dataLog.getBeginOffset()).schema(sinkSchema) + .partitions(0).rowCount(rowCount).startTime(startTime).endTime(LocalDateTime.now()) + .isExistTableMiss(isExistTableMiss, onlyExistEndpoint).checkMode(CheckMode.INCREMENT) + .isTableStructureEquals(isTableStructureEquals) + .keyUpdateSet(difference.getDiffering().keySet()) + .keyInsertSet(difference.getOnlyOnLeft().keySet()) + .keyDeleteSet(difference.getOnlyOnRight().keySet()) + .build(); checkResultManagerService.addResult(new CheckPartition(tableName, 0), result); } diff --git a/datachecker-check/src/main/java/org/opengauss/datachecker/check/modules/check/QueryRowDataWapper.java b/datachecker-check/src/main/java/org/opengauss/datachecker/check/modules/check/QueryRowDataWapper.java index 278aba1b2e69e068ac0a6e6bab1e64015bf704aa..12a7289d45f54de60642587dea5e8b1190849426 100644 --- a/datachecker-check/src/main/java/org/opengauss/datachecker/check/modules/check/QueryRowDataWapper.java +++ b/datachecker-check/src/main/java/org/opengauss/datachecker/check/modules/check/QueryRowDataWapper.java @@ -16,15 +16,17 @@ package org.opengauss.datachecker.check.modules.check; import org.apache.commons.collections4.CollectionUtils; +import org.opengauss.datachecker.check.client.ExtractFeignClient; import org.opengauss.datachecker.check.client.FeignClientService; import org.opengauss.datachecker.common.entry.enums.Endpoint; import org.opengauss.datachecker.common.entry.extract.RowDataHash; import org.opengauss.datachecker.common.entry.extract.SourceDataLog; -import org.opengauss.datachecker.common.exception.DispatchClientException; -import org.opengauss.datachecker.common.web.Result; +import org.springframework.util.Assert; import java.util.ArrayList; import java.util.List; +import java.util.Objects; +import java.util.concurrent.atomic.AtomicInteger; /** * QueryRowDataWapper @@ -34,6 +36,8 @@ import java.util.List; * @since :11 */ public class QueryRowDataWapper { + private static final int MAX_QUERY_PAGE_SIZE = 100; + private final FeignClientService feignClient; /** @@ -48,22 +52,51 @@ public class QueryRowDataWapper { /** * Query incremental data * - * @param endpoint endpoint - * @param tableName tableName + * @param endpoint endpoint + * @param dataLog dataLog * @return result */ - public List queryIncrementRowData(Endpoint endpoint, String tableName) { - List data = new ArrayList<>(); - Result> result = feignClient.getClient(endpoint).queryIncrementTopicData(tableName); - if (!result.isSuccess()) { - throw new DispatchClientException(endpoint, - "query topic data of tableName " + tableName + " error, " + result.getMessage()); + public List queryCheckRowData(Endpoint 
endpoint, SourceDataLog dataLog) { + if (dataLog == null || CollectionUtils.isEmpty(dataLog.getCompositePrimaryValues())) { + return new ArrayList<>(); } - while (result.isSuccess() && !CollectionUtils.isEmpty(result.getData())) { - data.addAll(result.getData()); - result = feignClient.getClient(endpoint).queryIncrementTopicData(tableName); + ExtractFeignClient client = feignClient.getClient(endpoint); + Assert.isTrue(Objects.nonNull(client), endpoint + " feign client is null"); + + final List compositeKeys = dataLog.getCompositePrimaryValues(); + List result = new ArrayList<>(); + if (compositeKeys.size() > MAX_QUERY_PAGE_SIZE) { + AtomicInteger cnt = new AtomicInteger(0); + List tempCompositeKeys = new ArrayList<>(); + compositeKeys.forEach(key -> { + tempCompositeKeys.add(key); + if (cnt.incrementAndGet() % MAX_QUERY_PAGE_SIZE == 0) { + SourceDataLog pageDataLog = getPageDataLog(dataLog, tempCompositeKeys); + if (Endpoint.SOURCE.equals(endpoint)) { + result.addAll(client.querySourceCheckRowData(pageDataLog).getData()); + } else { + result.addAll(client.querySinkCheckRowData(pageDataLog).getData()); + } + tempCompositeKeys.clear(); + } + }); + if (CollectionUtils.isNotEmpty(tempCompositeKeys)) { + SourceDataLog pageDataLog = getPageDataLog(dataLog, tempCompositeKeys); + if (Endpoint.SOURCE.equals(endpoint)) { + result.addAll(client.querySourceCheckRowData(pageDataLog).getData()); + } else { + result.addAll(client.querySinkCheckRowData(pageDataLog).getData()); + } + tempCompositeKeys.clear(); + } + } else { + if (Endpoint.SOURCE.equals(endpoint)) { + result.addAll(client.querySourceCheckRowData(dataLog).getData()); + } else { + result.addAll(client.querySinkCheckRowData(dataLog).getData()); + } } - return data; + return result; } /** @@ -73,15 +106,58 @@ public class QueryRowDataWapper { * @param dataLog dataLog * @return result */ - public List queryRowData(Endpoint endpoint, SourceDataLog dataLog) { + public List querySecondaryCheckRowData(Endpoint endpoint, SourceDataLog dataLog) { if (dataLog == null || CollectionUtils.isEmpty(dataLog.getCompositePrimaryValues())) { return new ArrayList<>(); } - Result> result = feignClient.getClient(endpoint).querySecondaryCheckRowData(dataLog); - if (!result.isSuccess()) { - throw new DispatchClientException(endpoint, - "query topic data of tableName " + dataLog.getTableName() + " error, " + result.getMessage()); + ExtractFeignClient client = feignClient.getClient(endpoint); + Assert.isTrue(Objects.nonNull(client), endpoint + " feign client is null"); + + final List compositeKeys = dataLog.getCompositePrimaryValues(); + List result = new ArrayList<>(); + if (compositeKeys.size() > MAX_QUERY_PAGE_SIZE) { + AtomicInteger cnt = new AtomicInteger(0); + List tempCompositeKeys = new ArrayList<>(); + compositeKeys.forEach(key -> { + tempCompositeKeys.add(key); + if (cnt.incrementAndGet() % MAX_QUERY_PAGE_SIZE == 0) { + SourceDataLog pageDataLog = getPageDataLog(dataLog, tempCompositeKeys); + if (Endpoint.SOURCE.equals(endpoint)) { + result.addAll(client.querySourceSecondaryCheckRowData(pageDataLog).getData()); + } else { + result.addAll(client.querySinkSecondaryCheckRowData(pageDataLog).getData()); + } + tempCompositeKeys.clear(); + } + }); + if (CollectionUtils.isNotEmpty(tempCompositeKeys)) { + SourceDataLog pageDataLog = getPageDataLog(dataLog, tempCompositeKeys); + if (Endpoint.SOURCE.equals(endpoint)) { + result.addAll(client.querySourceSecondaryCheckRowData(pageDataLog).getData()); + } else { + 
result.addAll(client.querySinkSecondaryCheckRowData(pageDataLog).getData()); + } + tempCompositeKeys.clear(); + } + } else { + if (Endpoint.SOURCE.equals(endpoint)) { + result.addAll(client.querySourceSecondaryCheckRowData(dataLog).getData()); + } else { + result.addAll(client.querySinkSecondaryCheckRowData(dataLog).getData()); + } + } + return result; + } + + private SourceDataLog getPageDataLog(SourceDataLog dataLog, List tempCompositeKeys) { + SourceDataLog pageDataLog = new SourceDataLog(); + pageDataLog.setTableName(dataLog.getTableName()); + pageDataLog.setCompositePrimarys(dataLog.getCompositePrimarys()); + if (Objects.nonNull(tempCompositeKeys)) { + pageDataLog.setCompositePrimaryValues(tempCompositeKeys); + } else { + pageDataLog.setCompositePrimaryValues(dataLog.getCompositePrimaryValues()); } - return result.getData(); + return pageDataLog; } } diff --git a/datachecker-check/src/main/java/org/opengauss/datachecker/check/modules/report/CheckResultManagerService.java b/datachecker-check/src/main/java/org/opengauss/datachecker/check/modules/report/CheckResultManagerService.java index 223f20e301d7e90a2f4a55571015630bd62bc948..f332433c9c0025cc4fda4f75593ebd87dad68219 100644 --- a/datachecker-check/src/main/java/org/opengauss/datachecker/check/modules/report/CheckResultManagerService.java +++ b/datachecker-check/src/main/java/org/opengauss/datachecker/check/modules/report/CheckResultManagerService.java @@ -24,6 +24,8 @@ import org.opengauss.datachecker.check.event.CheckSuccessReportEvent; import org.opengauss.datachecker.check.load.CheckEnvironment; import org.opengauss.datachecker.check.modules.check.CheckDiffResult; import org.opengauss.datachecker.check.modules.check.CheckResultConstants; +import org.opengauss.datachecker.common.config.ConfigCache; +import org.opengauss.datachecker.common.constant.ConfigConstants; import org.opengauss.datachecker.common.entry.check.CheckPartition; import org.opengauss.datachecker.common.entry.common.RepairEntry; import org.opengauss.datachecker.common.entry.enums.Endpoint; @@ -106,8 +108,11 @@ public class CheckResultManagerService implements ApplicationContextAware { String logFilePath = getLogRootPath(); final List successList = filterResultByResult(CheckResultConstants.RESULT_SUCCESS); final List failedList = filterResultByResult(CheckResultConstants.RESULT_FAILED); - boolean ogCompatibility = feignClient.checkTargetOgCompatibility(); - reduceFailedRepair(logFilePath, failedList, ogCompatibility); + if (!ConfigCache.hasCompatibility()) { + ConfigCache.put(ConfigConstants.OG_COMPATIBILITY_B, feignClient.checkTargetOgCompatibility()); + } + Boolean isOgCompatibility = ConfigCache.getBooleanValue(ConfigConstants.OG_COMPATIBILITY_B); + reduceFailedRepair(logFilePath, failedList, isOgCompatibility); reduceSummary(successList, failedList); } catch (Exception exception) { log.error("summaryCheckResult ", exception); @@ -131,10 +136,10 @@ public class CheckResultManagerService implements ApplicationContextAware { if (CollectionUtils.isNotEmpty(updateDiffs)) { RepairEntry update = new RepairEntry(); update.setTable(tableFailed.getTable()) - .setSchema(tableFailed.getSchema()) - .setFileName(tableFailed.getFileName()) - .setOgCompatibility(ogCompatibility) - .setDiffSet(updateDiffs); + .setSchema(tableFailed.getSchema()) + .setFileName(tableFailed.getFileName()) + .setOgCompatibility(ogCompatibility) + .setDiffSet(updateDiffs); final List updateRepairs = feignClient.buildRepairStatementUpdateDml(Endpoint.SOURCE, update); appendLogFile(repairFile, 
updateRepairs); } @@ -145,10 +150,10 @@ public class CheckResultManagerService implements ApplicationContextAware { if (CollectionUtils.isNotEmpty(insertDiffs)) { RepairEntry insert = new RepairEntry(); insert.setTable(tableFailed.getTable()) - .setSchema(tableFailed.getSchema()) - .setFileName(tableFailed.getFileName()) - .setOgCompatibility(ogCompatibility) - .setDiffSet(insertDiffs); + .setSchema(tableFailed.getSchema()) + .setFileName(tableFailed.getFileName()) + .setOgCompatibility(ogCompatibility) + .setDiffSet(insertDiffs); final List insertRepairs = feignClient.buildRepairStatementInsertDml(Endpoint.SOURCE, insert); appendLogFile(repairFile, insertRepairs); } @@ -159,10 +164,10 @@ public class CheckResultManagerService implements ApplicationContextAware { if (CollectionUtils.isNotEmpty(deleteDiffs)) { RepairEntry delete = new RepairEntry(); delete.setTable(tableFailed.getTable()) - .setSchema(tableFailed.getSchema()) - .setFileName(tableFailed.getFileName()) - .setOgCompatibility(ogCompatibility) - .setDiffSet(deleteDiffs); + .setSchema(tableFailed.getSchema()) + .setFileName(tableFailed.getFileName()) + .setOgCompatibility(ogCompatibility) + .setDiffSet(deleteDiffs); final List deleteRepairs = feignClient.buildRepairStatementDeleteDml(Endpoint.SOURCE, delete); appendLogFile(repairFile, deleteRepairs); } @@ -210,10 +215,10 @@ public class CheckResultManagerService implements ApplicationContextAware { private List filterResultByResult(String resultType) { List resultList = new LinkedList<>(checkResultCache.values() - .stream() - .filter(result -> result.getResult() - .equals(resultType)) - .collect(Collectors.toList())); + .stream() + .filter(result -> result.getResult() + .equals(resultType)) + .collect(Collectors.toList())); if (CheckResultConstants.RESULT_FAILED.equals(resultType)) { resultList.addAll(noCheckedCache.values()); } @@ -222,15 +227,15 @@ public class CheckResultManagerService implements ApplicationContextAware { private int calcTableCount(List resultList) { return (int) resultList.stream() - .map(CheckDiffResult::getTable) - .distinct() - .count(); + .map(CheckDiffResult::getTable) + .distinct() + .count(); } private long calcRowCount(List resultList) { return resultList.stream() - .mapToLong(CheckDiffResult::getRowCount) - .sum(); + .mapToLong(CheckDiffResult::getRowCount) + .sum(); } @Override diff --git a/datachecker-check/src/main/java/org/opengauss/datachecker/check/service/IncrementLogManager.java b/datachecker-check/src/main/java/org/opengauss/datachecker/check/service/IncrementLogManager.java index 65f290559064428aebe36eae49e07234f798b0ba..99b90f43fc6c82e8ff5f5dbc19dd150649bb25b7 100644 --- a/datachecker-check/src/main/java/org/opengauss/datachecker/check/service/IncrementLogManager.java +++ b/datachecker-check/src/main/java/org/opengauss/datachecker/check/service/IncrementLogManager.java @@ -20,19 +20,15 @@ import org.opengauss.datachecker.common.util.LogUtils; import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import org.springframework.stereotype.Service; +import javax.annotation.PreDestroy; import javax.annotation.Resource; import java.io.File; import java.io.IOException; -import java.nio.file.FileSystems; import java.nio.file.Path; import java.nio.file.Paths; -import java.nio.file.WatchKey; import java.nio.file.Files; import java.nio.file.SimpleFileVisitor; -import java.nio.file.WatchEvent; import java.nio.file.FileVisitResult; -import java.nio.file.StandardWatchEventKinds; -import java.nio.file.WatchService; import 
java.nio.file.attribute.BasicFileAttributes; import java.util.Arrays; import java.util.LinkedList; @@ -51,8 +47,8 @@ public class IncrementLogManager { @Resource private ThreadPoolTaskExecutor threadPoolTaskExecutor; - private WatchService watchService; - private LinkedList backDirs = new LinkedList<>(); + private boolean isWatching = true; + private final LinkedList backDirs = new LinkedList<>(); /** * init log dir and register watch service @@ -60,56 +56,45 @@ public class IncrementLogManager { * @param path path */ public void init(String path) { - try { - watchService = FileSystems.getDefault().newWatchService(); - Path dir = Paths.get(path); - File[] files = dir.toFile().listFiles(); - if (files != null && files.length > 0) { - backDirs.addAll(Arrays.stream(files).map(File::toPath).sorted().collect(Collectors.toList())); - } - dir.register(watchService, StandardWatchEventKinds.ENTRY_CREATE); - LogUtils.info(log, "registers path {} with a watch service. ", path); - bakResultLogMonitor(); - } catch (IOException e) { - LogUtils.error(log, "init watch service failed. ", e); - } + bakResultLogMonitor(path); } /** * monitor the result log file + * + * @param path path */ - public void bakResultLogMonitor() { + private void bakResultLogMonitor(String path) { threadPoolTaskExecutor.submit(() -> { - try { - WatchKey key = watchService.take(); - for (WatchEvent event : key.pollEvents()) { - WatchEvent.Kind kind = event.kind(); - - // 过滤出目录本身的事件 - WatchEvent ev = (WatchEvent) event; - Path fileName = ev.context(); - if (kind.equals(StandardWatchEventKinds.ENTRY_CREATE)) { - LogUtils.warn(log, "monitor result back dir {} : {}", kind, fileName); - backDirs.addLast(fileName); - } - - while (backDirs.size() > MAX_BACK_DIR_NUM) { - try { - Path path = backDirs.removeFirst(); - deleteDir(path); - LogUtils.warn(log, "remove result back more dir : {}", path); - } catch (IOException e) { - LogUtils.error(log, "remove result back more dir : {}", e.getMessage()); - } + while (isWatching) { + Path dir = Paths.get(path); + File[] files = dir.toFile().listFiles(); + if (files != null && files.length > 0) { + backDirs.addAll(Arrays.stream(files).map(File::toPath).sorted().collect(Collectors.toList())); + } + while (backDirs.size() > MAX_BACK_DIR_NUM) { + Path delDir = backDirs.removeFirst(); + try { + deleteDir(delDir); + LogUtils.warn(log, "remove result back more dir : {}", delDir); + } catch (IOException e) { + LogUtils.error(log, "remove result back more dir : ", e.getMessage()); + backDirs.addLast(delDir); } } - key.reset(); - } catch (InterruptedException e) { - LogUtils.error(log, "monitor result back dir : {}", e.getMessage()); + backDirs.clear(); } }); } + /** + * stop watch + */ + @PreDestroy + public void destroy() { + isWatching = false; + } + private static void deleteDir(Path dir) throws IOException { Files.walkFileTree(dir, new SimpleFileVisitor() { @Override diff --git a/datachecker-check/src/main/java/org/opengauss/datachecker/check/service/IncrementManagerService.java b/datachecker-check/src/main/java/org/opengauss/datachecker/check/service/IncrementManagerService.java index 4a4407df424809c5e172d40f0d4caf7351f20d60..d0131494eb26f0cc71ffe425147a30826c671326 100644 --- a/datachecker-check/src/main/java/org/opengauss/datachecker/check/service/IncrementManagerService.java +++ b/datachecker-check/src/main/java/org/opengauss/datachecker/check/service/IncrementManagerService.java @@ -29,14 +29,15 @@ import org.opengauss.datachecker.check.modules.report.SliceProgressService; import 
org.opengauss.datachecker.common.entry.extract.SourceDataLog; import org.opengauss.datachecker.common.entry.report.CheckFailed; import org.opengauss.datachecker.common.exception.CheckingException; -import org.opengauss.datachecker.common.service.DynamicThreadPoolManager; import org.opengauss.datachecker.common.service.ShutdownService; import org.opengauss.datachecker.common.util.FileUtils; import org.opengauss.datachecker.common.util.IdGenerator; import org.opengauss.datachecker.common.util.LogUtils; import org.opengauss.datachecker.common.util.PhaserUtil; import org.opengauss.datachecker.common.util.ThreadUtil; +import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import org.springframework.stereotype.Service; +import org.springframework.util.StopWatch; import javax.annotation.Resource; import java.nio.file.Path; @@ -49,7 +50,6 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Function; @@ -59,7 +59,6 @@ import static org.opengauss.datachecker.check.modules.check.CheckResultConstants import static org.opengauss.datachecker.check.modules.check.CheckResultConstants.FAILED_LOG_NAME; import static org.opengauss.datachecker.check.modules.check.CheckResultConstants.LEFT_SQUARE_BRACKET; import static org.opengauss.datachecker.check.modules.check.CheckResultConstants.RIGHT_SQUARE_BRACKET; -import static org.opengauss.datachecker.common.constant.DynamicTpConstant.CHECK_EXECUTOR; /** * IncrementManagerService @@ -73,6 +72,9 @@ public class IncrementManagerService { private static final Logger log = LogUtils.getLogger(IncrementManagerService.class); private static final AtomicReference PROCESS_SIGNATURE = new AtomicReference<>(); private static final BlockingQueue> INC_LOG_QUEUE = new LinkedBlockingQueue<>(); + private static final int RETRY_SLEEP_TIMES = 1000; + private static final int MAX_RETRY_SLEEP_TIMES = 1000; + @Resource private DataCheckService dataCheckService; @Resource @@ -86,13 +88,10 @@ public class IncrementManagerService { @Resource private CheckResultManagerService checkResultManagerService; @Resource - private DynamicThreadPoolManager dynamicThreadPoolManager; - @Resource - private IncrementLogManager incrementLogManager; + private ThreadPoolTaskExecutor threadPoolTaskExecutor; private final AtomicInteger retryTimes = new AtomicInteger(0); - private static final int RETRY_SLEEP_TIMES = 1000; - private static final int MAX_RETRY_SLEEP_TIMES = 1000; + private final AtomicInteger totalKeySize = new AtomicInteger(0); /** * Incremental verification log notification @@ -103,10 +102,12 @@ public class IncrementManagerService { if (CollectionUtils.isEmpty(dataLogList)) { return; } + int sum = dataLogList.stream().map(SourceDataLog::length).mapToInt(Integer::intValue).sum(); + totalKeySize.addAndGet(sum); try { INC_LOG_QUEUE.put(dataLogList); - LogUtils.info(log, "add {} data_log to the inc_log_queue,this tables contain : {}", dataLogList.size(), - getDataLogTables(dataLogList)); + LogUtils.info(log, "increment log queue size= {}, current add {} tables, contain : {}", + INC_LOG_QUEUE.size(), dataLogList.size(), getDataLogTables(dataLogList)); } catch (InterruptedException ex) { LogUtils.error(log, "notify inc data logs interrupted "); } @@ -114,15 +115,16 @@ public class IncrementManagerService { private List 
getDataLogTables(List dataLogList) { return dataLogList.stream() - .map(SourceDataLog::getTableName) - .collect(Collectors.toList()); + .map(SourceDataLog::getTableName) + .collect(Collectors.toList()); } public void startIncrementDataLogs() { if (feignClientService.startIncrementMonitor()) { LogUtils.info(log, "started source increment monitor"); + feignClientService.resumeIncrementMonitor(); ThreadUtil.newSingleThreadExecutor() - .submit(this::checkingIncrementDataLogs); + .submit(this::checkingIncrementDataLogs); } else { throw new CheckingException("start increment monitor failed"); } @@ -130,7 +132,7 @@ public class IncrementManagerService { private void checkingIncrementDataLogs() { Thread.currentThread() - .setName("inc-queue-process-loop"); + .setName("inc-queue-process-loop"); LogUtils.info(log, "started process increment data logs thread"); shutdownService.addMonitor(); while (!shutdownService.isShutdown()) { @@ -147,27 +149,28 @@ public class IncrementManagerService { if (diffCount >= properties.getIncrementMaxDiffCount()) { feignClientService.pauseIncrementMonitor(); LogUtils.warn(log, "pause increment monitor, because the diff-count is too large [{}] ," - + " please to repair this large diff manual !", diffCount); + + " please to repair this large diff manual !", diffCount); ThreadUtil.sleep(getRetryTime()); - } else { + } + if (INC_LOG_QUEUE.isEmpty()) { feignClientService.resumeIncrementMonitor(); - LogUtils.debug(log, "resume increment monitor,and take inc log queue!"); - if (CollectionUtils.isNotEmpty(INC_LOG_QUEUE.peek())) { - dataLogList.addAll(INC_LOG_QUEUE.take()); - } + LogUtils.warn(log, "resume increment monitor, because the inc-log-queue is empty !"); + } else { + feignClientService.pauseIncrementMonitor(); + LogUtils.warn(log, "pause increment monitor, because the inc-log-queue is not empty !"); + dataLogList.addAll(INC_LOG_QUEUE.take()); } // Collect the last verification results and build an incremental verification log mergeDataLogList(dataLogList, lastResults); if (CollectionUtils.isNotEmpty(dataLogList)) { ExportCheckResult.backCheckResultDirectory(); - incrementLogManager.bakResultLogMonitor(); sliceProgressService.startProgressing(); PROCESS_SIGNATURE.set(IdGenerator.nextId36()); checkResultManagerService.progressing(dataLogList.size()); incrementDataLogsChecking(dataLogList); + feignClientService.resumeIncrementMonitor(); } else { - LogUtils.info(log, "There are no differences to verify at the current time. 
Please wait."); - ThreadUtil.sleepSecond(5); + ThreadUtil.sleepSecond(1); } } catch (Exception ex) { LogUtils.error(log, "take inc log queue interrupted "); @@ -185,32 +188,32 @@ public class IncrementManagerService { AtomicInteger diffCount = new AtomicInteger(); lastResults.forEach((tableName, lastLog) -> { diffCount.addAndGet(lastLog.getCompositePrimaryValues() - .size()); + .size()); }); return diffCount.get(); } private void mergeDataLogList(List dataLogList, Map collectLastResults) { final Map dataLogMap = dataLogList.stream() - .collect(Collectors.toMap(SourceDataLog::getTableName, - Function.identity())); + .collect(Collectors.toMap(SourceDataLog::getTableName, + Function.identity())); collectLastResults.forEach((tableName, lastLog) -> { if (dataLogMap.containsKey(tableName)) { final List values = dataLogMap.get(tableName) - .getCompositePrimaryValues(); + .getCompositePrimaryValues(); final long beginOffset = Math.min(dataLogMap.get(tableName) - .getBeginOffset(), lastLog.getBeginOffset()); + .getBeginOffset(), lastLog.getBeginOffset()); final Set margeValueSet = new HashSet<>(); margeValueSet.addAll(values); margeValueSet.addAll(lastLog.getCompositePrimaryValues()); dataLogMap.get(tableName) - .getCompositePrimaryValues() - .clear(); + .getCompositePrimaryValues() + .clear(); dataLogMap.get(tableName) - .setBeginOffset(beginOffset); + .setBeginOffset(beginOffset); dataLogMap.get(tableName) - .getCompositePrimaryValues() - .addAll(margeValueSet); + .getCompositePrimaryValues() + .addAll(margeValueSet); } else { dataLogList.add(lastLog); } @@ -221,22 +224,26 @@ public class IncrementManagerService { private void incrementDataLogsChecking(List dataLogList) { String processNo = PROCESS_SIGNATURE.get(); List taskList = new ArrayList<>(); - LogUtils.info(log, "check increment {} data log", processNo); - - ThreadPoolExecutor asyncCheckExecutor = dynamicThreadPoolManager.getExecutor(CHECK_EXECUTOR); + StopWatch stopWatch = new StopWatch("check inc data " + processNo); + stopWatch.start("start check thread"); dataLogList.forEach(dataLog -> { LogUtils.debug(log, "increment process=[{}] , tableName=[{}], begin offset =[{}], diffSize=[{}]", processNo, - dataLog.getTableName(), dataLog.getBeginOffset(), dataLog.getCompositePrimaryValues() - .size()); + dataLog.getTableName(), dataLog.getBeginOffset(), dataLog.getCompositePrimaryValues() + .size()); // Verify the data according to the table name and Kafka partition taskList.add(dataCheckService.incrementCheckTableData(dataLog.getTableName(), processNo, dataLog)); }); + stopWatch.stop(); + stopWatch.start("wait check thread"); // Block the current thread until all thread pool tasks are executed. 
- PhaserUtil.submit(asyncCheckExecutor, taskList, () -> { + PhaserUtil.submit(threadPoolTaskExecutor, taskList, () -> { LogUtils.debug(log, "multiple check subtasks have been completed."); }); + stopWatch.stop(); + stopWatch.start("summary check result"); checkResultManagerService.summaryCheckResult(); - LogUtils.info(log, "check increment {} data log end", processNo); + stopWatch.stop(); + LogUtils.debug(log, stopWatch.prettyPrint()); } /** @@ -252,16 +259,16 @@ public class IncrementManagerService { } List historyFailedList = new ArrayList<>(); checkResultFileList.stream() - .filter(path -> FAILED_LOG_NAME.equals(path.getFileName() - .toString())) - .forEach(path -> { - try { - String content = wrapperFailedResultArray(path); - historyFailedList.addAll(JSONObject.parseArray(content, CheckFailed.class)); - } catch (CheckingException | JSONException ex) { - log.error("load check result {} has error", path.getFileName()); - } - }); + .filter(path -> FAILED_LOG_NAME.equals(path.getFileName() + .toString())) + .forEach(path -> { + try { + String content = wrapperFailedResultArray(path); + historyFailedList.addAll(JSONObject.parseArray(content, CheckFailed.class)); + } catch (CheckingException | JSONException ex) { + log.error("load check result {} has error", path.getFileName()); + } + }); LogUtils.debug(log, "collect last failed results {}", historyFailedList.size()); return parseCheckResult(historyFailedList); @@ -292,15 +299,15 @@ public class IncrementManagerService { final List values = dataLogMarge.getCompositePrimaryValues(); diffKeyValues.addAll(values); dataLogMarge.getCompositePrimaryValues() - .clear(); + .clear(); dataLogMarge.getCompositePrimaryValues() - .addAll(diffKeyValues); + .addAll(diffKeyValues); dataLogMarge.setBeginOffset(beginOffset); } else { SourceDataLog sourceDataLog = new SourceDataLog(); sourceDataLog.setTableName(tableName) - .setBeginOffset(dataLog.getBeginOffset()) - .setCompositePrimaryValues(new ArrayList<>(diffKeyValues)); + .setBeginOffset(dataLog.getBeginOffset()) + .setCompositePrimaryValues(new ArrayList<>(diffKeyValues)); dataLogMap.put(tableName, sourceDataLog); } }); diff --git a/datachecker-check/src/main/resources/application.yml b/datachecker-check/src/main/resources/application.yml index 70d69d727ebc164422661f95cb7ce2bddd3db952..a01ea97ee834a7f241140ad4d6c9c110eec7dd48 100644 --- a/datachecker-check/src/main/resources/application.yml +++ b/datachecker-check/src/main/resources/application.yml @@ -3,7 +3,7 @@ server: shutdown: graceful tomcat: threads: - max: 10 + max: 20 logging: config: classpath:./config/log4j2.xml debug: false @@ -79,6 +79,13 @@ data: auto-delete-topic: 2 increment-max-diff-count: 10 sql_mode_pad_char_to_full_length: false # sql_mode : pad_char_to_full_length disabled +feign: + okhttp: + enabled: true + httpclient: + connection-timeout: 500000 + max-connections: 500 + rules: # There are three types of filtering rules: table-level rules, row-level rules, and column-level rules. # Rules are configured in the form of List collection. 
diff --git a/datachecker-common/src/main/java/org/opengauss/datachecker/common/config/ConfigCache.java b/datachecker-common/src/main/java/org/opengauss/datachecker/common/config/ConfigCache.java index 88d1274ddc896b7eb8ec439ebfd982cf936063eb..2a7a54377d04268f7087d5ab6a6a4862644569f1 100644 --- a/datachecker-common/src/main/java/org/opengauss/datachecker/common/config/ConfigCache.java +++ b/datachecker-common/src/main/java/org/opengauss/datachecker/common/config/ConfigCache.java @@ -128,6 +128,7 @@ public class ConfigCache { /** * 检查配置项,当前是否删除Topic + * * @return */ public static boolean isDeleteTopic() { @@ -222,7 +223,7 @@ public class ConfigCache { /** * remove config cache * - * @param key + * @param key key */ public static void remove(String key) { CACHE.remove(key); @@ -236,4 +237,13 @@ public class ConfigCache { public static boolean isCsvMode() { return Objects.equals(CheckMode.CSV, getCheckMode()); } + + /** + * 是否包含兼容性配置属性 + * + * @return boolean + */ + public static boolean hasCompatibility() { + return CACHE.containsKey(ConfigConstants.OG_COMPATIBILITY_B); + } } diff --git a/datachecker-common/src/main/java/org/opengauss/datachecker/common/entry/extract/SourceDataLog.java b/datachecker-common/src/main/java/org/opengauss/datachecker/common/entry/extract/SourceDataLog.java index 1ee248167c797160464df0d7ba650a9dca28c02d..3061144f4513680ec63da979fe66cf8af31e8112 100644 --- a/datachecker-common/src/main/java/org/opengauss/datachecker/common/entry/extract/SourceDataLog.java +++ b/datachecker-common/src/main/java/org/opengauss/datachecker/common/entry/extract/SourceDataLog.java @@ -56,4 +56,13 @@ public class SourceDataLog { */ @Schema(name = "compositePrimaryValues") private List compositePrimaryValues; + + /** + * source data of compositePrimarys size + * + * @return length + */ + public int length() { + return compositePrimarys.size(); + } } diff --git a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/config/DruidDataSourceConfig.java b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/config/DruidDataSourceConfig.java index 8127464a42fb68b0b72c4485ff000685c4e8f319..8ebe4125e2bc3488f911357c1954c023f0aed20b 100644 --- a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/config/DruidDataSourceConfig.java +++ b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/config/DruidDataSourceConfig.java @@ -15,6 +15,7 @@ package org.opengauss.datachecker.extract.config; +import com.alibaba.druid.pool.DruidDataSource; import com.alibaba.druid.spring.boot.autoconfigure.DruidDataSourceBuilder; import org.apache.ibatis.session.SqlSessionFactory; import org.mybatis.spring.SqlSessionFactoryBean; @@ -52,8 +53,12 @@ public class DruidDataSourceConfig implements DataSourceConfig { @Bean(name = "dataSource") @ConfigurationProperties(prefix = "spring.datasource.druid") public DataSource druidDataSource() { - return DruidDataSourceBuilder.create() - .build(); + DruidDataSource druidDataSource = DruidDataSourceBuilder.create() + .build(); + druidDataSource.setMaxPoolPreparedStatementPerConnectionSize(20); + druidDataSource.setMaxActive(20); + druidDataSource.setMinIdle(10); + return druidDataSource; } @Bean(name = "sqlSessionFactory") @@ -66,7 +71,7 @@ public class DruidDataSourceConfig implements DataSourceConfig { @Bean("sqlSessionTemplate") public SqlSessionTemplate primarySqlSessionTemplate( - @Qualifier("sqlSessionFactory") SqlSessionFactory sessionfactory) { + @Qualifier("sqlSessionFactory") SqlSessionFactory sessionfactory) { 
return new SqlSessionTemplate(sessionfactory); } diff --git a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/controller/ExtractController.java b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/controller/ExtractController.java index 6854f05587a63bc401c7e2acd17aa61625f03491..7eed4915149a7bdf406ef74a9fdaacd4450d389f 100644 --- a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/controller/ExtractController.java +++ b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/controller/ExtractController.java @@ -90,7 +90,7 @@ public class ExtractController { @Operation(summary = "construction a data extraction task for the current endpoint") @PostMapping("/extract/build/task/all") public Result buildExtractTaskAllTables( - @Parameter(name = "processNo", description = "execution process no") @RequestParam(name = "processNo") + @Parameter(name = "processNo", description = "execution process no") @RequestParam(name = "processNo") String processNo) { return Result.success(dataExtractService.buildExtractTaskAllTables(processNo)); } @@ -138,7 +138,7 @@ public class ExtractController { @Operation(summary = "execute the data extraction task that has been created for the current endpoint") @PostMapping("/extract/exec/task/all") public Result execExtractTaskAllTables( - @Parameter(name = "processNo", description = "execution process no") @RequestParam(name = "processNo") + @Parameter(name = "processNo", description = "execution process no") @RequestParam(name = "processNo") String processNo) { dataExtractService.execExtractTaskAllTables(processNo); return Result.success(); @@ -163,9 +163,7 @@ public class ExtractController { * @return table data extraction task information */ @GetMapping("/extract/table/info") - @Operation(summary = "queries information about data extraction tasks in a specified table in the current process.") - Result queryTableInfo( - @Parameter(name = "tableName", description = "table name") @RequestParam(name = "tableName") String tableName) { + Result queryTableInfo(@RequestParam(name = "tableName") String tableName) { return Result.success(dataExtractService.queryTableInfo(tableName)); } @@ -179,16 +177,32 @@ public class ExtractController { @Operation(summary = "querying table data") @PostMapping("/extract/query/table/data") Result>> queryTableColumnValues( - @NotEmpty(message = "the name of the table to be repaired belongs cannot be empty") - @RequestParam(name = "tableName") String tableName, - @NotEmpty(message = "the primary key set to be repaired belongs cannot be empty") @RequestBody + @NotEmpty(message = "the name of the table to be repaired belongs cannot be empty") + @RequestParam(name = "tableName") String tableName, + @NotEmpty(message = "the primary key set to be repaired belongs cannot be empty") @RequestBody Set compositeKeySet) { return Result.success(dataExtractService.queryTableColumnValues(tableName, new ArrayList<>(compositeKeySet))); } - @Operation(summary = "query the metadata of the current table structure and perform hash calculation.") - @PostMapping("/extract/query/table/metadata/hash") - Result queryTableMetadataHash(@RequestParam(name = "tableName") String tableName) { + /** + * Query the hash value of the metadata of the table + * + * @param tableName tableName + * @return TableMetadataHash + */ + @PostMapping("/extract/source/table/metadata/hash") + Result querySourceTableMetadataHash(@RequestParam(name = "tableName") String tableName) { + return 
Result.success(dataExtractService.queryTableMetadataHash(tableName)); + } + + /** + * Query the hash value of the metadata of the table + * + * @param tableName tableName + * @return TableMetadataHash + */ + @PostMapping("/extract/sink/table/metadata/hash") + Result querySinkTableMetadataHash(@RequestParam(name = "tableName") String tableName) { return Result.success(dataExtractService.queryTableMetadataHash(tableName)); } @@ -203,6 +217,42 @@ public class ExtractController { return Result.success(dataExtractService.queryIncrementMetaData(tableName)); } + /** + * queries data corresponding to a specified primary key value in a table + * and performs hash for verification data query. + * + * @param dataLog data change logs + * @return row data hash + */ + @PostMapping("/extract/source/data/row/hash") + Result> querySourceCheckRowData(@RequestBody SourceDataLog dataLog) { + return Result.success(dataExtractService.querySecondaryCheckRowData(dataLog)); + } + + /** + * queries data corresponding to a specified primary key value in a table + * and performs hash for verification data query. + * + * @param dataLog data change logs + * @return row data hash + */ + @PostMapping("/extract/sink/data/row/hash") + Result> querySinkCheckRowData(@RequestBody SourceDataLog dataLog) { + return Result.success(dataExtractService.querySecondaryCheckRowData(dataLog)); + } + + /** + * queries data corresponding to a specified primary key value in a table + * and performs hash for secondary verification data query. + * + * @param dataLog data change logs + * @return row data hash + */ + @PostMapping("/extract/source/secondary/data/row/hash") + Result> querySourceSecondaryCheckRowData(@RequestBody SourceDataLog dataLog) { + return Result.success(dataExtractService.querySecondaryCheckRowData(dataLog)); + } + /** * queries data corresponding to a specified primary key value in a table * and performs hash for secondary verification data query. @@ -210,8 +260,8 @@ public class ExtractController { * @param dataLog data change logs * @return row data hash */ - @PostMapping("/extract/query/secondary/data/row/hash") - Result> querySecondaryCheckRowData(@RequestBody SourceDataLog dataLog) { + @PostMapping("/extract/sink/secondary/data/row/hash") + Result> querySinkSecondaryCheckRowData(@RequestBody SourceDataLog dataLog) { return Result.success(dataExtractService.querySecondaryCheckRowData(dataLog)); } diff --git a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/debezium/DataConsolidationServiceImpl.java b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/debezium/DataConsolidationServiceImpl.java index a38677ae47d919b27f61c2155f3e1f9bdbca7f3a..09e0def0372c22571fde8475da99c5cf867a57bc 100644 --- a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/debezium/DataConsolidationServiceImpl.java +++ b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/debezium/DataConsolidationServiceImpl.java @@ -101,6 +101,7 @@ public class DataConsolidationServiceImpl implements DataConsolidationService { public void pauseOrResumeIncrementMonitor(Boolean pauseOrResume) { if (Objects.nonNull(worker)) { worker.switchPauseOrResume(pauseOrResume); + LOG.info("increment monitor :{}", pauseOrResume.booleanValue() ? 
"pause" : "resume"); } } @@ -133,10 +134,10 @@ public class DataConsolidationServiceImpl implements DataConsolidationService { } final List columnsMetas = tableMetadata.getColumnsMetas(); final List dateList = columnsMetas.stream() - .filter(column -> StringUtils.equalsIgnoreCase(column.getColumnType(), - MYSQL_DATE_TYPE)) - .map(ColumnsMetaData::getColumnName) - .collect(Collectors.toList()); + .filter(column -> StringUtils.equalsIgnoreCase(column.getColumnType(), + MYSQL_DATE_TYPE)) + .map(ColumnsMetaData::getColumnName) + .collect(Collectors.toList()); final Map valueMap = debeziumDataBean.getData(); dateList.forEach(dateField -> { final String time = valueMap.get(dateField); @@ -150,7 +151,7 @@ public class DataConsolidationServiceImpl implements DataConsolidationService { private String decompressLocalDate(int compressDate) { return LocalDate.ofEpochDay(compressDate) - .format(DATE_FORMATTER); + .format(DATE_FORMATTER); } /** @@ -194,9 +195,9 @@ public class DataConsolidationServiceImpl implements DataConsolidationService { final Set allKeys = MetaDataCache.getAllKeys(); checkDebeziumEnvironment(config.getDebeziumTopic(), config.getDebeziumTables(), allKeys); INCREMENT_CHECK_CONIFG.setDebeziumTables(config.getDebeziumTables()) - .setDebeziumTopic(config.getDebeziumTopic()) - .setGroupId(config.getGroupId()) - .setPartitions(config.getPartitions()); + .setDebeziumTopic(config.getDebeziumTopic()) + .setGroupId(config.getGroupId()) + .setPartitions(config.getPartitions()); } /** @@ -209,14 +210,14 @@ public class DataConsolidationServiceImpl implements DataConsolidationService { * @param allTableSet Source end table set */ private void checkDebeziumEnvironment( - @NotEmpty(message = "Debezium configuration topic cannot be empty") String debeziumTopic, - List debeziumTables, - @NotEmpty(message = "Source side table metadata cache exception") Set allTableSet) { + @NotEmpty(message = "Debezium configuration topic cannot be empty") String debeziumTopic, + List debeziumTables, + @NotEmpty(message = "Source side table metadata cache exception") Set allTableSet) { checkSourceEndpoint(); if (!kafkaAdminService.isTopicExists(debeziumTopic)) { // The configuration item debezium topic information does not exist throw new DebeziumConfigException( - "The configuration item debezium topic " + debeziumTopic + " information does not exist"); + "The configuration item debezium topic " + debeziumTopic + " information does not exist"); } if (CollectionUtils.isEmpty(debeziumTables)) { @@ -224,19 +225,19 @@ public class DataConsolidationServiceImpl implements DataConsolidationService { return; } final List allTableList = allTableSet.stream() - .map(String::toUpperCase) - .collect(Collectors.toList()); + .map(String::toUpperCase) + .collect(Collectors.toList()); List invalidTables = debeziumTables.stream() - .map(String::toUpperCase) - .filter(table -> !allTableList.contains(table)) - .collect(Collectors.toList()); + .map(String::toUpperCase) + .filter(table -> !allTableList.contains(table)) + .collect(Collectors.toList()); if (!CollectionUtils.isEmpty(invalidTables)) { // The configuration item debezium tables contains non-existent or black and white list tables LOG.warn("in current period , debezium has no table to check"); throw new DebeziumConfigException( - "The configuration item debezium-tables contains non-existent or black-and-white list tables:" - + invalidTables.toString()); + "The configuration item debezium-tables contains non-existent or black-and-white list tables:" + + 
invalidTables.toString()); } } } diff --git a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/debezium/DebeziumAvroHandler.java b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/debezium/DebeziumAvroHandler.java index 446037d9c403fb2990367e4cbde01713abcbf639..31a9ad6840dfb7cf11c44e5e432ce03fce655508 100644 --- a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/debezium/DebeziumAvroHandler.java +++ b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/debezium/DebeziumAvroHandler.java @@ -66,7 +66,7 @@ public class DebeziumAvroHandler implements DebeziumDataHandler queue) { + @NotNull LinkedBlockingQueue queue) { try { if (Objects.isNull(message)) { return; @@ -167,9 +167,10 @@ public class DebeziumAvroHandler implements DebeziumDataHandler parseRecordData(Record message, String key) { - final Object object = message.get(key); - if (Objects.nonNull(object)) { - return JSONObject.parseObject(object.toString(), new TypeReference<>() {}); + Object object = message.get(key); + if (Objects.nonNull(object) && object instanceof Record) { + return JSONObject.parseObject(object.toString(), new TypeReference<>() { + }); } else { return new HashMap<>(0); } diff --git a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/debezium/DebeziumWorker.java b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/debezium/DebeziumWorker.java index a4503814cf4b4b0a010db5c22d8255515cf869e1..cda357456efd7bfdc6a4e5c9971c9ceecc349dc9 100644 --- a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/debezium/DebeziumWorker.java +++ b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/debezium/DebeziumWorker.java @@ -28,6 +28,7 @@ import javax.annotation.PreDestroy; import java.time.Duration; import java.util.Objects; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; /** * DebeziumWorker @@ -40,11 +41,20 @@ public class DebeziumWorker implements Runnable { private static final Logger log = LogUtils.getLogger(); private static final AtomicBoolean PAUSE_OR_RESUME = new AtomicBoolean(WorkerSwitch.RESUME); private static final AtomicBoolean RUNNING = new AtomicBoolean(true); + private static final AtomicInteger POLL_BATCH_COUNT = new AtomicInteger(); + private static final int MAX_BATCH_COUNT = 1000; private static final String NAME = "DebeziumWorker"; + private DebeziumConsumerListener debeziumConsumerListener; private KafkaConsumerConfig kafkaConsumerConfig; private KafkaConsumer consumer = null; + /** + * DebeziumWorker + * + * @param debeziumConsumerListener debeziumConsumerListener + * @param kafkaConsumerConfig kafkaConsumerConfig + */ public DebeziumWorker(DebeziumConsumerListener debeziumConsumerListener, KafkaConsumerConfig kafkaConsumerConfig) { this.debeziumConsumerListener = debeziumConsumerListener; this.kafkaConsumerConfig = kafkaConsumerConfig; @@ -53,7 +63,7 @@ public class DebeziumWorker implements Runnable { @Override public void run() { Thread.currentThread() - .setName(NAME); + .setName(NAME); log.info("The Debezium message listener task has started"); consumer = kafkaConsumerConfig.getDebeziumConsumer(); while (RUNNING.get()) { @@ -77,9 +87,14 @@ public class DebeziumWorker implements Runnable { log.error("DebeziumWorker unknown error, message,{},{}", record.toString(), ex); } } + POLL_BATCH_COUNT.addAndGet(records.count()); + if (POLL_BATCH_COUNT.get() > MAX_BATCH_COUNT) { + PAUSE_OR_RESUME.set(WorkerSwitch.PAUSE); 
+ POLL_BATCH_COUNT.set(0); + } } else { log.info("consumer record count=0"); - ThreadUtil.sleepSecond(5); + PAUSE_OR_RESUME.set(WorkerSwitch.PAUSE); } } diff --git a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/service/DataExtractServiceImpl.java b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/service/DataExtractServiceImpl.java index eb5941bc7e6cc7f00500a5d1e0ead4b566230ddd..abb4e2c802c63278aaa18a3f2dc1ef9e6e48b9ea 100644 --- a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/service/DataExtractServiceImpl.java +++ b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/service/DataExtractServiceImpl.java @@ -61,6 +61,7 @@ import org.springframework.kafka.core.KafkaTemplate; import org.springframework.lang.NonNull; import org.springframework.scheduling.annotation.Async; import org.springframework.stereotype.Service; +import org.springframework.util.StopWatch; import javax.annotation.Resource; import java.util.ArrayList; @@ -74,7 +75,6 @@ import java.util.Set; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; -import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Collectors; @@ -175,7 +175,7 @@ public class DataExtractServiceImpl implements DataExtractService { return PageExtract.buildInitPage(taskList.size()); } else { LogUtils.error(log, "process={} is running extract task , {} please wait ... ", atomicProcessNo.get(), - processNo); + processNo); throw new ProcessMultipleException("process {" + atomicProcessNo.get() + "} is running extract task"); } } @@ -187,7 +187,7 @@ public class DataExtractServiceImpl implements DataExtractService { int endIdx = pageExtract.getPageEndIdx(); for (; startIdx < pageExtract.getSize() && startIdx < endIdx; startIdx++) { pageList.add(taskReference.get() - .get(startIdx)); + .get(startIdx)); } LogUtils.info(log, "fetchExtractTaskPageTables ={}", pageExtract); return pageList; @@ -216,17 +216,17 @@ public class DataExtractServiceImpl implements DataExtractService { if (CollectionUtils.isEmpty(taskList) || CollectionUtils.isEmpty(tableNames)) { LogUtils.info(log, "build extract task process={} taskList={} ,MetaCache tableNames={}", processNo, - taskList.size(), tableNames); + taskList.size(), tableNames); return; } final List extractTasks = taskList.stream() - .filter(task -> tableNames.contains(task.getTableName())) - .collect(Collectors.toList()); + .filter(task -> tableNames.contains(task.getTableName())) + .collect(Collectors.toList()); extractTasks.forEach(this::updateSinkMetadata); taskReference.get() - .addAll(extractTasks); + .addAll(extractTasks); LogUtils.info(log, "build extract task process={} count={},", processNo, taskReference.get() - .size()); + .size()); atomicProcessNo.set(processNo); // taskCountMap is used to count the number of tasks in table fragment query @@ -252,7 +252,7 @@ public class DataExtractServiceImpl implements DataExtractService { public void cleanBuildTask() { if (Objects.nonNull(taskReference.getAcquire())) { taskReference.getAcquire() - .clear(); + .clear(); } TableExtractStatusCache.removeAll(); atomicProcessNo.set(PROCESS_NO_RESET); @@ -310,8 +310,8 @@ public class DataExtractServiceImpl implements DataExtractService { ThreadUtil.sleep(MAX_SLEEP_MILLIS_TIME); if (sleepCount++ > MAX_SLEEP_COUNT) { LogUtils.info(log, "endpoint [{}] and process[{}}] task is empty!", extractProperties.getEndpoint() - 
.getDescription(), - processNo); + .getDescription(), + processNo); break; } } @@ -333,12 +333,12 @@ public class DataExtractServiceImpl implements DataExtractService { final String tableName = task.getTableName(); if (!tableCheckStatus.containsKey(tableName) || tableCheckStatus.get(tableName) == -1) { LogUtils.warn(log, "Abnormal table[{}] status, ignoring the current table data extraction task", - tableName); + tableName); return; } Endpoint endpoint = extractProperties.getEndpoint(); while (!tableCheckPointCache.getAll() - .containsKey(tableName)) { + .containsKey(tableName)) { ThreadUtil.sleepHalfSecond(); } List summarizedCheckPoint = tableCheckPointCache.get(tableName); @@ -352,7 +352,7 @@ public class DataExtractServiceImpl implements DataExtractService { } private List buildSliceByTask(List summarizedCheckPoint, TableMetadata tableMetadata, - Endpoint endpoint) { + Endpoint endpoint) { List sliceVoList; if (noTableSlice(tableMetadata, summarizedCheckPoint)) { sliceVoList = buildSingleSlice(tableMetadata, endpoint); @@ -368,14 +368,14 @@ public class DataExtractServiceImpl implements DataExtractService { if (sliceVoList.size() <= 20) { ExecutorService executorService = dynamicThreadPoolManager.getExecutor(EXTRACT_EXECUTOR); LogUtils.debug(log, "table [{}] get executorService success", sliceVoList.get(0) - .getTable()); + .getTable()); sliceVoList.forEach(sliceVo -> executorService.submit(sliceFactory.createSliceProcessor(sliceVo))); } else { int topicSize = ConfigCache.getIntValue(ConfigConstants.MAXIMUM_TOPIC_SIZE); int extendMaxPoolSize = ConfigCache.getIntValue(ConfigConstants.EXTEND_MAXIMUM_POOL_SIZE); ExecutorService extendExecutor = dynamicThreadPoolManager.getFreeExecutor(topicSize, extendMaxPoolSize); LogUtils.debug(log, "table [{}] get extendExecutor success", sliceVoList.get(0) - .getTable()); + .getTable()); sliceVoList.forEach(sliceVo -> extendExecutor.submit(sliceFactory.createSliceProcessor(sliceVo))); } } @@ -432,7 +432,7 @@ public class DataExtractServiceImpl implements DataExtractService { List checkPointList; try { checkPointList = sliceStatement.getCheckPoint(metadata, - ConfigCache.getIntValue(ConfigConstants.MAXIMUM_TABLE_SLICE_SIZE)); + ConfigCache.getIntValue(ConfigConstants.MAXIMUM_TABLE_SLICE_SIZE)); } catch (Exception ex) { LogUtils.error(log, "getCheckPoint error:", ex); return new ArrayList<>(); @@ -483,14 +483,14 @@ public class DataExtractServiceImpl implements DataExtractService { tableCheckPointCache.put(tableName, checkPointList); } checkPointManager.send(new CheckPointData().setTableName(tableName) - .setDigit(checkPoint.checkPkNumber(task.getTableMetadata())) - .setCheckPointList(checkPointList)); + .setDigit(checkPoint.checkPkNumber(task.getTableMetadata())) + .setCheckPointList(checkPointList)); } private String sliceTaskNameBuilder(@NonNull String tableName, int index) { return TASK_NAME_PREFIX.concat(tableName) - .concat("_slice_") - .concat(String.valueOf(index + 1)); + .concat("_slice_") + .concat(String.valueOf(index + 1)); } private void registerTopic(ExtractTask task) { @@ -541,30 +541,19 @@ public class DataExtractServiceImpl implements DataExtractService { @Override public List querySecondaryCheckRowData(SourceDataLog dataLog) { final String tableName = dataLog.getTableName(); + StopWatch stopWatch = new StopWatch("endpoint - query row data"); final List compositeKeys = dataLog.getCompositePrimaryValues(); + stopWatch.start("query " + tableName + " metadata"); final TableMetadata metadata = 
metaDataService.getMetaDataOfSchemaByCache(tableName); + stopWatch.stop(); if (Objects.isNull(metadata)) { throw new TableNotExistException(tableName); } - if (compositeKeys.size() > MAX_QUERY_PAGE_SIZE) { - List result = new ArrayList<>(); - AtomicInteger cnt = new AtomicInteger(0); - List tempCompositeKeys = new ArrayList<>(); - compositeKeys.forEach(key -> { - tempCompositeKeys.add(key); - if (cnt.incrementAndGet() % MAX_QUERY_PAGE_SIZE == 0) { - result.addAll( - dataManipulationService.queryColumnHashValues(tableName, tempCompositeKeys, metadata)); - tempCompositeKeys.clear(); - } - }); - if (CollectionUtils.isNotEmpty(tempCompositeKeys)) { - result.addAll(dataManipulationService.queryColumnHashValues(tableName, tempCompositeKeys, metadata)); - } - return result; - } else { - return dataManipulationService.queryColumnHashValues(tableName, compositeKeys, metadata); - } + stopWatch.start("query " + tableName + " " + compositeKeys.size()); + List result = dataManipulationService.queryColumnHashValues(tableName, compositeKeys, metadata); + stopWatch.stop(); + log.debug("endpoint - query row data - {}", stopWatch.prettyPrint()); + return result; } @Override diff --git a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/task/DataManipulationService.java b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/task/DataManipulationService.java index 54d8f1006c826088fe44d945c3eaa78e323c4b09..46f722006633c7f6d1a8dcb259d31b7287e2bce1 100644 --- a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/task/DataManipulationService.java +++ b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/task/DataManipulationService.java @@ -16,12 +16,15 @@ package org.opengauss.datachecker.extract.task; import org.apache.commons.collections4.CollectionUtils; +import org.apache.logging.log4j.Logger; import org.opengauss.datachecker.common.constant.Constants.InitialCapacity; import org.opengauss.datachecker.common.entry.enums.DataBaseType; import org.opengauss.datachecker.common.entry.extract.ColumnsMetaData; import org.opengauss.datachecker.common.entry.extract.RowDataHash; import org.opengauss.datachecker.common.entry.extract.TableMetadata; import org.opengauss.datachecker.common.entry.extract.TableMetadataHash; +import org.opengauss.datachecker.common.exception.ExtractDataAccessException; +import org.opengauss.datachecker.common.util.LogUtils; import org.opengauss.datachecker.common.util.LongHashFunctionWrapper; import org.opengauss.datachecker.extract.config.ExtractProperties; import org.opengauss.datachecker.extract.data.access.DataAccessService; @@ -32,6 +35,7 @@ import org.opengauss.datachecker.extract.util.MetaDataUtil; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Service; import org.springframework.util.Assert; +import org.springframework.util.StopWatch; import javax.annotation.Resource; import java.util.Comparator; @@ -49,11 +53,11 @@ import java.util.Objects; */ @Service public class DataManipulationService { + private static final Logger log = LogUtils.getLogger(DataManipulationService.class); private static final LongHashFunctionWrapper HASH_UTIL = new LongHashFunctionWrapper(); private final ResultSetHashHandler resultSetHashHandler = new ResultSetHashHandler(); private final ResultSetHandlerFactory resultSetFactory = new ResultSetHandlerFactory(); - @Value("${spring.extract.databaseType}") private DataBaseType databaseType; @Resource @@ -72,26 +76,26 @@ public class 
DataManipulationService { * @return query result */ public List queryColumnHashValues(String tableName, List compositeKeys, - TableMetadata tableMetadata) { + TableMetadata tableMetadata) { Assert.isTrue(Objects.nonNull(tableMetadata), "Abnormal table metadata , failed to build select SQL"); final List primaryMetas = tableMetadata.getPrimaryMetas(); Assert.isTrue(!CollectionUtils.isEmpty(primaryMetas), - "The metadata information of the table primary key is abnormal, and the construction of select SQL failed"); + "The metadata of the table primary key is abnormal, failed to build select SQL"); final SelectDmlBuilder dmlBuilder = new SelectDmlBuilder(databaseType, tableMetadata.isOgCompatibilityB()); // Single primary key table data query if (primaryMetas.size() == 1) { final ColumnsMetaData primaryData = primaryMetas.get(0); String querySql = dmlBuilder.dataBaseType(databaseType).schema(extractProperties.getSchema()) - .columns(tableMetadata.getColumnsMetas()).tableName(tableName) - .conditionPrimary(primaryData).build(); + .columns(tableMetadata.getColumnsMetas()).tableName(tableName) + .conditionPrimary(primaryData).build(); return queryColumnValuesSinglePrimaryKey(querySql, compositeKeys, tableMetadata); } else { // Compound primary key table data query String querySql = dmlBuilder.dataBaseType(databaseType).schema(extractProperties.getSchema()) - .columns(tableMetadata.getColumnsMetas()).tableName(tableName) - .conditionCompositePrimary(primaryMetas).build(); + .columns(tableMetadata.getColumnsMetas()).tableName(tableName) + .conditionCompositePrimary(primaryMetas).build(); List batchParam = dmlBuilder.conditionCompositePrimaryValue(primaryMetas, compositeKeys); return queryColumnValuesByCompositePrimary(querySql, batchParam, tableMetadata); } @@ -106,24 +110,25 @@ public class DataManipulationService { * @return query result */ public List> queryColumnValues(String tableName, List compositeKeys, - TableMetadata metadata) { - Assert.isTrue(Objects.nonNull(metadata), "Abnormal table metadata information, failed to build select SQL"); + TableMetadata metadata) { + Assert.isTrue(Objects.nonNull(metadata), + "Abnormal table metadata information, failed to build select SQL"); final List primaryMetas = metadata.getPrimaryMetas(); Assert.isTrue(!CollectionUtils.isEmpty(primaryMetas), - "The metadata information of the table primary key is abnormal, and the construction of select SQL failed"); + "The metadata of the table primary key is abnormal, failed to build select SQL"); final SelectDmlBuilder dmlBuilder = new SelectDmlBuilder(databaseType, metadata.isOgCompatibilityB()); List> resultMap; // Single primary key table data query if (primaryMetas.size() == 1) { final ColumnsMetaData primaryData = primaryMetas.get(0); String querySql = dmlBuilder.schema(extractProperties.getSchema()).columns(metadata.getColumnsMetas()) - .tableName(tableName).conditionPrimary(primaryData).build(); + .tableName(tableName).conditionPrimary(primaryData).build(); resultMap = queryColumnValuesSinglePrimaryKey(querySql, compositeKeys); } else { // Compound primary key table data query String querySql = dmlBuilder.schema(extractProperties.getSchema()).columns(metadata.getColumnsMetas()) - .tableName(tableName).conditionCompositePrimary(primaryMetas).build(); + .tableName(tableName).conditionCompositePrimary(primaryMetas).build(); List batchParam = dmlBuilder.conditionCompositePrimaryValue(primaryMetas, compositeKeys); resultMap = queryColumnValuesByCompositePrimary(querySql, batchParam); } @@ -140,7 +145,7 @@ public class
DataManipulationService { * @return Query data results */ private List queryColumnValuesByCompositePrimary(String selectDml, List batchParam, - TableMetadata tableMetadata) { + TableMetadata tableMetadata) { // Query the current task data and organize the data HashMap paramMap = new HashMap<>(InitialCapacity.CAPACITY_1); paramMap.put(DmlBuilder.PRIMARY_KEYS, batchParam); @@ -181,7 +186,7 @@ public class DataManipulationService { * @return Query data results */ private List queryColumnValuesSinglePrimaryKey(String selectDml, List primaryKeys, - TableMetadata tableMetadata) { + TableMetadata tableMetadata) { // Query the current task data and organize the data HashMap paramMap = new HashMap<>(InitialCapacity.CAPACITY_1); paramMap.put(DmlBuilder.PRIMARY_KEYS, primaryKeys); @@ -195,6 +200,7 @@ public class DataManipulationService { return queryColumnValues(selectDml, paramMap); } + /** * Primary key table data query * @@ -203,13 +209,18 @@ public class DataManipulationService { * @return query result */ private List queryColumnValues(String selectDml, Map paramMap, - TableMetadata tableMetadata) { - List columns = MetaDataUtil.getTableColumns(tableMetadata); - List primary = MetaDataUtil.getTablePrimaryColumns(tableMetadata); + TableMetadata tableMetadata) { // Use JDBC to query the current task to extract data - ResultSetHandler resultSetHandler = resultSetFactory.createHandler(databaseType); - return dataAccessService.query(selectDml, paramMap, - (rs, rowNum) -> resultSetHashHandler.handler(primary, columns, resultSetHandler.putOneResultSetToMap(rs))); + try { + ResultSetHandler handler = resultSetFactory.createHandler(databaseType); + log.debug("row data : {} param {}", selectDml, paramMap); + return dataAccessService.query(selectDml, paramMap, + (rs, rowNum) -> resultSetHashHandler.handler(MetaDataUtil.getTablePrimaryColumns(tableMetadata), + MetaDataUtil.getTableColumns(tableMetadata), handler.putOneResultSetToMap(rs))); + } catch (Exception e) { + log.error("Failed to query data", e); + throw new ExtractDataAccessException("Failed to query data " + e.getMessage()); + } } private List> queryColumnValues(String selectDml, Map paramMap) { @@ -217,149 +228,6 @@ public class DataManipulationService { return dataAccessService.query(selectDml, paramMap, (rs, rowNum) -> handler.putOneResultSetToMap(rs)); } -// /** -// * Build the replace SQL statement of the specified table -// * -// * @param tableName tableName -// * @param compositeKeySet composite key set -// * @param metadata metadata -// * @return Return to SQL list -// */ -// public List buildReplace(String schema, String tableName, Set compositeKeySet, -// TableMetadata metadata, boolean ogCompatibility) { -// List resultList = new ArrayList<>(); -// final String localSchema = getLocalSchema(schema); -// List> columnValues = queryColumnValues(tableName, List.copyOf(compositeKeySet), metadata); -// Map> compositeKeyValues = -// transtlateColumnValues(columnValues, metadata.getPrimaryMetas()); -// UpdateDmlBuilder builder = new UpdateDmlBuilder(DataBaseType.OG, ogCompatibility); -// builder.metadata(metadata).tableName(tableName).schema(localSchema); -// compositeKeySet.forEach(compositeKey -> { -// Map columnValue = compositeKeyValues.get(compositeKey); -// if (Objects.nonNull(columnValue) && !columnValue.isEmpty()) { -// builder.columnsValues(columnValue); -// resultList.add(builder.build()); -// } -// }); -// return resultList; -// } -// -// /** -// * Build the insert SQL statement of the specified table -// * -// * @param tableName 
tableName -// * @param compositeKeySet composite key set -// * @param metadata metadata -// * @return Return to SQL list -// */ -// public List buildInsert(String schema, String tableName, Set compositeKeySet, -// TableMetadata metadata, boolean ogCompatibility) { -// -// List resultList = new ArrayList<>(); -// final String localSchema = getLocalSchema(schema); -// InsertDmlBuilder builder = new InsertDmlBuilder(DataBaseType.OG, ogCompatibility); -// builder.schema(localSchema).tableName(tableName).columns(metadata.getColumnsMetas()); -// List> columnValues = -// queryColumnValues(tableName, new ArrayList<>(compositeKeySet), metadata); -// Map> compositeKeyValues = -// transtlateColumnValues(columnValues, metadata.getPrimaryMetas()); -// compositeKeySet.forEach(compositeKey -> { -// Map columnValue = compositeKeyValues.get(compositeKey); -// if (Objects.nonNull(columnValue) && !columnValue.isEmpty()) { -// resultList.add(builder.columnsValue(columnValue, metadata.getColumnsMetas()).build()); -// } -// }); -// return resultList; -// } -// -// private Map> transtlateColumnValues(List> columnValues, -// List primaryMetas) { -// final List primaryKeys = getCompositeKeyColumns(primaryMetas); -// Map> map = new HashMap<>(InitialCapacity.CAPACITY_16); -// columnValues.forEach(values -> { -// map.put(getCompositeKey(values, primaryKeys), values); -// }); -// return map; -// } -// -// private List getCompositeKeyColumns(List primaryMetas) { -// return primaryMetas.stream().map(ColumnsMetaData::getColumnName).collect(Collectors.toUnmodifiableList()); -// } -// -// private String getCompositeKey(Map columnValues, List primaryKeys) { -// return primaryKeys.stream().map(key -> columnValues.get(key)) -// .collect(Collectors.joining(ExtConstants.PRIMARY_DELIMITER)); -// } -// -// /** -// * Build a batch delete SQL statement for the specified table -// * -// * @param tableName tableName -// * @param compositeKeySet composite key set -// * @param primaryMetas Primary key metadata information -// * @return Return to SQL list -// */ -// public List buildBatchDelete(String schema, String tableName, Set compositeKeySet, -// List primaryMetas) { -// List resultList = new ArrayList<>(); -// final String localSchema = getLocalSchema(schema); -// if (primaryMetas.size() == 1) { -// final ColumnsMetaData primaryMeta = primaryMetas.stream().findFirst().get(); -// compositeKeySet.forEach(compositeKey -> { -// final String deleteDml = -// new BatchDeleteDmlBuilder().tableName(tableName).schema(localSchema).conditionPrimary(primaryMeta) -// .build(); -// resultList.add(deleteDml); -// }); -// } else { -// compositeKeySet.forEach(compositeKey -> { -// resultList.add(new BatchDeleteDmlBuilder().tableName(tableName).schema(localSchema) -// .conditionCompositePrimary(primaryMetas).build()); -// }); -// } -// return resultList; -// } -// -// /** -// * Build the delete SQL statement of the specified table -// * -// * @param tableName tableName -// * @param compositeKeySet composite key set -// * @param primaryMetas Primary key metadata information -// * @param ogCompatibility -// * @return Return to SQL list -// */ -// public List buildDelete(String schema, String tableName, Set compositeKeySet, -// List primaryMetas, boolean ogCompatibility) { -// -// List resultList = new ArrayList<>(); -// final String localSchema = getLocalSchema(schema); -// if (primaryMetas.size() == 1) { -// final ColumnsMetaData primaryMeta = primaryMetas.stream().findFirst().get(); -// compositeKeySet.forEach(compositeKey -> { -// DeleteDmlBuilder 
deleteDmlBuilder = new DeleteDmlBuilder(DataBaseType.OG, ogCompatibility); -// final String deleteDml = -// deleteDmlBuilder.tableName(tableName).schema(localSchema).condition(primaryMeta, compositeKey) -// .build(); -// resultList.add(deleteDml); -// }); -// } else { -// compositeKeySet.forEach(compositeKey -> { -// DeleteDmlBuilder deleteDmlBuilder = new DeleteDmlBuilder(DataBaseType.OG, ogCompatibility); -// resultList.add(deleteDmlBuilder.tableName(tableName).schema(localSchema) -// .conditionCompositePrimary(compositeKey, primaryMetas).build()); -// }); -// } -// return resultList; -// } -// -// private String getLocalSchema(String schema) { -// if (StringUtils.isEmpty(schema)) { -// return extractProperties.getSchema(); -// } -// return schema; -// } - /** * Query the metadata information of the current table structure and hash * @@ -367,20 +235,27 @@ public class DataManipulationService { * @return Table structure hash */ public TableMetadataHash queryTableMetadataHash(String tableName) { + StopWatch stopWatch = new StopWatch("TableMetadataHash"); + stopWatch.start(tableName); final TableMetadataHash tableMetadataHash = new TableMetadataHash(); final List allTableNames = metaDataService.queryAllTableNames(); tableMetadataHash.setTableName(tableName); - if (allTableNames.contains(tableName)) { - final List columnsMetaData = metaDataService.queryTableColumnMetaDataOfSchema(tableName); - StringBuffer buffer = new StringBuffer(); - columnsMetaData.sort(Comparator.comparing(ColumnsMetaData::getOrdinalPosition)); - columnsMetaData.forEach(column -> { - buffer.append(column.getColumnName()).append(column.getOrdinalPosition()); - }); - tableMetadataHash.setTableHash(HASH_UTIL.hashBytes(buffer.toString())); - } else { - tableMetadataHash.setTableHash(-1L); - } + allTableNames.stream() + .filter(table -> table.equalsIgnoreCase(tableName)) + .findFirst() + .ifPresentOrElse(table -> { + List columnsMetaData = metaDataService.queryTableColumnMetaDataOfSchema(tableName); + StringBuffer buffer = new StringBuffer(); + columnsMetaData.sort(Comparator.comparing(ColumnsMetaData::getOrdinalPosition)); + columnsMetaData.forEach(column -> { + buffer.append(column.getColumnName()).append(column.getOrdinalPosition()); + }); + tableMetadataHash.setTableHash(HASH_UTIL.hashBytes(buffer.toString())); + }, () -> { + tableMetadataHash.setTableHash(-1L); + }); + stopWatch.stop(); + LogUtils.debug(log, stopWatch.prettyPrint()); return tableMetadataHash; } } diff --git a/datachecker-extract/src/main/resources/application.yml b/datachecker-extract/src/main/resources/application.yml index d64f9bf4d6b9dc6fbd6a8b2ce15bb6559588268b..1a18650b42e9ff2c974e1432c8c097e6b8fd5d76 100644 --- a/datachecker-extract/src/main/resources/application.yml +++ b/datachecker-extract/src/main/resources/application.yml @@ -2,7 +2,7 @@ server: shutdown: graceful tomcat: threads: - max: 10 + max: 20 debug: false logging: config: classpath:log4j2.xml @@ -54,6 +54,9 @@ mybatis: feign: okhttp: enabled: true + httpclient: + connection-timeout: 500000 + max-connections: 500 springdoc: version: '@springdoc.version@'
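Reviewer note: the DebeziumWorker hunk above replaces the fixed five-second sleep on an empty poll with a self-pause, and also pauses the consumer once more than MAX_BATCH_COUNT (1000) records have accumulated; pauseOrResumeIncrementMonitor on the check side is what resumes it. The sketch below is a minimal, self-contained illustration of that bookkeeping only, under the assumption stated here: the class name IncrementPollThrottle and its methods are hypothetical and are not part of this patch.

import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

public final class IncrementPollThrottle {
    // Mirrors MAX_BATCH_COUNT in the patch: pause after roughly this many polled records.
    private static final int MAX_BATCH_COUNT = 1000;

    private final AtomicBoolean paused = new AtomicBoolean(false);
    private final AtomicInteger polledSinceResume = new AtomicInteger();

    // Called after each consumer poll; decides whether the worker should pause itself.
    public void onPoll(int recordCount) {
        if (recordCount == 0) {
            // Nothing to consume: pause instead of sleeping a fixed five seconds.
            paused.set(true);
            return;
        }
        if (polledSinceResume.addAndGet(recordCount) > MAX_BATCH_COUNT) {
            // Batch budget exhausted: pause so the check side can drain what was queued.
            paused.set(true);
            polledSinceResume.set(0);
        }
    }

    // Called from the check side (the pauseOrResumeIncrementMonitor path) to switch the worker.
    public void switchPauseOrResume(boolean pause) {
        paused.set(pause);
    }

    public boolean isPaused() {
        return paused.get();
    }
}

A polling loop would call onPoll(records.count()) after each poll and skip polling while isPaused() returns true, until the increment check completes and calls switchPauseOrResume(false).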