From de28a2f17989e0e70f732ae5466b9450ffe08897 Mon Sep 17 00:00:00 2001 From: mystarry-sky Date: Tue, 24 Dec 2024 15:47:00 +0800 Subject: [PATCH 1/4] =?UTF-8?q?Datakit=E6=A0=A1=E9=AA=8C=E9=80=82=E9=85=8D?= =?UTF-8?q?=E5=BC=82=E5=B8=B8=E6=94=B6=E9=9B=86=E9=9C=80=E6=B1=82?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../datachecker/check/CheckApplication.java | 3 +- .../validator/RuleConfigTableValidator.java | 3 +- .../load/CheckConfigDistributeLoader.java | 7 ++-- .../check/load/CheckDatabaseLoader.java | 3 +- .../modules/check/IncrementCheckThread.java | 6 ++- .../report/CheckResultManagerService.java | 8 ++-- .../report/SliceCheckResultManager.java | 7 ++-- .../check/service/CheckPointSwapRegister.java | 8 ++-- .../service/CheckTableStructureService.java | 2 +- .../check/service/KafkaServiceManager.java | 6 ++- .../check/service/impl/CheckServiceImpl.java | 7 ++-- .../check/slice/SliceCheckWorker.java | 5 ++- .../check/slice/TableCheckWorker.java | 15 ++------ .../common/entry/enums/ErrorCode.java | 37 +++++++++++++++++-- .../common/exception/CheckThreadFactory.java | 4 +- .../GlobalCommonExceptionHandler.java | 8 ++-- .../common/service/ProcessLogService.java | 37 ++++++++++++++----- .../thread/CheckUncaughtExceptionHandler.java | 3 +- .../common/thread/DiscardOldestPolicy.java | 8 ++-- .../config/GlobalExtractExceptionHandler.java | 8 ++-- .../extract/data/BaseDataService.java | 3 +- .../access/AbstractDataAccessService.java | 15 ++++---- .../data/access/CsvDataAccessService.java | 12 ++++-- .../data/access/MysqlDataAccessService.java | 3 +- .../data/access/OpgsDataAccessService.java | 3 +- .../extract/data/csv/CsvReaderListener.java | 3 +- .../extract/data/csv/CsvWriterListener.java | 5 ++- .../extract/debezium/DebeziumWorker.java | 3 +- .../IncrementDataAnalysisService.java | 7 ++-- .../extract/resource/ConnectionMgr.java | 2 +- .../service/DataExtractServiceImpl.java | 8 ++-- .../slice/ExtractPointSwapManager.java | 4 +- .../extract/slice/SliceDispatcher.java | 3 +- .../slice/SliceStatusFeedbackService.java | 4 +- .../extract/slice/TableDispatcher.java | 3 +- .../slice/process/CsvTableProcessor.java | 13 ++++--- .../slice/process/JdbcSliceProcessor.java | 11 +++--- .../slice/process/JdbcTableProcessor.java | 7 ++-- .../datachecker/extract/task/CheckPoint.java | 3 +- .../extract/task/DataManipulationService.java | 3 +- .../extract/task/ResultSetHandler.java | 6 ++- .../functional/SimpleTypeHandlerFactory.java | 7 ++-- .../extract/dao/BaseMysqlMapper.java | 2 + 43 files changed, 207 insertions(+), 108 deletions(-) diff --git a/datachecker-check/src/main/java/org/opengauss/datachecker/check/CheckApplication.java b/datachecker-check/src/main/java/org/opengauss/datachecker/check/CheckApplication.java index 6869973..bd657f1 100644 --- a/datachecker-check/src/main/java/org/opengauss/datachecker/check/CheckApplication.java +++ b/datachecker-check/src/main/java/org/opengauss/datachecker/check/CheckApplication.java @@ -17,6 +17,7 @@ package org.opengauss.datachecker.check; import org.apache.logging.log4j.Logger; import org.opengauss.datachecker.check.cmd.CheckCommandLine; +import org.opengauss.datachecker.common.entry.enums.ErrorCode; import org.opengauss.datachecker.common.service.CommonCommandLine.CmdOption; import org.opengauss.datachecker.common.util.LogUtils; import org.springframework.boot.SpringApplication; @@ -51,7 +52,7 @@ public class CheckApplication { application.run(args); } } catch (Throwable er) { - log.error("server start has unknown error", er); + log.error("{}server start has unknown error", ErrorCode.CHECK_START_ERROR, er); } } } diff --git a/datachecker-check/src/main/java/org/opengauss/datachecker/check/config/validator/RuleConfigTableValidator.java b/datachecker-check/src/main/java/org/opengauss/datachecker/check/config/validator/RuleConfigTableValidator.java index 7a216af..1f1d3ec 100644 --- a/datachecker-check/src/main/java/org/opengauss/datachecker/check/config/validator/RuleConfigTableValidator.java +++ b/datachecker-check/src/main/java/org/opengauss/datachecker/check/config/validator/RuleConfigTableValidator.java @@ -19,6 +19,7 @@ import org.apache.commons.collections4.CollectionUtils; import org.apache.logging.log4j.Logger; import org.opengauss.datachecker.check.annotation.TableRule; import org.opengauss.datachecker.common.entry.common.Rule; +import org.opengauss.datachecker.common.entry.enums.ErrorCode; import org.opengauss.datachecker.common.util.LogUtils; import javax.validation.ConstraintValidatorContext; @@ -54,7 +55,7 @@ public class RuleConfigTableValidator implements RuleConfigValidator List whiteKeyList = fetchRuleByPredicate(values, rule -> rule.getName().equalsIgnoreCase(RULE_WHITE)); List blackKeyList = fetchRuleByPredicate(values, rule -> rule.getName().equalsIgnoreCase(RULE_BLACK)); if (CollectionUtils.isNotEmpty(whiteKeyList) && CollectionUtils.isNotEmpty(blackKeyList)) { - log.error("RuleConfig of table rule , black rule ={} is invalid rule", blackKeyList); + log.error("{}RuleConfig of table rule , black rule ={} is invalid rule", ErrorCode.RULE_CONFIG_ERROR, blackKeyList); values.removeAll(blackKeyList); } List rules = filterRepeatBy(values, Rule::getText); diff --git a/datachecker-check/src/main/java/org/opengauss/datachecker/check/load/CheckConfigDistributeLoader.java b/datachecker-check/src/main/java/org/opengauss/datachecker/check/load/CheckConfigDistributeLoader.java index 739251b..1bfb9c8 100644 --- a/datachecker-check/src/main/java/org/opengauss/datachecker/check/load/CheckConfigDistributeLoader.java +++ b/datachecker-check/src/main/java/org/opengauss/datachecker/check/load/CheckConfigDistributeLoader.java @@ -26,6 +26,7 @@ import org.opengauss.datachecker.common.entry.common.GlobalConfig; import org.opengauss.datachecker.common.entry.common.Rule; import org.opengauss.datachecker.common.entry.csv.CsvPathConfig; import org.opengauss.datachecker.common.entry.enums.CheckMode; +import org.opengauss.datachecker.common.entry.enums.ErrorCode; import org.opengauss.datachecker.common.entry.enums.RuleType; import org.opengauss.datachecker.common.entry.extract.Database; import org.opengauss.datachecker.common.exception.CheckingException; @@ -34,6 +35,7 @@ import org.springframework.core.annotation.Order; import org.springframework.stereotype.Service; import javax.annotation.Resource; + import java.util.List; import java.util.Map; import java.util.Objects; @@ -85,8 +87,8 @@ public class CheckConfigDistributeLoader extends AbstractCheckLoader { } checkEnvironment.addRules(rules); LogUtils.info(log, "check service distribute config success."); - } catch (Exception ignore) { - LogUtils.error(log, "distribute config error: ", ignore); + } catch (Exception ex) { + LogUtils.error(log, "{}distribute config error: ", ErrorCode.DISPATCH_CONFIG, ex); throw new CheckingException("distribute config error"); } } @@ -96,7 +98,6 @@ public class CheckConfigDistributeLoader extends AbstractCheckLoader { globalConfig.setRules(rules); globalConfig.setCheckMode(checkMode); globalConfig.setProcessPath(checkProperties.getDataPath()); - globalConfig.addProperties(ConfigConstants.PROCESS_NO); globalConfig.addIntProperties(ConfigConstants.REST_API_PAGE_SIZE); globalConfig.addIntProperties(ConfigConstants.MAXIMUM_TOPIC_SIZE); diff --git a/datachecker-check/src/main/java/org/opengauss/datachecker/check/load/CheckDatabaseLoader.java b/datachecker-check/src/main/java/org/opengauss/datachecker/check/load/CheckDatabaseLoader.java index 60ee6c3..158e656 100644 --- a/datachecker-check/src/main/java/org/opengauss/datachecker/check/load/CheckDatabaseLoader.java +++ b/datachecker-check/src/main/java/org/opengauss/datachecker/check/load/CheckDatabaseLoader.java @@ -19,6 +19,7 @@ import org.opengauss.datachecker.check.client.FeignClientService; import org.opengauss.datachecker.common.config.ConfigCache; import org.opengauss.datachecker.common.constant.ConfigConstants; import org.opengauss.datachecker.common.entry.enums.Endpoint; +import org.opengauss.datachecker.common.entry.enums.ErrorCode; import org.opengauss.datachecker.common.entry.enums.LowerCaseTableNames; import org.opengauss.datachecker.common.entry.extract.Database; import org.opengauss.datachecker.common.entry.extract.ExtractConfig; @@ -55,7 +56,7 @@ public class CheckDatabaseLoader extends AbstractCheckLoader { while (retry <= maxRetryTimes && (sourceConfig == null || sinkConfig == null)) { sourceConfig = feignClient.getEndpointConfig(Endpoint.SOURCE); sinkConfig = feignClient.getEndpointConfig(Endpoint.SINK); - LogUtils.error(log, "load database configuration ,retry={}", retry); + LogUtils.error(log, "{}load database configuration ,retry={}", ErrorCode.LOAD_DATABASE_CONFIG, retry); ThreadUtil.sleepOneSecond(); retry++; } diff --git a/datachecker-check/src/main/java/org/opengauss/datachecker/check/modules/check/IncrementCheckThread.java b/datachecker-check/src/main/java/org/opengauss/datachecker/check/modules/check/IncrementCheckThread.java index 2e8200c..073aed2 100644 --- a/datachecker-check/src/main/java/org/opengauss/datachecker/check/modules/check/IncrementCheckThread.java +++ b/datachecker-check/src/main/java/org/opengauss/datachecker/check/modules/check/IncrementCheckThread.java @@ -17,6 +17,7 @@ package org.opengauss.datachecker.check.modules.check; import com.google.common.collect.MapDifference; import com.google.common.collect.Maps; + import org.apache.commons.collections4.CollectionUtils; import org.apache.logging.log4j.Logger; import org.opengauss.datachecker.check.cache.CheckRateCache; @@ -35,6 +36,7 @@ import org.opengauss.datachecker.common.entry.check.IncrementDataCheckParam; import org.opengauss.datachecker.common.entry.check.Pair; import org.opengauss.datachecker.common.entry.enums.CheckMode; import org.opengauss.datachecker.common.entry.enums.Endpoint; +import org.opengauss.datachecker.common.entry.enums.ErrorCode; import org.opengauss.datachecker.common.entry.extract.RowDataHash; import org.opengauss.datachecker.common.entry.extract.SourceDataLog; import org.opengauss.datachecker.common.entry.extract.TableMetadata; @@ -78,7 +80,7 @@ public class IncrementCheckThread extends Thread { private final List sourceBucketList = new ArrayList<>(); private final List sinkBucketList = new ArrayList<>(); private final DifferencePair, Map, Map>> - difference = DifferencePair.of(new HashMap<>(), new HashMap<>(), new HashMap<>()); + difference = DifferencePair.of(new HashMap<>(), new HashMap<>(), new HashMap<>()); private final Map> bucketNumberDiffMap = new HashMap<>(); private final QueryRowDataWapper queryRowDataWapper; private final CheckResultManagerService checkResultManagerService; @@ -157,7 +159,7 @@ public class IncrementCheckThread extends Thread { checkRateCache.add(buildCheckTable()); stopWatch.stop(); } catch (Exception ex) { - log.error("check error", ex); + log.error("{}check error", ErrorCode.INCREMENT_CHECK_ERROR, ex); } finally { log.info(" {} check {} ", process, stopWatch.shortSummary()); } diff --git a/datachecker-check/src/main/java/org/opengauss/datachecker/check/modules/report/CheckResultManagerService.java b/datachecker-check/src/main/java/org/opengauss/datachecker/check/modules/report/CheckResultManagerService.java index f332433..d9c3d7c 100644 --- a/datachecker-check/src/main/java/org/opengauss/datachecker/check/modules/report/CheckResultManagerService.java +++ b/datachecker-check/src/main/java/org/opengauss/datachecker/check/modules/report/CheckResultManagerService.java @@ -29,6 +29,7 @@ import org.opengauss.datachecker.common.constant.ConfigConstants; import org.opengauss.datachecker.common.entry.check.CheckPartition; import org.opengauss.datachecker.common.entry.common.RepairEntry; import org.opengauss.datachecker.common.entry.enums.Endpoint; +import org.opengauss.datachecker.common.entry.enums.ErrorCode; import org.opengauss.datachecker.common.entry.report.CheckProgress; import org.opengauss.datachecker.common.entry.report.CheckSummary; import org.opengauss.datachecker.common.util.FileUtils; @@ -41,6 +42,7 @@ import org.springframework.context.ApplicationContextAware; import org.springframework.stereotype.Service; import javax.annotation.Resource; + import java.io.File; import java.time.LocalDateTime; import java.util.LinkedList; @@ -77,7 +79,7 @@ public class CheckResultManagerService implements ApplicationContextAware { /** * Add Merkel verification result * - * @param checkPartition checkPartition + * @param checkPartition checkPartition * @param checkDiffResult checkDiffResult */ public void addResult(CheckPartition checkPartition, CheckDiffResult checkDiffResult) { @@ -92,7 +94,7 @@ public class CheckResultManagerService implements ApplicationContextAware { /** * Summary of verification results * - * @param tableName tableName + * @param tableName tableName * @param checkDiffResult checkDiffResult */ public void addNoCheckedResult(String tableName, CheckDiffResult checkDiffResult) { @@ -115,7 +117,7 @@ public class CheckResultManagerService implements ApplicationContextAware { reduceFailedRepair(logFilePath, failedList, isOgCompatibility); reduceSummary(successList, failedList); } catch (Exception exception) { - log.error("summaryCheckResult ", exception); + log.error("{}summaryCheckResult ", ErrorCode.SUMMARY_CHECK_RESULT, exception); } finally { checkResultCache.clear(); noCheckedCache.clear(); diff --git a/datachecker-check/src/main/java/org/opengauss/datachecker/check/modules/report/SliceCheckResultManager.java b/datachecker-check/src/main/java/org/opengauss/datachecker/check/modules/report/SliceCheckResultManager.java index 61c761d..a6a5605 100644 --- a/datachecker-check/src/main/java/org/opengauss/datachecker/check/modules/report/SliceCheckResultManager.java +++ b/datachecker-check/src/main/java/org/opengauss/datachecker/check/modules/report/SliceCheckResultManager.java @@ -31,6 +31,7 @@ import org.opengauss.datachecker.common.entry.common.RepairEntry; import org.opengauss.datachecker.common.entry.enums.CheckMode; import org.opengauss.datachecker.common.entry.enums.DML; import org.opengauss.datachecker.common.entry.enums.Endpoint; +import org.opengauss.datachecker.common.entry.enums.ErrorCode; import org.opengauss.datachecker.common.entry.extract.Database; import org.opengauss.datachecker.common.entry.extract.SliceVo; import org.opengauss.datachecker.common.entry.report.CheckCsvFailed; @@ -390,7 +391,7 @@ public class SliceCheckResultManager { final List updateRepairs = feignClient.buildRepairStatementUpdateDml(Endpoint.SOURCE, update); appendLogFile(repairFile, updateRepairs); } catch (Exception ex) { - log.error("build table {} update repair file {}", tableFailed.getTable(), ex.getMessage()); + log.error("{}build table {} update repair file {}", ErrorCode.BUILD_DIFF_STATEMENT, tableFailed.getTable(), ex.getMessage()); } } } @@ -417,7 +418,7 @@ public class SliceCheckResultManager { final List insertRepairs = feignClient.buildRepairStatementInsertDml(Endpoint.SOURCE, insert); appendLogFile(repairFile, insertRepairs); } catch (Exception ex) { - log.error("build table {} insert repair file {}", tableFailed.getTable(), ex.getMessage()); + log.error("{}build table {} insert repair file {}", ErrorCode.BUILD_DIFF_STATEMENT, tableFailed.getTable(), ex.getMessage()); } } } @@ -432,7 +433,7 @@ public class SliceCheckResultManager { final List deleteRepairs = feignClient.buildRepairStatementDeleteDml(Endpoint.SOURCE, delete); appendLogFile(repairFile, deleteRepairs); } catch (Exception ex) { - log.error("build table {} delete repair file {}", tableFailed.getTable(), ex.getMessage()); + log.error("{}build table {} delete repair file {}", ErrorCode.BUILD_DIFF_STATEMENT, tableFailed.getTable(), ex.getMessage()); } } } diff --git a/datachecker-check/src/main/java/org/opengauss/datachecker/check/service/CheckPointSwapRegister.java b/datachecker-check/src/main/java/org/opengauss/datachecker/check/service/CheckPointSwapRegister.java index 99b0133..98910e0 100644 --- a/datachecker-check/src/main/java/org/opengauss/datachecker/check/service/CheckPointSwapRegister.java +++ b/datachecker-check/src/main/java/org/opengauss/datachecker/check/service/CheckPointSwapRegister.java @@ -31,6 +31,7 @@ import org.opengauss.datachecker.common.entry.common.CheckPointBean; import org.opengauss.datachecker.common.entry.common.CheckPointData; import org.opengauss.datachecker.common.entry.common.PointPair; import org.opengauss.datachecker.common.entry.enums.Endpoint; +import org.opengauss.datachecker.common.entry.enums.ErrorCode; import org.opengauss.datachecker.common.util.LogUtils; import org.opengauss.datachecker.common.util.ThreadUtil; import org.springframework.kafka.core.KafkaTemplate; @@ -133,7 +134,7 @@ public class CheckPointSwapRegister { sendCheckPointData.getTableName(), sendCheckPointData.getCheckPointList().size()); } } catch (Exception ex) { - log.error("checkPointSender error {}", table, ex); + log.error("{}checkPointSender error {}", ErrorCode.DISPATCH_SLICE_POINT, table, ex); } } } @@ -145,7 +146,8 @@ public class CheckPointSwapRegister { kafkaTemplate.send(topic, key, JSONObject.toJSONString(tmpBean)); } catch (TimeoutException ex) { if (reTryTimes > MAX_RETRY_TIMES) { - log.error("send msg to kafka timeout, topic: {} key: {} reTryTimes: {}", topic, key, reTryTimes); + log.error("{}send msg to kafka timeout, topic: {} key: {} reTryTimes: {}", + ErrorCode.SEND_SLICE_POINT_TIMEOUT, topic, key, reTryTimes); } ThreadUtil.sleepLongCircle(++reTryTimes); sendMsg(topic, key, tmpBean, reTryTimes); @@ -176,7 +178,7 @@ public class CheckPointSwapRegister { if (Objects.equals("java.lang.InterruptedException", ex.getMessage())) { LogUtils.warn(log, "kafka consumer stop by Interrupted"); } else { - LogUtils.error(log, "pollSwapPoint ", ex); + LogUtils.error(log, "{}pollSwapPoint ", ErrorCode.POLL_SLICE_POINT, ex); } } } diff --git a/datachecker-check/src/main/java/org/opengauss/datachecker/check/service/CheckTableStructureService.java b/datachecker-check/src/main/java/org/opengauss/datachecker/check/service/CheckTableStructureService.java index e3fe83b..dec3af2 100644 --- a/datachecker-check/src/main/java/org/opengauss/datachecker/check/service/CheckTableStructureService.java +++ b/datachecker-check/src/main/java/org/opengauss/datachecker/check/service/CheckTableStructureService.java @@ -177,7 +177,7 @@ public class CheckTableStructureService { .build(); taskManagerService.refreshTableExtractStatus(tableName, Endpoint.CHECK, -1); sliceCheckResultManager.addTableStructureDiffResult(tableName, result); - LogUtils.error(log, "compared the field names in table[{}](case ignored) and the result is not match", + LogUtils.warn(log, "compared the field names in table[{}](case ignored) and the result is not match", tableName); return onlyExistEndpoint; } diff --git a/datachecker-check/src/main/java/org/opengauss/datachecker/check/service/KafkaServiceManager.java b/datachecker-check/src/main/java/org/opengauss/datachecker/check/service/KafkaServiceManager.java index 9783a38..0759192 100644 --- a/datachecker-check/src/main/java/org/opengauss/datachecker/check/service/KafkaServiceManager.java +++ b/datachecker-check/src/main/java/org/opengauss/datachecker/check/service/KafkaServiceManager.java @@ -27,6 +27,7 @@ import org.apache.logging.log4j.Logger; import org.opengauss.datachecker.check.config.KafkaConsumerConfig; import org.opengauss.datachecker.common.config.ConfigCache; import org.opengauss.datachecker.common.constant.ConfigConstants; +import org.opengauss.datachecker.common.entry.enums.ErrorCode; import org.opengauss.datachecker.common.util.IdGenerator; import org.opengauss.datachecker.common.util.LogUtils; import org.springframework.kafka.KafkaException; @@ -35,6 +36,7 @@ import org.springframework.stereotype.Component; import javax.annotation.PreDestroy; import javax.annotation.Resource; + import java.time.Duration; import java.util.Collection; import java.util.HashMap; @@ -108,7 +110,7 @@ public class KafkaServiceManager { LogUtils.info(log, "init and listTopics admin client [{}]", ConfigCache.getValue(ConfigConstants.KAFKA_SERVERS)); } catch (ExecutionException | InterruptedException ex) { - log.error("kafka Client link exception: ", ex); + log.error("{}kafka Client link exception: ", ErrorCode.KAFKA_INIT_CONFIG, ex); throw new KafkaException("kafka Client link exception"); } kafkaConsumerConfig.initConsumerPool(); @@ -130,7 +132,7 @@ public class KafkaServiceManager { LogUtils.info(log, "create topic success , name= [{}] numPartitions = [{}]", topic, partitions); return true; } catch (InterruptedException | ExecutionException | TimeoutException e) { - LogUtils.error(log, "create tioic error : ", e); + LogUtils.error(log, "{}create topic error : ", ErrorCode.KAFKA_CREATE_TOPIC, e); return false; } } diff --git a/datachecker-check/src/main/java/org/opengauss/datachecker/check/service/impl/CheckServiceImpl.java b/datachecker-check/src/main/java/org/opengauss/datachecker/check/service/impl/CheckServiceImpl.java index 1c8b044..6e9c6f8 100644 --- a/datachecker-check/src/main/java/org/opengauss/datachecker/check/service/impl/CheckServiceImpl.java +++ b/datachecker-check/src/main/java/org/opengauss/datachecker/check/service/impl/CheckServiceImpl.java @@ -24,6 +24,7 @@ import org.opengauss.datachecker.common.config.ConfigCache; import org.opengauss.datachecker.common.constant.ConfigConstants; import org.opengauss.datachecker.common.entry.enums.CheckMode; import org.opengauss.datachecker.common.entry.enums.Endpoint; +import org.opengauss.datachecker.common.entry.enums.ErrorCode; import org.opengauss.datachecker.common.entry.extract.ExtractTask; import org.opengauss.datachecker.common.entry.extract.PageExtract; import org.opengauss.datachecker.common.exception.CheckingException; @@ -35,6 +36,7 @@ import org.springframework.stereotype.Service; import org.springframework.util.Assert; import javax.annotation.Resource; + import java.time.LocalDateTime; import java.util.List; import java.util.Objects; @@ -69,8 +71,7 @@ public class CheckServiceImpl implements CheckService { @Override public String start(CheckMode checkMode) { Assert.isTrue(checkEnvironment.isLoadMetaSuccess(), "current meta data is loading, please wait a moment"); - LogUtils.info(log, CheckMessage.CHECK_SERVICE_STARTING, checkEnvironment.getCheckMode() - .getCode()); + LogUtils.info(log, CheckMessage.CHECK_SERVICE_STARTING, checkEnvironment.getCheckMode().getCode()); Assert.isTrue(Objects.equals(CheckMode.FULL, checkEnvironment.getCheckMode()), "current check mode is " + CheckMode.INCREMENT.getDescription() + " , not start full check."); if (STARTED.compareAndSet(false, true)) { @@ -82,7 +83,7 @@ public class CheckServiceImpl implements CheckService { } } else { String message = String.format(CheckMessage.CHECK_SERVICE_START_ERROR, checkMode.getDescription()); - LogUtils.error(log, message); + LogUtils.error(log, "{}" + message, ErrorCode.CHECK_START_ERROR); cleanCheck(); throw new CheckingException(message); } diff --git a/datachecker-check/src/main/java/org/opengauss/datachecker/check/slice/SliceCheckWorker.java b/datachecker-check/src/main/java/org/opengauss/datachecker/check/slice/SliceCheckWorker.java index 86dbfbb..b9dc25b 100644 --- a/datachecker-check/src/main/java/org/opengauss/datachecker/check/slice/SliceCheckWorker.java +++ b/datachecker-check/src/main/java/org/opengauss/datachecker/check/slice/SliceCheckWorker.java @@ -40,6 +40,7 @@ import org.opengauss.datachecker.common.entry.check.DifferencePair; import org.opengauss.datachecker.common.entry.check.Pair; import org.opengauss.datachecker.common.entry.enums.CheckMode; import org.opengauss.datachecker.common.entry.enums.Endpoint; +import org.opengauss.datachecker.common.entry.enums.ErrorCode; import org.opengauss.datachecker.common.entry.extract.ConditionLimit; import org.opengauss.datachecker.common.entry.extract.RowDataHash; import org.opengauss.datachecker.common.entry.extract.SliceExtend; @@ -141,7 +142,7 @@ public class SliceCheckWorker implements Runnable { compareNoMerkleTree(sourceTuple, sinkTuple); } } catch (Exception ignore) { - LogUtils.error(LOGGER, "check table has some error,", ignore); + LogUtils.error(LOGGER, "{}check table has some error,", ErrorCode.CHECK_SLICE_EXCEPTION, ignore); errorMsg = ignore.getMessage(); } finally { try { @@ -150,7 +151,7 @@ public class SliceCheckWorker implements Runnable { cleanCheckThreadEnvironment(); finishedSliceCheck(slice); } catch (Exception exception) { - LogUtils.error(LOGGER, "refresh check {} error:", slice.getName(), exception); + LogUtils.error(LOGGER, "{}refresh check {} error:",ErrorCode.CHECK_SLICE_EXCEPTION, slice.getName(), exception); } LogUtils.info(LOGGER, "check slice of {} end.", slice.getName()); } diff --git a/datachecker-check/src/main/java/org/opengauss/datachecker/check/slice/TableCheckWorker.java b/datachecker-check/src/main/java/org/opengauss/datachecker/check/slice/TableCheckWorker.java index 9d9c85b..5217a3e 100644 --- a/datachecker-check/src/main/java/org/opengauss/datachecker/check/slice/TableCheckWorker.java +++ b/datachecker-check/src/main/java/org/opengauss/datachecker/check/slice/TableCheckWorker.java @@ -38,6 +38,7 @@ import org.opengauss.datachecker.common.entry.check.DifferencePair; import org.opengauss.datachecker.common.entry.check.Pair; import org.opengauss.datachecker.common.entry.enums.CheckMode; import org.opengauss.datachecker.common.entry.enums.Endpoint; +import org.opengauss.datachecker.common.entry.enums.ErrorCode; import org.opengauss.datachecker.common.entry.extract.*; import org.opengauss.datachecker.common.exception.BucketNumberInconsistentException; import org.opengauss.datachecker.common.exception.CheckConsumerPollEmptyException; @@ -104,9 +105,9 @@ public class TableCheckWorker implements Runnable { setTableFixedTopic(); log.info("check table of {}", slice.getName()); checkedTableSliceByTopicPartition(source, sink); - } catch (Exception ignore) { - log.error("check table has some error,", ignore); - errorMsg = ignore.getMessage(); + } catch (Exception ex) { + log.error("{}check table has some error,", ErrorCode.CHECK_TABLE_EXCEPTION,ex); + errorMsg = ex.getMessage(); } finally { refreshSliceCheckProgress(); checkResult(errorMsg); @@ -361,14 +362,6 @@ public class TableCheckWorker implements Runnable { bucketList.sort(Comparator.comparingInt(Bucket::getNumber)); } -// private void getSliceDataFromTopicPartition(TopicPartition topicPartition, List dataList) { -// KafkaConsumerHandler consumer = checkContext.buildKafkaHandler(slice.getTable()); -// logKafka.debug("create consumer of topic, [{}] : [{}] ", slice.getTable(), topicPartition.toString()); -// consumer.poolTopicPartitionsData(topicPartition.topic(), topicPartition.partition(), dataList); -// consumer.closeConsumer(); -// logKafka.debug("close consumer of topic, [{}] : [{}] ", slice.getTable(), topicPartition.toString()); -// } - /** *
      * Align the bucket list data according to the statistical results of source
diff --git a/datachecker-common/src/main/java/org/opengauss/datachecker/common/entry/enums/ErrorCode.java b/datachecker-common/src/main/java/org/opengauss/datachecker/common/entry/enums/ErrorCode.java
index cce63fe..f15ba08 100644
--- a/datachecker-common/src/main/java/org/opengauss/datachecker/common/entry/enums/ErrorCode.java
+++ b/datachecker-common/src/main/java/org/opengauss/datachecker/common/entry/enums/ErrorCode.java
@@ -15,9 +15,40 @@ import java.util.Locale;
  */
 public enum ErrorCode {
     UNKNOWN(5000, "未知异常", "Unknown error"),
-    INCORRECT_CONFIGURATION(5100, "参数配置错误", "There is an error in the parameter configuration"),
-    IO_EXCEPTION(5200, "文件读写异常", "IO exception"),
-    SQL_EXCEPTION(5300, "SQL执行失败", "SQL execution failed");
+    CHECK_START_ERROR(5001, "校验进程启动异常", "check process start error"),
+    CHECK_SLICE_EXCEPTION(5002, "分片校验异常", "slice check exception"),
+    REFRESH_SLICE_STATUS_EXCEPTION(5003, "更新分片校验异状态常", "refresh slice check status exception"),
+    RULE_CONFIG_ERROR(5004, "过滤规则配置错误", "filter rule configuration error"),
+    LOAD_DATABASE_CONFIG(5005, "加载数据库配置信息失败", "load database configuration information failed"),
+    INCREMENT_CHECK_ERROR(5006, "增量校验任务执行异常", "increment check thread error"),
+    SUMMARY_CHECK_RESULT(5007, "汇总校验结果异常", "summary check result exception"),
+    BUILD_DIFF_STATEMENT(5008, "构建修复语句异常", "build diff statement exception"),
+    DISPATCH_SLICE_POINT(5009, "下发分片分割点异常", "dispatch slice point exception"),
+    SEND_SLICE_POINT_TIMEOUT(5010, "kafka client发送分片分割点超时", "kafka client send slice point timeout"),
+    POLL_SLICE_POINT(5011, "kafka consumer拉取分片分割点异常", "kafka consumer poll slice point exception"),
+    KAFKA_INIT_CONFIG(5012, "kafka初始化配置异常", "kafka init config exception"),
+    KAFKA_CREATE_TOPIC(5013, "kafka创建主题失败", "kafka create topic failed"),
+    CHECK_TABLE_EXCEPTION(5014, "表校验任务异常", "table check task exception"),
+    PROCESS_LOG(5015, "progress日志记录异常", "process log record exception"),
+    CSV_METADATA_NOT_EXIST(5016, "CSV元数据文件不存在", "csv metadata file not exist"),
+    CSV_LOAD_METADATA_ERROR(5017, "CSV加载表元数据异常", "csv load metadata error"),
+    REGISTER_SLICE_POINT(5018, "注册分片分割点异常", "register slice point exception"),
+    CSV_TABLE_DISPATCHER(5019, "CSV表分发异常", "csv table dispatcher exception"),
+    DEBEZIUM_WORKER(5020, "debezium监听异常", "debezium worker exception"),
+    EXECUTE_SLICE_QUERY(5021, "执行分片查询异常", "execute slice query exception"),
+    EXECUTE_SLICE_PAGE_QUERY(5022, "执行分片分页查询异常", "execute slice page query exception"),
+    EXECUTE_SLICE_PROCESSOR(5023, "分片抽取异常", "execute slice processor exception"),
+    TABLE_COL_NULL(5024, "查询列元数据为空", "query column metadata is null"),
+    EXECUTE_QUERY_SQL(5025, "查询SQL异常", "execute query sql exception"),
+    CSV_READER_LISTENER(5026, "CSV reader 监听异常", "csv reader listener exception"),
+    CSV_WRITER_LISTENER(5027, "CSV writer 监听异常", "csv writer listener exception"),
+    DISPATCH_CONFIG(5028, "下发配置信息异常", "dispatch config exception"),
+    INCREMENT_LISTENER(5029, "增量监听异常", "increment listener exception"),
+    CSV_TABLE_PROCESSOR(5030, "CSV 表数据抽取异常", "csv table processor exception"),
+    KAFKA_LOG_CONFIG(5031, "kafka日志配置异常", "kafka log config exception"),
+    BUILD_SLICE_POINT(5032, "生成表分割点异常", "build slice point exception"),
+    ASYNC_EXTRACT_TABLE(5033, "同步抽取表信息异常", "async extract table info exception"),
+    FEEDBACK_SLICE_STATUS(5034, "反馈分片抽取状态异常", "feedback slice status exception");
 
     private final int code;
     private final String causeCn;
diff --git a/datachecker-common/src/main/java/org/opengauss/datachecker/common/exception/CheckThreadFactory.java b/datachecker-common/src/main/java/org/opengauss/datachecker/common/exception/CheckThreadFactory.java
index 5fae371..bcb2c77 100644
--- a/datachecker-common/src/main/java/org/opengauss/datachecker/common/exception/CheckThreadFactory.java
+++ b/datachecker-common/src/main/java/org/opengauss/datachecker/common/exception/CheckThreadFactory.java
@@ -16,6 +16,7 @@
 package org.opengauss.datachecker.common.exception;
 
 import org.apache.logging.log4j.Logger;
+import org.opengauss.datachecker.common.entry.enums.ErrorCode;
 import org.opengauss.datachecker.common.util.LogUtils;
 
 import java.util.concurrent.ThreadFactory;
@@ -72,8 +73,9 @@ public class CheckThreadFactory implements ThreadFactory {
  */
 class CheckUncaughtExceptionHandler implements Thread.UncaughtExceptionHandler {
     private static final Logger log = LogUtils.getLogger();
+
     @Override
     public void uncaughtException(Thread thread, Throwable throwable) {
-        log.error("{} exception: ", thread.getName(), throwable);
+        log.error("{}{} exception: ", ErrorCode.UNKNOWN, thread.getName(), throwable);
     }
 }
diff --git a/datachecker-common/src/main/java/org/opengauss/datachecker/common/exception/GlobalCommonExceptionHandler.java b/datachecker-common/src/main/java/org/opengauss/datachecker/common/exception/GlobalCommonExceptionHandler.java
index c578705..607096e 100644
--- a/datachecker-common/src/main/java/org/opengauss/datachecker/common/exception/GlobalCommonExceptionHandler.java
+++ b/datachecker-common/src/main/java/org/opengauss/datachecker/common/exception/GlobalCommonExceptionHandler.java
@@ -16,6 +16,7 @@
 package org.opengauss.datachecker.common.exception;
 
 import org.apache.logging.log4j.Logger;
+import org.opengauss.datachecker.common.entry.enums.ErrorCode;
 import org.opengauss.datachecker.common.entry.enums.ResultEnum;
 import org.opengauss.datachecker.common.util.LogUtils;
 import org.opengauss.datachecker.common.web.Result;
@@ -35,6 +36,7 @@ import javax.servlet.http.HttpServletRequest;
  */
 public class GlobalCommonExceptionHandler {
     private static final Logger log = LogUtils.getLogger();
+
     /**
      * Missing required parameters
      *
@@ -132,10 +134,10 @@ public class GlobalCommonExceptionHandler {
      * Log errors
      *
      * @param request request
-     * @param exp     exception
+     * @param exp exception
      */
     protected void logError(HttpServletRequest request, Exception exp) {
-        log.error("path:{}, queryParam:{}, errorMessage:{}", request.getRequestURI(), request.getQueryString(),
-            exp.getMessage(), exp);
+        log.error("{}path:{}, queryParam:{}, errorMessage:{}", ErrorCode.UNKNOWN, request.getRequestURI(),
+            request.getQueryString(), exp.getMessage(), exp);
     }
 }
diff --git a/datachecker-common/src/main/java/org/opengauss/datachecker/common/service/ProcessLogService.java b/datachecker-common/src/main/java/org/opengauss/datachecker/common/service/ProcessLogService.java
index 9aa1551..1d3b3c8 100644
--- a/datachecker-common/src/main/java/org/opengauss/datachecker/common/service/ProcessLogService.java
+++ b/datachecker-common/src/main/java/org/opengauss/datachecker/common/service/ProcessLogService.java
@@ -1,12 +1,30 @@
+/*
+ * Copyright (c) 2022-2022 Huawei Technologies Co.,Ltd.
+ *
+ * openGauss is licensed under Mulan PSL v2.
+ * You can use this software according to the terms and conditions of the Mulan PSL v2.
+ * You may obtain a copy of Mulan PSL v2 at:
+ *
+ *           http://license.coscl.org.cn/MulanPSL2
+ *
+ * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
+ * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
+ * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
+ * See the Mulan PSL v2 for more details.
+ */
+
 package org.opengauss.datachecker.common.service;
 
 import com.alibaba.fastjson.JSONObject;
 import com.alibaba.fastjson.annotation.JSONType;
+
 import lombok.Data;
 import lombok.experimental.Accessors;
+
 import org.apache.logging.log4j.Logger;
 import org.opengauss.datachecker.common.config.ConfigCache;
 import org.opengauss.datachecker.common.entry.enums.Endpoint;
+import org.opengauss.datachecker.common.entry.enums.ErrorCode;
 import org.opengauss.datachecker.common.util.FileUtils;
 import org.opengauss.datachecker.common.util.LogUtils;
 import org.springframework.stereotype.Service;
@@ -17,6 +35,8 @@ import java.util.HashMap;
 import java.util.Map;
 
 /**
+ * ProcessLogService
+ *
  * @author :wangchao
  * @date :Created in 2023/11/17
  * @since :11
@@ -47,9 +67,9 @@ public class ProcessLogService {
 
     public void saveProcessHistoryLogging(String tableName, int order) {
         ProcessingLog processingLog = new ProcessingLog().setEndpoint(ConfigCache.getEndPoint())
-                                                         .setTable(tableName)
-                                                         .setOrder(order)
-                                                         .setFinishedTime(LocalDateTime.now());
+            .setTable(tableName)
+            .setOrder(order)
+            .setFinishedTime(LocalDateTime.now());
         if (logRootPath == null) {
             logRootPath = ConfigCache.getCheckResult();
         }
@@ -63,19 +83,18 @@ public class ProcessLogService {
 
     private void saveProcessLog(String event) {
         try {
-            String name = ManagementFactory.getRuntimeMXBean()
-                                           .getName();
+            String name = ManagementFactory.getRuntimeMXBean().getName();
             String pid = name.split("@")[0];
             ProcessLog logProcess = new ProcessLog().setEndpoint(ConfigCache.getEndPoint())
-                                                    .setPid(pid)
-                                                    .setEvent(event)
-                                                    .setExecTime(LocalDateTime.now());
+                .setPid(pid)
+                .setEvent(event)
+                .setExecTime(LocalDateTime.now());
             if (logPath == null) {
                 logPath = ConfigCache.getCheckResult() + processLog;
             }
             FileUtils.writeAppendFile(logPath, JSONObject.toJSONString(logProcess) + System.lineSeparator());
         } catch (Exception ex) {
-            log.error("save process log error: {} ", ex.getMessage());
+            log.error("{}save process log error: {} ", ErrorCode.PROCESS_LOG, ex.getMessage());
         }
     }
 
diff --git a/datachecker-common/src/main/java/org/opengauss/datachecker/common/thread/CheckUncaughtExceptionHandler.java b/datachecker-common/src/main/java/org/opengauss/datachecker/common/thread/CheckUncaughtExceptionHandler.java
index 1301a33..dbc2ade 100644
--- a/datachecker-common/src/main/java/org/opengauss/datachecker/common/thread/CheckUncaughtExceptionHandler.java
+++ b/datachecker-common/src/main/java/org/opengauss/datachecker/common/thread/CheckUncaughtExceptionHandler.java
@@ -16,6 +16,7 @@
 package org.opengauss.datachecker.common.thread;
 
 import org.apache.logging.log4j.Logger;
+import org.opengauss.datachecker.common.entry.enums.ErrorCode;
 
 /**
  * CheckUncaughtExceptionHandler
@@ -39,6 +40,6 @@ public class CheckUncaughtExceptionHandler implements Thread.UncaughtExceptionHa
     @Override
     public void uncaughtException(Thread t, Throwable e) {
         String msg = String.format("getException from thread: %s,exceptionName:%s", t.getName(), e.getMessage());
-        logger.error(msg, e);
+        logger.error("{}"+msg, ErrorCode.UNKNOWN, e);
     }
 }
diff --git a/datachecker-common/src/main/java/org/opengauss/datachecker/common/thread/DiscardOldestPolicy.java b/datachecker-common/src/main/java/org/opengauss/datachecker/common/thread/DiscardOldestPolicy.java
index d4de6aa..04b61b6 100644
--- a/datachecker-common/src/main/java/org/opengauss/datachecker/common/thread/DiscardOldestPolicy.java
+++ b/datachecker-common/src/main/java/org/opengauss/datachecker/common/thread/DiscardOldestPolicy.java
@@ -16,6 +16,7 @@
 package org.opengauss.datachecker.common.thread;
 
 import org.apache.logging.log4j.Logger;
+import org.opengauss.datachecker.common.entry.enums.ErrorCode;
 
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.atomic.AtomicLong;
@@ -47,9 +48,10 @@ public class DiscardOldestPolicy extends ThreadPoolExecutor.DiscardOldestPolicy
         final long rejectedSum = discard.incrementAndGet();
         if (rejectedSum == 1 || rejectedSum % 100 == 0) {
             logger.error(
-                "DiscardOldest worker, had discard {}, taskCount {}, completedTaskCount {}, largestPoolSize {},"
-                    + "getPoolSize {}, getActiveCount {}, getThreadName {}", rejectedSum, e.getTaskCount(),
-                e.getCompletedTaskCount(), e.getLargestPoolSize(), e.getPoolSize(), e.getActiveCount(), threadName);
+                "{}DiscardOldest worker, had discard {}, taskCount {}, completedTaskCount {}, largestPoolSize {},"
+                    + "getPoolSize {}, getActiveCount {}, getThreadName {}", ErrorCode.UNKNOWN, rejectedSum,
+                e.getTaskCount(), e.getCompletedTaskCount(), e.getLargestPoolSize(), e.getPoolSize(),
+                e.getActiveCount(), threadName);
         }
     }
 }
\ No newline at end of file
diff --git a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/config/GlobalExtractExceptionHandler.java b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/config/GlobalExtractExceptionHandler.java
index f627ea9..7421f64 100644
--- a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/config/GlobalExtractExceptionHandler.java
+++ b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/config/GlobalExtractExceptionHandler.java
@@ -16,6 +16,7 @@
 package org.opengauss.datachecker.extract.config;
 
 import org.apache.logging.log4j.Logger;
+import org.opengauss.datachecker.common.entry.enums.ErrorCode;
 import org.opengauss.datachecker.common.entry.enums.ResultEnum;
 import org.opengauss.datachecker.common.exception.*;
 import org.opengauss.datachecker.common.util.LogUtils;
@@ -34,12 +35,13 @@ import javax.servlet.http.HttpServletRequest;
  */
 @RestControllerAdvice
 public class GlobalExtractExceptionHandler extends GlobalCommonExceptionHandler {
-    private static final String MESSAGE_TEMPLATE = "path:{}, queryParam:[{}] , error:";
+    private static final String MESSAGE_TEMPLATE = "{}path:{}, queryParam:[{}] , error:";
     private static final Logger log = LogUtils.getLogger();
+
     /**
      * service ExtractException exception handing
      *
-     * @param request   request
+     * @param request request
      * @param exception exception
      * @return request result
      */
@@ -128,6 +130,6 @@ public class GlobalExtractExceptionHandler extends GlobalCommonExceptionHandler
     }
 
     private void logError(HttpServletRequest request, ExtractException exception) {
-        log.error(MESSAGE_TEMPLATE, request.getRequestURI(), request.getQueryString(), exception);
+        log.error(MESSAGE_TEMPLATE, ErrorCode.UNKNOWN, request.getRequestURI(), request.getQueryString(), exception);
     }
 }
diff --git a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/data/BaseDataService.java b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/data/BaseDataService.java
index 5d221fd..47b0967 100644
--- a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/data/BaseDataService.java
+++ b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/data/BaseDataService.java
@@ -23,6 +23,7 @@ import org.apache.logging.log4j.Logger;
 import org.opengauss.datachecker.common.config.ConfigCache;
 import org.opengauss.datachecker.common.constant.ConfigConstants;
 import org.opengauss.datachecker.common.entry.enums.ColumnKey;
+import org.opengauss.datachecker.common.entry.enums.ErrorCode;
 import org.opengauss.datachecker.common.entry.extract.ColumnsMetaData;
 import org.opengauss.datachecker.common.entry.extract.PrimaryColumnBean;
 import org.opengauss.datachecker.common.entry.extract.TableMetadata;
@@ -197,7 +198,7 @@ public class BaseDataService {
         String tableName = tableMetadata.getTableName();
         final List columns = dataAccessService.queryTableColumnsMetaData(tableName);
         if (CollectionUtils.isEmpty(columns)) {
-            LogUtils.error(log, "table columns metadata is null ,{}", tableName);
+            LogUtils.error(log, "{}table columns metadata is null ,{}", ErrorCode.TABLE_COL_NULL, tableName);
             return;
         }
         List tempPrimaryColumnBeans = primaryColumnBeans;
diff --git a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/data/access/AbstractDataAccessService.java b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/data/access/AbstractDataAccessService.java
index dec8187..4fa8ea5 100644
--- a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/data/access/AbstractDataAccessService.java
+++ b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/data/access/AbstractDataAccessService.java
@@ -26,6 +26,7 @@ import org.opengauss.datachecker.common.constant.ConfigConstants;
 import org.opengauss.datachecker.common.entry.check.Difference;
 import org.opengauss.datachecker.common.entry.common.Health;
 import org.opengauss.datachecker.common.entry.common.PointPair;
+import org.opengauss.datachecker.common.entry.enums.ErrorCode;
 import org.opengauss.datachecker.common.entry.enums.LowerCaseTableNames;
 import org.opengauss.datachecker.common.entry.extract.PrimaryColumnBean;
 import org.opengauss.datachecker.common.entry.extract.TableMetadata;
@@ -169,7 +170,7 @@ public abstract class AbstractDataAccessService implements DataAccessService {
                 list.add(resultSet.getString(RS_COL_TABLE_NAME));
             }
         } catch (SQLException esql) {
-            LogUtils.error(log, "adasQueryTableNameList error ", esql);
+            LogUtils.error(log, "{}adasQueryTableNameList error ", ErrorCode.EXECUTE_QUERY_SQL, esql);
         } finally {
             closeConnection(connection);
         }
@@ -198,7 +199,7 @@ public abstract class AbstractDataAccessService implements DataAccessService {
                 list.add(metadata);
             }
         } catch (SQLException esql) {
-            LogUtils.error(log, "adasQueryTablePrimaryColumns error:", esql);
+            LogUtils.error(log, "{}adasQueryTablePrimaryColumns error:", ErrorCode.EXECUTE_QUERY_SQL, esql);
         } finally {
             closeConnection(connection);
         }
@@ -228,7 +229,7 @@ public abstract class AbstractDataAccessService implements DataAccessService {
                 list.add(metadata);
             }
         } catch (SQLException esql) {
-            LogUtils.error(log, "adasQueryTablePrimaryColumns error:", esql);
+            LogUtils.error(log, "{}adasQueryTablePrimaryColumns error:", ErrorCode.EXECUTE_QUERY_SQL, esql);
         } finally {
             closeConnection(connection);
         }
@@ -273,7 +274,7 @@ public abstract class AbstractDataAccessService implements DataAccessService {
                 list.add(metadata);
             }
         } catch (SQLException esql) {
-            LogUtils.error(log, "adasQueryTableMetadataList error: ", esql);
+            LogUtils.error(log, "{}adasQueryTableMetadataList error: ", ErrorCode.EXECUTE_QUERY_SQL, esql);
         } finally {
             closeConnection(connection);
         }
@@ -297,7 +298,7 @@ public abstract class AbstractDataAccessService implements DataAccessService {
                 list.add(resultSet.getString(1));
             }
         } catch (SQLException esql) {
-            LogUtils.error(log, "adasQueryPointList error", esql);
+            LogUtils.error(log, "{}adasQueryPointList error", ErrorCode.EXECUTE_QUERY_SQL, esql);
         }
         LogUtils.debug(log, "adasQueryPointList [{}] cost [{}ms]", sql, DurationUtils.betweenSeconds(start));
         return list;
@@ -317,7 +318,7 @@ public abstract class AbstractDataAccessService implements DataAccessService {
                 list.add(new PointPair(resultSet.getString(1), resultSet.getLong(2)));
             }
         } catch (SQLException esql) {
-            LogUtils.error(log, "adasQueryPointList error", esql);
+            LogUtils.error(log, "{}adasQueryPointList error", ErrorCode.EXECUTE_QUERY_SQL, esql);
         }
         return list;
     }
@@ -337,7 +338,7 @@ public abstract class AbstractDataAccessService implements DataAccessService {
                 result = resultSet.getString(1);
             }
         } catch (SQLException esql) {
-            LogUtils.error(log, "adasQueryOnePoint error", esql);
+            LogUtils.error(log, "{}adasQueryOnePoint error", ErrorCode.EXECUTE_QUERY_SQL, esql);
         }
         LogUtils.debug(log, "adasQueryPointList [{}] cost [{}ms]", sql, DurationUtils.betweenSeconds(start));
         return result;
diff --git a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/data/access/CsvDataAccessService.java b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/data/access/CsvDataAccessService.java
index b1d1645..6466539 100644
--- a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/data/access/CsvDataAccessService.java
+++ b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/data/access/CsvDataAccessService.java
@@ -18,6 +18,7 @@ package org.opengauss.datachecker.extract.data.access;
 import com.alibaba.fastjson.JSONObject;
 import com.opencsv.CSVReader;
 import com.opencsv.exceptions.CsvValidationException;
+
 import org.apache.commons.collections4.CollectionUtils;
 import org.apache.logging.log4j.Logger;
 import org.opengauss.datachecker.common.config.ConfigCache;
@@ -28,6 +29,7 @@ import org.opengauss.datachecker.common.entry.common.PointPair;
 import org.opengauss.datachecker.common.entry.csv.CsvTableColumnMeta;
 import org.opengauss.datachecker.common.entry.csv.CsvTableMeta;
 import org.opengauss.datachecker.common.entry.enums.ColumnKey;
+import org.opengauss.datachecker.common.entry.enums.ErrorCode;
 import org.opengauss.datachecker.common.entry.enums.LowerCaseTableNames;
 import org.opengauss.datachecker.common.entry.extract.ColumnsMetaData;
 import org.opengauss.datachecker.common.entry.extract.PrimaryColumnBean;
@@ -39,6 +41,7 @@ import org.opengauss.datachecker.extract.util.CsvUtil;
 import org.springframework.jdbc.core.RowMapper;
 
 import javax.sql.DataSource;
+
 import java.io.FileReader;
 import java.io.IOException;
 import java.nio.file.Files;
@@ -80,7 +83,7 @@ public class CsvDataAccessService implements DataAccessService {
         Path pathOfTables = ConfigCache.getCsvMetadataTablesPath();
         try {
             if (Files.notExists(pathOfTables)) {
-                log.error("csv metadata info does not exist {}", pathOfTables);
+                log.error("{}csv metadata info does not exist {}", ErrorCode.CSV_METADATA_NOT_EXIST, pathOfTables);
                 throw new CsvDataAccessException("csv metadata load failed");
             }
             Path reader = Path.of(ConfigCache.getReader());
@@ -94,7 +97,7 @@ public class CsvDataAccessService implements DataAccessService {
                     .map(CsvTableMeta::getTable)
                     .collect(Collectors.toList());
         } catch (IOException e) {
-            log.error("load table name of csv exception : ", e);
+            log.error("{}load table name of csv exception : ", ErrorCode.CSV_LOAD_METADATA_ERROR, e);
             throw new ExtractDataAccessException("load table name of csv exception");
         }
     }
@@ -109,7 +112,8 @@ public class CsvDataAccessService implements DataAccessService {
             throw new CsvDataAccessException("file " + reader.toString() + " is not exist");
         }
         if (!CsvUtil.checkExistAndWait(pathOfTables) || !CsvUtil.checkExistAndWait(pathOfColumns)) {
-            log.error("csv metadata info does not exist {} or {}", pathOfTables, pathOfColumns);
+            log.error("{}csv metadata info does not exist {} or {}", ErrorCode.CSV_METADATA_NOT_EXIST, pathOfTables,
+                pathOfColumns);
             throw new CsvDataAccessException("file " + reader.toString() + " is not exist");
         }
         try {
@@ -137,7 +141,7 @@ public class CsvDataAccessService implements DataAccessService {
                                 .collect(Collectors.toList()));
                     });
         } catch (IOException e) {
-            log.error("load table name of csv exception : ", e);
+            log.error("{}load table name of csv exception : ", ErrorCode.CSV_LOAD_METADATA_ERROR, e);
             throw new ExtractDataAccessException("load table name of csv exception");
         }
         return new ArrayList<>(tableMetadataMap.values());
diff --git a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/data/access/MysqlDataAccessService.java b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/data/access/MysqlDataAccessService.java
index 7bf6a3d..2e638e5 100644
--- a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/data/access/MysqlDataAccessService.java
+++ b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/data/access/MysqlDataAccessService.java
@@ -18,6 +18,7 @@ package org.opengauss.datachecker.extract.data.access;
 import org.opengauss.datachecker.common.entry.common.DataAccessParam;
 import org.opengauss.datachecker.common.entry.common.Health;
 import org.opengauss.datachecker.common.entry.common.PointPair;
+import org.opengauss.datachecker.common.entry.enums.ErrorCode;
 import org.opengauss.datachecker.common.entry.enums.LowerCaseTableNames;
 import org.opengauss.datachecker.common.entry.extract.ColumnsMetaData;
 import org.opengauss.datachecker.common.entry.extract.PrimaryColumnBean;
@@ -176,7 +177,7 @@ public class MysqlDataAccessService extends AbstractDataAccessService {
                 return LowerCaseTableNames.codeOf(value);
             }
         } catch (SQLException ex) {
-            log.error("queryLowerCaseTableNames error", ex);
+            log.error("{}queryLowerCaseTableNames error", ErrorCode.EXECUTE_QUERY_SQL, ex);
         } finally {
             closeConnection(connection);
         }
diff --git a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/data/access/OpgsDataAccessService.java b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/data/access/OpgsDataAccessService.java
index fb3851f..4840d83 100644
--- a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/data/access/OpgsDataAccessService.java
+++ b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/data/access/OpgsDataAccessService.java
@@ -20,6 +20,7 @@ import org.opengauss.datachecker.common.constant.ConfigConstants;
 import org.opengauss.datachecker.common.entry.common.DataAccessParam;
 import org.opengauss.datachecker.common.entry.common.Health;
 import org.opengauss.datachecker.common.entry.common.PointPair;
+import org.opengauss.datachecker.common.entry.enums.ErrorCode;
 import org.opengauss.datachecker.common.entry.enums.LowerCaseTableNames;
 import org.opengauss.datachecker.common.entry.enums.OgCompatibility;
 import org.opengauss.datachecker.common.entry.extract.ColumnsMetaData;
@@ -224,7 +225,7 @@ public class OpgsDataAccessService extends AbstractDataAccessService {
                 result.put(name, setting);
             }
         } catch (SQLException ex) {
-            log.error("queryLowerCaseTableNames error", ex);
+            log.error("{}queryLowerCaseTableNames error", ErrorCode.EXECUTE_QUERY_SQL, ex);
         } finally {
             closeConnection(connection);
         }
diff --git a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/data/csv/CsvReaderListener.java b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/data/csv/CsvReaderListener.java
index 9180201..91abee3 100644
--- a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/data/csv/CsvReaderListener.java
+++ b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/data/csv/CsvReaderListener.java
@@ -22,6 +22,7 @@ import org.apache.commons.lang3.StringUtils;
 import org.apache.logging.log4j.Logger;
 import org.opengauss.datachecker.common.config.ConfigCache;
 import org.opengauss.datachecker.common.entry.enums.Endpoint;
+import org.opengauss.datachecker.common.entry.enums.ErrorCode;
 import org.opengauss.datachecker.common.entry.extract.SliceVo;
 import org.opengauss.datachecker.common.exception.CsvDataAccessException;
 import org.opengauss.datachecker.common.util.LogUtils;
@@ -94,7 +95,7 @@ public class CsvReaderListener implements CsvListener {
                     MapUtils.put(readerSliceMap, slice.getTable(), slice);
                     LogUtils.debug(log,"reader add log : {}", line);
                 } catch (Exception ex) {
-                    LogUtils.error(log,"reader log listener error : " + ex.getMessage());
+                    LogUtils.error(log,"{}reader log listener error : " , ErrorCode.CSV_READER_LISTENER, ex.getMessage());
                 }
             }
         }, ConfigCache.getCsvLogMonitorInterval(), false);
diff --git a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/data/csv/CsvWriterListener.java b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/data/csv/CsvWriterListener.java
index d3fc332..f609bf2 100644
--- a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/data/csv/CsvWriterListener.java
+++ b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/data/csv/CsvWriterListener.java
@@ -17,6 +17,7 @@ package org.opengauss.datachecker.extract.data.csv;
 
 import com.alibaba.fastjson.JSONObject;
 import com.alibaba.fastjson.parser.Feature;
+
 import org.apache.commons.collections4.CollectionUtils;
 import org.apache.commons.io.input.Tailer;
 import org.apache.commons.io.input.TailerListenerAdapter;
@@ -26,6 +27,7 @@ import org.opengauss.datachecker.common.config.ConfigCache;
 import org.opengauss.datachecker.common.constant.ConfigConstants;
 import org.opengauss.datachecker.common.entry.csv.SliceIndexVo;
 import org.opengauss.datachecker.common.entry.enums.Endpoint;
+import org.opengauss.datachecker.common.entry.enums.ErrorCode;
 import org.opengauss.datachecker.common.entry.enums.SliceIndexStatus;
 import org.opengauss.datachecker.common.entry.enums.SliceLogType;
 import org.opengauss.datachecker.common.entry.extract.SliceVo;
@@ -97,7 +99,6 @@ public class CsvWriterListener implements CsvListener {
                         return;
                     }
                     logDuplicateCheck.add(contentHash);
-
                     JSONObject writeLog = JSONObject.parseObject(line);
                     if (skipNoInvalidSlice(writeLog)) {
                         LogUtils.warn(log, "writer skip no invalid slice log : {}", line);
@@ -123,7 +124,7 @@ public class CsvWriterListener implements CsvListener {
                     }
                     LogUtils.debug(log, "writer add log : {}", line);
                 } catch (Exception ex) {
-                    LogUtils.error(log, "writer log listener error : {}", line, ex);
+                    LogUtils.error(log, "{}writer log listener error : {}", ErrorCode.CSV_WRITER_LISTENER, line, ex);
                 }
             }
         }, ConfigCache.getCsvLogMonitorInterval(), false);
diff --git a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/debezium/DebeziumWorker.java b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/debezium/DebeziumWorker.java
index 6a11136..9775317 100644
--- a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/debezium/DebeziumWorker.java
+++ b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/debezium/DebeziumWorker.java
@@ -20,6 +20,7 @@ import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.logging.log4j.Logger;
 import org.opengauss.datachecker.common.constant.WorkerSwitch;
+import org.opengauss.datachecker.common.entry.enums.ErrorCode;
 import org.opengauss.datachecker.common.util.LogUtils;
 import org.opengauss.datachecker.common.util.ThreadUtil;
 import org.opengauss.datachecker.extract.config.KafkaConsumerConfig;
@@ -89,7 +90,7 @@ public class DebeziumWorker implements Runnable {
                 try {
                     debeziumConsumerListener.listen(record);
                 } catch (Exception ex) {
-                    log.error("DebeziumWorker unknown error, message,{},{}", record.toString(), ex);
+                    log.error("{}DebeziumWorker unknown error, message,{},{}", ErrorCode.DEBEZIUM_WORKER, record.toString(), ex);
                 }
             }
             POLL_BATCH_COUNT.addAndGet(records.count());
diff --git a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/debezium/IncrementDataAnalysisService.java b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/debezium/IncrementDataAnalysisService.java
index 36228ba..0e79702 100644
--- a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/debezium/IncrementDataAnalysisService.java
+++ b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/debezium/IncrementDataAnalysisService.java
@@ -19,6 +19,7 @@ import feign.FeignException;
 import lombok.RequiredArgsConstructor;
 import org.apache.commons.collections4.CollectionUtils;
 import org.apache.logging.log4j.Logger;
+import org.opengauss.datachecker.common.entry.enums.ErrorCode;
 import org.opengauss.datachecker.common.entry.extract.SourceDataLog;
 import org.opengauss.datachecker.common.exception.ExtractException;
 import org.opengauss.datachecker.common.service.ShutdownService;
@@ -118,11 +119,11 @@ public class IncrementDataAnalysisService {
                 dataNumAnalysis();
                 dataTimeAnalysis();
             } catch (FeignException ex) {
-                log.error("check service has an error occurred. {}", ex.getMessage());
+                log.error("{}check service has an error occurred. {}", ErrorCode.INCREMENT_LISTENER, ex.getMessage());
             } catch (ExtractException ex) {
-                log.error("peek debezium topic record offset has an error occurred,", ex);
+                log.error("{}peek debezium topic record offset has an error occurred,",ErrorCode.INCREMENT_LISTENER, ex);
             } catch (Exception ex) {
-                log.error("unkown error occurred,", ex);
+                log.error("{}unkown error occurred,", ErrorCode.INCREMENT_LISTENER,ex);
             }
         };
     }
diff --git a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/resource/ConnectionMgr.java b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/resource/ConnectionMgr.java
index 139ade3..7c3222c 100644
--- a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/resource/ConnectionMgr.java
+++ b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/resource/ConnectionMgr.java
@@ -95,7 +95,7 @@ public class ConnectionMgr {
                 ThreadUtil.sleepCircle(retry);
                 conn = retryToGetConnection(++retry);
             } else {
-                throw exp;
+                throw new ExtractDataAccessException("get connection failed");
             }
         }
         return conn;
diff --git a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/service/DataExtractServiceImpl.java b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/service/DataExtractServiceImpl.java
index 39df39e..f7b801a 100644
--- a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/service/DataExtractServiceImpl.java
+++ b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/service/DataExtractServiceImpl.java
@@ -23,6 +23,7 @@ import org.opengauss.datachecker.common.constant.Constants;
 import org.opengauss.datachecker.common.entry.common.CheckPointData;
 import org.opengauss.datachecker.common.entry.common.PointPair;
 import org.opengauss.datachecker.common.entry.enums.Endpoint;
+import org.opengauss.datachecker.common.entry.enums.ErrorCode;
 import org.opengauss.datachecker.common.entry.enums.SliceStatus;
 import org.opengauss.datachecker.common.entry.extract.BaseSlice;
 import org.opengauss.datachecker.common.entry.extract.Database;
@@ -351,7 +352,8 @@ public class DataExtractServiceImpl implements DataExtractService {
             }
             tableCheckPointCache.remove(tableName);
         } catch (Exception ex) {
-            LogUtils.error(log, "async exec extract tables error {}:{} ", task.getTableName(), ex.getMessage(), ex);
+            LogUtils.error(log, "{}async exec extract tables error {}:{} ", ErrorCode.ASYNC_EXTRACT_TABLE,
+                task.getTableName(), ex.getMessage(), ex);
         }
     }
 
@@ -514,7 +516,7 @@ public class DataExtractServiceImpl implements DataExtractService {
         try {
             checkPointList = autoSliceStatement.getCheckPoint(metadata, sliceSize);
         } catch (Exception ex) {
-            LogUtils.error(log, "getCheckPoint error:", ex);
+            LogUtils.error(log, "{}getCheckPoint error:", ErrorCode.BUILD_SLICE_POINT, ex);
             return new ArrayList<>();
         }
         if (CollectionUtils.isEmpty(checkPointList)) {
@@ -564,7 +566,7 @@ public class DataExtractServiceImpl implements DataExtractService {
                 tableCheckPointCache.put(tableName, List.of());
             }
         } catch (Exception e) {
-            log.error("register check point failed ", e);
+            log.error("{}register check point failed ", ErrorCode.REGISTER_SLICE_POINT, e);
         }
     }
 
diff --git a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/slice/ExtractPointSwapManager.java b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/slice/ExtractPointSwapManager.java
index 9ce8b45..2271b03 100644
--- a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/slice/ExtractPointSwapManager.java
+++ b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/slice/ExtractPointSwapManager.java
@@ -30,6 +30,7 @@ import org.opengauss.datachecker.common.entry.common.CheckPointBean;
 import org.opengauss.datachecker.common.entry.common.CheckPointData;
 import org.opengauss.datachecker.common.entry.common.PointPair;
 import org.opengauss.datachecker.common.entry.enums.Endpoint;
+import org.opengauss.datachecker.common.entry.enums.ErrorCode;
 import org.opengauss.datachecker.common.util.IdGenerator;
 import org.opengauss.datachecker.common.util.LogUtils;
 import org.opengauss.datachecker.common.util.ThreadUtil;
@@ -108,7 +109,8 @@ public class ExtractPointSwapManager {
             kafkaTemplate.send(topic, key, JSONObject.toJSONString(tmpBean));
         } catch (TimeoutException ex) {
             if (reTryTimes > MAX_RETRY_TIMES) {
-                log.error("send msg to kafka timeout, topic: {} key: {} reTryTimes: {}", topic, key, reTryTimes);
+                log.error("{}send msg to kafka timeout, topic: {} key: {} reTryTimes: {}",
+                    ErrorCode.SEND_SLICE_POINT_TIMEOUT, topic, key, reTryTimes);
             }
             ThreadUtil.sleepLongCircle(++reTryTimes);
             sendMsg(topic, key, tmpBean, reTryTimes);
diff --git a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/slice/SliceDispatcher.java b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/slice/SliceDispatcher.java
index b864080..5c3ef3f 100644
--- a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/slice/SliceDispatcher.java
+++ b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/slice/SliceDispatcher.java
@@ -21,6 +21,7 @@ import org.apache.logging.log4j.Logger;
 import org.opengauss.datachecker.common.config.ConfigCache;
 import org.opengauss.datachecker.common.constant.ConfigConstants;
 import org.opengauss.datachecker.common.entry.enums.Endpoint;
+import org.opengauss.datachecker.common.entry.enums.ErrorCode;
 import org.opengauss.datachecker.common.entry.extract.SliceVo;
 import org.opengauss.datachecker.common.entry.extract.TableMetadata;
 import org.opengauss.datachecker.common.service.DynamicThreadPoolManager;
@@ -135,7 +136,7 @@ public class SliceDispatcher implements Runnable {
                 }
             }
         } catch (Exception exception) {
-            LogUtils.error(log, "ex", exception);
+            LogUtils.error(log, "{}csv slice dispatcher error", ErrorCode.CSV_TABLE_DISPATCHER, exception);
         }
     }
 
diff --git a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/slice/SliceStatusFeedbackService.java b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/slice/SliceStatusFeedbackService.java
index bd5eb32..a2f6432 100644
--- a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/slice/SliceStatusFeedbackService.java
+++ b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/slice/SliceStatusFeedbackService.java
@@ -16,6 +16,7 @@
 package org.opengauss.datachecker.extract.slice;
 
 import org.apache.logging.log4j.Logger;
+import org.opengauss.datachecker.common.entry.enums.ErrorCode;
 import org.opengauss.datachecker.common.entry.extract.SliceExtend;
 import org.opengauss.datachecker.common.util.LogUtils;
 import org.opengauss.datachecker.common.util.ThreadUtil;
@@ -92,7 +93,8 @@ public class SliceStatusFeedbackService {
                         checkingClient.refreshRegisterSlice(sliceExt);
                     }
                 } catch (Exception ex) {
-                    LogUtils.error(log, "feedback slice status error {}", sliceExt, ex);
+                    LogUtils.error(log, "{}feedback slice status error {}", ErrorCode.FEEDBACK_SLICE_STATUS, sliceExt,
+                        ex);
                 }
             }
             LogUtils.debug(log, "feedback slice status of is completed and exited");
diff --git a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/slice/TableDispatcher.java b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/slice/TableDispatcher.java
index dcdfe1d..8d2d3fe 100644
--- a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/slice/TableDispatcher.java
+++ b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/slice/TableDispatcher.java
@@ -19,6 +19,7 @@ import org.apache.commons.collections4.CollectionUtils;
 import org.apache.logging.log4j.Logger;
 import org.opengauss.datachecker.common.config.ConfigCache;
 import org.opengauss.datachecker.common.entry.enums.Endpoint;
+import org.opengauss.datachecker.common.entry.enums.ErrorCode;
 import org.opengauss.datachecker.common.entry.extract.SliceVo;
 import org.opengauss.datachecker.common.entry.extract.TableMetadata;
 import org.opengauss.datachecker.common.service.DynamicThreadPoolManager;
@@ -113,7 +114,7 @@ public class TableDispatcher implements Runnable {
                 }
             }
         } catch (Exception exception) {
-            log.error("ex", exception);
+            log.error("{}table dispatcher exception ", ErrorCode.CSV_TABLE_DISPATCHER, exception);
         }
     }
 
diff --git a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/slice/process/CsvTableProcessor.java b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/slice/process/CsvTableProcessor.java
index 81bc463..2271be6 100644
--- a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/slice/process/CsvTableProcessor.java
+++ b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/slice/process/CsvTableProcessor.java
@@ -16,9 +16,11 @@
 package org.opengauss.datachecker.extract.slice.process;
 
 import com.opencsv.CSVReader;
+
 import org.apache.commons.collections4.CollectionUtils;
 import org.apache.logging.log4j.Logger;
 import org.opengauss.datachecker.common.config.ConfigCache;
+import org.opengauss.datachecker.common.entry.enums.ErrorCode;
 import org.opengauss.datachecker.common.entry.extract.SliceExtend;
 import org.opengauss.datachecker.common.entry.extract.TableMetadata;
 import org.opengauss.datachecker.common.exception.ExtractDataAccessException;
@@ -73,7 +75,7 @@ public class CsvTableProcessor extends AbstractTableProcessor {
             sliceExtend.setCount(tableRowCount);
             feedbackStatus(sliceExtend);
         } catch (Exception ex) {
-            log.error("csv table processor ,{} : ", table, ex);
+            log.error("{}csv table processor ,{} : ", ErrorCode.CSV_TABLE_PROCESSOR, table, ex);
             sliceExtend.setStatus(-1);
             feedbackStatus(sliceExtend);
         } finally {
@@ -88,7 +90,8 @@ public class CsvTableProcessor extends AbstractTableProcessor {
         return tableSliceExtend;
     }
 
-    private long executeQueryStatement(TableMetadata tableMetadata, List tablePaths, SliceExtend tableSliceExtend) throws IOException {
+    private long executeQueryStatement(TableMetadata tableMetadata, List tablePaths, SliceExtend tableSliceExtend)
+        throws IOException {
         final LocalDateTime start = LocalDateTime.now();
         long tableRowCount = 0;
         SliceKafkaAgents kafkaAgents = context.createSliceFixedKafkaAgents(topic, table);
@@ -129,12 +132,12 @@ public class CsvTableProcessor extends AbstractTableProcessor {
                         minOffsetList.add(getMinOffset(offsetList));
                         maxOffsetList.add(getMaxOffset(offsetList));
                     } catch (Exception ex) {
-                        log.error("csvTranslateAndSend error: ", ex);
+                        log.error("{}csvTranslateAndSend error: ",ErrorCode.CSV_TABLE_PROCESSOR, ex);
                     }
                     tableRowCount += rowCount;
                     log.info("finish [{}-{}] {} , [{} : {}]", tableFileCount, i, slicePath, rowCount, tableRowCount);
                 } catch (Exception ex) {
-                    log.error("CSVReader exception: ", ex);
+                    log.error("{}CSVReader exception: ", ErrorCode.CSV_TABLE_PROCESSOR, ex);
                 }
                 memoryOperations.release();
             }
@@ -142,7 +145,7 @@ public class CsvTableProcessor extends AbstractTableProcessor {
             tableSliceExtend.setEndOffset(getMaxMaxOffset(maxOffsetList));
             tableSliceExtend.setCount(tableRowCount);
         } catch (Exception ex) {
-            log.error("jdbc query  {} error : {}", table, ex.getMessage());
+            log.error("{}jdbc query  {} error : {}",ErrorCode.CSV_TABLE_PROCESSOR,table, ex.getMessage());
             throw new ExtractDataAccessException();
         } finally {
             log.info("query slice [{}] cost [{}] milliseconds", table,
diff --git a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/slice/process/JdbcSliceProcessor.java b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/slice/process/JdbcSliceProcessor.java
index 4efc0b9..4194a37 100644
--- a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/slice/process/JdbcSliceProcessor.java
+++ b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/slice/process/JdbcSliceProcessor.java
@@ -23,6 +23,7 @@ import org.apache.logging.log4j.Logger;
 import org.opengauss.datachecker.common.config.ConfigCache;
 import org.opengauss.datachecker.common.constant.ConfigConstants;
 import org.opengauss.datachecker.common.entry.enums.DataBaseType;
+import org.opengauss.datachecker.common.entry.enums.ErrorCode;
 import org.opengauss.datachecker.common.entry.extract.SliceExtend;
 import org.opengauss.datachecker.common.entry.extract.SliceVo;
 import org.opengauss.datachecker.common.entry.extract.TableMetadata;
@@ -98,7 +99,7 @@ public class JdbcSliceProcessor extends AbstractSliceProcessor {
             }
         } catch (Exception | Error ex) {
             sliceExtend.setStatus(-1);
-            LogUtils.error(log, "table slice [{}] is error", slice.toSimpleString(), ex);
+            LogUtils.error(log, "{}table slice [{}] is error", ErrorCode.EXECUTE_SLICE_PROCESSOR,slice.toSimpleString(), ex);
         } finally {
             LogUtils.info(log, "table slice [{} count {}] is finally   ", slice.toSimpleString(), rowCount.get());
             feedbackStatus(sliceExtend);
@@ -167,7 +168,7 @@ public class JdbcSliceProcessor extends AbstractSliceProcessor {
             updateExtendSliceOffsetAndCount(sliceExtend, rowCount.get(), offsetList);
             log.info("executeSliceQueryStatementPage : {} async send end", slice.getName());
         } catch (Exception ex) {
-            LogUtils.error(log, "slice [{}] has exception :", slice.getName(), ex);
+            LogUtils.error(log, "{}slice [{}] has exception :",ErrorCode.EXECUTE_SLICE_QUERY, slice.getName(), ex);
             throw new ExtractDataAccessException(ex.getMessage());
         } finally {
             if (sliceSender != null) {
@@ -217,7 +218,7 @@ public class JdbcSliceProcessor extends AbstractSliceProcessor {
             // 数据发送到异步处理线程中,关闭ps与rs
             ConnectionMgr.close(null, ps, resultSet);
         } catch (Exception ex) {
-            log.error("execute query {}  executionStage: {} error,retry cause : {}", slice.toSimpleString(),
+            log.error("{}execute query {}  executionStage: {} error,retry cause : {}", ErrorCode.EXECUTE_SLICE_QUERY, slice.toSimpleString(),
                 executionStage, ex.getMessage());
             if (Objects.equals(executionStage, ExecutionStage.CLOSE)) {
                 ConnectionMgr.close(null, ps, resultSet);
@@ -229,7 +230,7 @@ public class JdbcSliceProcessor extends AbstractSliceProcessor {
                     queryParameters.resultSetIdx = rsIdx;
                     startOffset = statementQuery(pageStatement, connection, sliceSender, asyncHandler, queryParameters);
                 } else {
-                    log.error("execute query {} retry {} times error ,cause by ", maskQuery(pageStatement),
+                    log.error("{}execute query {} retry {} times error ,cause by ", ErrorCode.EXECUTE_QUERY_SQL, maskQuery(pageStatement),
                         queryParameters.getRetryTimes(), ex);
                     throw new ExtractDataAccessException(
                         "execute query " + maskQuery(pageStatement) + " retry " + MAX_RETRY_TIMES
@@ -274,7 +275,7 @@ public class JdbcSliceProcessor extends AbstractSliceProcessor {
             waitToStopAsyncHandlerAndResources(asyncHandler);
             updateExtendSliceOffsetAndCount(sliceExtend, rowCount.get(), offsetList);
         } catch (Exception ex) {
-            LogUtils.error(log, "slice [{}] has exception :", slice.getName(), ex);
+            LogUtils.error(log, "{}slice [{}] has exception :",ErrorCode.EXECUTE_SLICE_QUERY, slice.getName(), ex);
             throw new ExtractDataAccessException(ex.getMessage());
         } finally {
             ConnectionMgr.close(connection, ps, resultSet);
diff --git a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/slice/process/JdbcTableProcessor.java b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/slice/process/JdbcTableProcessor.java
index 7216a6c..5920a6d 100644
--- a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/slice/process/JdbcTableProcessor.java
+++ b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/slice/process/JdbcTableProcessor.java
@@ -17,6 +17,7 @@ package org.opengauss.datachecker.extract.slice.process;
 
 import com.alibaba.druid.pool.DruidDataSource;
 import org.apache.logging.log4j.Logger;
+import org.opengauss.datachecker.common.entry.enums.ErrorCode;
 import org.opengauss.datachecker.common.entry.extract.SliceExtend;
 import org.opengauss.datachecker.common.exception.ExtractDataAccessException;
 import org.opengauss.datachecker.common.util.LogUtils;
@@ -79,7 +80,7 @@ public class JdbcTableProcessor extends AbstractTableProcessor {
             tableSliceExtend.setCount(tableRowCount);
             feedbackStatus(tableSliceExtend);
         } catch (Exception ex) {
-            log.error("extract", ex);
+            log.error("{}extract table processor error", ErrorCode.EXECUTE_SLICE_PROCESSOR, ex);
             tableSliceExtend.setStatus(-1);
             feedbackStatus(tableSliceExtend);
         } finally {
@@ -132,7 +133,7 @@ public class JdbcTableProcessor extends AbstractTableProcessor {
             tableSliceExtend.setEndOffset(getMaxMaxOffset(maxOffsetList));
             tableSliceExtend.setCount(tableRowCount);
         } catch (SQLException ex) {
-            log.error("jdbc query  {} error : {}", table, ex.getMessage());
+            log.error("{}jdbc query  {} error : {}", ErrorCode.EXECUTE_QUERY_SQL, table, ex.getMessage());
             throw new ExtractDataAccessException();
         } finally {
             jdbcOperation.releaseConnection(connection);
@@ -177,7 +178,7 @@ public class JdbcTableProcessor extends AbstractTableProcessor {
                 log.info("finish {} , {}", table, tableRowCount);
             }
         } catch (SQLException ex) {
-            log.error("jdbc query  {} error : {}", table, ex.getMessage());
+            log.error("{}jdbc query  {} error : {}", ErrorCode.EXECUTE_QUERY_SQL, table, ex.getMessage());
             throw new ExtractDataAccessException();
         } finally {
             jdbcOperation.releaseConnection(connection);
diff --git a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/task/CheckPoint.java b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/task/CheckPoint.java
index eb7f380..1dfd25f 100644
--- a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/task/CheckPoint.java
+++ b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/task/CheckPoint.java
@@ -24,6 +24,7 @@ import org.opengauss.datachecker.common.constant.ConfigConstants;
 import org.opengauss.datachecker.common.entry.common.DataAccessParam;
 import org.opengauss.datachecker.common.entry.common.PointPair;
 import org.opengauss.datachecker.common.entry.enums.DataBaseType;
+import org.opengauss.datachecker.common.entry.enums.ErrorCode;
 import org.opengauss.datachecker.common.entry.extract.ColumnsMetaData;
 import org.opengauss.datachecker.common.entry.extract.TableMetadata;
 import org.opengauss.datachecker.common.util.LogUtils;
@@ -187,7 +188,7 @@ public class CheckPoint {
         try (Connection connection = getConnection()) {
             return dataAccessService.queryUnionFirstPrimaryCheckPointList(connection, param);
         } catch (Exception e) {
-            log.error("query union primary check point list error {}", e.getMessage());
+            log.error("{}query union primary check point list error {}", ErrorCode.BUILD_SLICE_POINT, e.getMessage());
         }
         return new LinkedList<>();
     }
diff --git a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/task/DataManipulationService.java b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/task/DataManipulationService.java
index 46f7220..278ccb2 100644
--- a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/task/DataManipulationService.java
+++ b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/task/DataManipulationService.java
@@ -19,6 +19,7 @@ import org.apache.commons.collections4.CollectionUtils;
 import org.apache.logging.log4j.Logger;
 import org.opengauss.datachecker.common.constant.Constants.InitialCapacity;
 import org.opengauss.datachecker.common.entry.enums.DataBaseType;
+import org.opengauss.datachecker.common.entry.enums.ErrorCode;
 import org.opengauss.datachecker.common.entry.extract.ColumnsMetaData;
 import org.opengauss.datachecker.common.entry.extract.RowDataHash;
 import org.opengauss.datachecker.common.entry.extract.TableMetadata;
@@ -218,7 +219,7 @@ public class DataManipulationService {
                     (rs, rowNum) -> resultSetHashHandler.handler(MetaDataUtil.getTablePrimaryColumns(tableMetadata),
                             MetaDataUtil.getTableColumns(tableMetadata), handler.putOneResultSetToMap(rs)));
         } catch (Exception e) {
-            log.error("Failed to query data", e);
+            log.error("{}Failed to query data", ErrorCode.EXECUTE_QUERY_SQL, e);
             throw new ExtractDataAccessException("Failed to query data " + e.getMessage());
         }
     }
diff --git a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/task/ResultSetHandler.java b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/task/ResultSetHandler.java
index 1f1bb73..38274be 100644
--- a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/task/ResultSetHandler.java
+++ b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/task/ResultSetHandler.java
@@ -16,6 +16,7 @@
 package org.opengauss.datachecker.extract.task;
 
 import org.apache.logging.log4j.Logger;
+import org.opengauss.datachecker.common.entry.enums.ErrorCode;
 import org.opengauss.datachecker.common.util.LogUtils;
 import org.opengauss.datachecker.extract.task.functional.SimpleTypeHandler;
 import org.opengauss.datachecker.extract.task.functional.SimpleTypeHandlerFactory;
@@ -89,11 +90,12 @@ public abstract class ResultSetHandler {
                     columnLabel = rsmd.getColumnLabel(columnIdx);
                     result.put(columnLabel, convert(resultSet, columnIdx, rsmd));
                 } catch (SQLException ex) {
-                    LOG.error(" Convert data [{}:{}] {} error ", tableName, columnLabel, ex.getMessage(), ex);
+                    LOG.error("{} Convert data [{}:{}] {} error ", ErrorCode.EXECUTE_QUERY_SQL, tableName, columnLabel,
+                        ex.getMessage(), ex);
                 }
             });
         } catch (SQLException ex) {
-            LOG.error(" parse data metadata information exception", ex);
+            LOG.error("{} parse data metadata information exception", ErrorCode.EXECUTE_QUERY_SQL, ex);
         }
         return result;
     }
diff --git a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/task/functional/SimpleTypeHandlerFactory.java b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/task/functional/SimpleTypeHandlerFactory.java
index ce519fb..e15c91f 100644
--- a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/task/functional/SimpleTypeHandlerFactory.java
+++ b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/task/functional/SimpleTypeHandlerFactory.java
@@ -16,6 +16,7 @@
 package org.opengauss.datachecker.extract.task.functional;
 
 import org.apache.logging.log4j.Logger;
+import org.opengauss.datachecker.common.entry.enums.ErrorCode;
 import org.opengauss.datachecker.common.util.DateTimeFormatterMap;
 import org.opengauss.datachecker.common.util.HexUtil;
 import org.opengauss.datachecker.common.util.LogUtils;
@@ -623,7 +624,7 @@ public class SimpleTypeHandlerFactory {
                     sb.append(line);
                 }
             } catch (IOException io) {
-                LOG.error("read clobToString error");
+                LOG.error("{}read clobToString error", ErrorCode.EXECUTE_QUERY_SQL);
             } finally {
                 closeBufferedReader(bf);
                 closeReader(reader);
@@ -638,7 +639,7 @@ public class SimpleTypeHandlerFactory {
                 bf.close();
             }
         } catch (IOException e) {
-            LOG.error("close BufferedReader error");
+            LOG.error("{}close BufferedReader error", ErrorCode.EXECUTE_QUERY_SQL);
         }
     }
 
@@ -648,7 +649,7 @@ public class SimpleTypeHandlerFactory {
                 reader.close();
             }
         } catch (IOException e) {
-            LOG.error("close Reader error");
+            LOG.error("{}close Reader error", ErrorCode.EXECUTE_QUERY_SQL);
         }
     }
 
diff --git a/datachecker-extract/src/test/java/org/opengauss/datachecker/extract/dao/BaseMysqlMapper.java b/datachecker-extract/src/test/java/org/opengauss/datachecker/extract/dao/BaseMysqlMapper.java
index 9aa2fc4..a140c79 100644
--- a/datachecker-extract/src/test/java/org/opengauss/datachecker/extract/dao/BaseMysqlMapper.java
+++ b/datachecker-extract/src/test/java/org/opengauss/datachecker/extract/dao/BaseMysqlMapper.java
@@ -16,12 +16,14 @@
 package org.opengauss.datachecker.extract.dao;
 
 import lombok.extern.slf4j.Slf4j;
+
 import org.apache.ibatis.datasource.unpooled.UnpooledDataSource;
 import org.apache.ibatis.executor.Executor;
 import org.apache.ibatis.session.Configuration;
 import org.apache.ibatis.session.TransactionIsolationLevel;
 import org.apache.ibatis.transaction.Transaction;
 import org.apache.ibatis.transaction.jdbc.JdbcTransaction;
+import org.opengauss.datachecker.common.entry.enums.ErrorCode;
 import org.opengauss.datachecker.extract.data.mapper.MysqlMetaDataMapper;
 import org.springframework.core.env.PropertySource;
 import org.springframework.core.io.ClassPathResource;
-- 
Gitee


From bb1cb7ce3163294ce9d1ff3ed97c2da6785217eb Mon Sep 17 00:00:00 2001
From: mystarry-sky 
Date: Tue, 24 Dec 2024 15:57:11 +0800
Subject: [PATCH 2/4] =?UTF-8?q?Datakit=E6=A0=A1=E9=AA=8C=E9=80=82=E9=85=8D?=
 =?UTF-8?q?=E5=BC=82=E5=B8=B8=E6=94=B6=E9=9B=86=E9=9C=80=E6=B1=82?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

---
 .../org/opengauss/datachecker/extract/dao/BaseMysqlMapper.java  | 2 --
 1 file changed, 2 deletions(-)

diff --git a/datachecker-extract/src/test/java/org/opengauss/datachecker/extract/dao/BaseMysqlMapper.java b/datachecker-extract/src/test/java/org/opengauss/datachecker/extract/dao/BaseMysqlMapper.java
index a140c79..9aa2fc4 100644
--- a/datachecker-extract/src/test/java/org/opengauss/datachecker/extract/dao/BaseMysqlMapper.java
+++ b/datachecker-extract/src/test/java/org/opengauss/datachecker/extract/dao/BaseMysqlMapper.java
@@ -16,14 +16,12 @@
 package org.opengauss.datachecker.extract.dao;
 
 import lombok.extern.slf4j.Slf4j;
-
 import org.apache.ibatis.datasource.unpooled.UnpooledDataSource;
 import org.apache.ibatis.executor.Executor;
 import org.apache.ibatis.session.Configuration;
 import org.apache.ibatis.session.TransactionIsolationLevel;
 import org.apache.ibatis.transaction.Transaction;
 import org.apache.ibatis.transaction.jdbc.JdbcTransaction;
-import org.opengauss.datachecker.common.entry.enums.ErrorCode;
 import org.opengauss.datachecker.extract.data.mapper.MysqlMetaDataMapper;
 import org.springframework.core.env.PropertySource;
 import org.springframework.core.io.ClassPathResource;
-- 
Gitee


From 0cbc2d39906a01fc3d095c5736950cde8cf63f51 Mon Sep 17 00:00:00 2001
From: mystarry-sky 
Date: Tue, 24 Dec 2024 16:08:20 +0800
Subject: [PATCH 3/4] =?UTF-8?q?Datakit=E6=A0=A1=E9=AA=8C=E9=80=82=E9=85=8D?=
 =?UTF-8?q?=E5=BC=82=E5=B8=B8=E6=94=B6=E9=9B=86=E9=9C=80=E6=B1=82?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

---
 .../config/validator/RuleConfigTableValidator.java  |  3 ++-
 .../modules/report/SliceCheckResultManager.java     |  9 ++++++---
 .../datachecker/check/slice/SliceCheckWorker.java   |  9 +++++----
 .../datachecker/check/slice/TableCheckWorker.java   |  2 +-
 .../thread/CheckUncaughtExceptionHandler.java       |  4 ++--
 .../extract/data/csv/CsvReaderListener.java         |  3 ++-
 .../extract/debezium/DebeziumWorker.java            |  3 ++-
 .../debezium/IncrementDataAnalysisService.java      |  4 ++--
 .../extract/slice/process/CsvTableProcessor.java    |  4 ++--
 .../extract/slice/process/JdbcSliceProcessor.java   | 13 +++++++------
 10 files changed, 31 insertions(+), 23 deletions(-)

diff --git a/datachecker-check/src/main/java/org/opengauss/datachecker/check/config/validator/RuleConfigTableValidator.java b/datachecker-check/src/main/java/org/opengauss/datachecker/check/config/validator/RuleConfigTableValidator.java
index 1f1d3ec..fa12d02 100644
--- a/datachecker-check/src/main/java/org/opengauss/datachecker/check/config/validator/RuleConfigTableValidator.java
+++ b/datachecker-check/src/main/java/org/opengauss/datachecker/check/config/validator/RuleConfigTableValidator.java
@@ -55,7 +55,8 @@ public class RuleConfigTableValidator implements RuleConfigValidator
         List whiteKeyList = fetchRuleByPredicate(values, rule -> rule.getName().equalsIgnoreCase(RULE_WHITE));
         List blackKeyList = fetchRuleByPredicate(values, rule -> rule.getName().equalsIgnoreCase(RULE_BLACK));
         if (CollectionUtils.isNotEmpty(whiteKeyList) && CollectionUtils.isNotEmpty(blackKeyList)) {
-            log.error("{}RuleConfig of table rule , black rule ={} is invalid rule", ErrorCode.RULE_CONFIG_ERROR, blackKeyList);
+            log.error("{}RuleConfig of table rule , black rule ={} is invalid rule", ErrorCode.RULE_CONFIG_ERROR,
+                blackKeyList);
             values.removeAll(blackKeyList);
         }
         List rules = filterRepeatBy(values, Rule::getText);
diff --git a/datachecker-check/src/main/java/org/opengauss/datachecker/check/modules/report/SliceCheckResultManager.java b/datachecker-check/src/main/java/org/opengauss/datachecker/check/modules/report/SliceCheckResultManager.java
index a6a5605..ceb14ef 100644
--- a/datachecker-check/src/main/java/org/opengauss/datachecker/check/modules/report/SliceCheckResultManager.java
+++ b/datachecker-check/src/main/java/org/opengauss/datachecker/check/modules/report/SliceCheckResultManager.java
@@ -391,7 +391,8 @@ public class SliceCheckResultManager {
                 final List updateRepairs = feignClient.buildRepairStatementUpdateDml(Endpoint.SOURCE, update);
                 appendLogFile(repairFile, updateRepairs);
             } catch (Exception ex) {
-                log.error("{}build table {} update repair file {}", ErrorCode.BUILD_DIFF_STATEMENT, tableFailed.getTable(), ex.getMessage());
+                log.error("{}build table {} update repair file {}", ErrorCode.BUILD_DIFF_STATEMENT,
+                    tableFailed.getTable(), ex.getMessage());
             }
         }
     }
@@ -418,7 +419,8 @@ public class SliceCheckResultManager {
                 final List insertRepairs = feignClient.buildRepairStatementInsertDml(Endpoint.SOURCE, insert);
                 appendLogFile(repairFile, insertRepairs);
             } catch (Exception ex) {
-                log.error("{}build table {} insert repair file {}", ErrorCode.BUILD_DIFF_STATEMENT, tableFailed.getTable(), ex.getMessage());
+                log.error("{}build table {} insert repair file {}", ErrorCode.BUILD_DIFF_STATEMENT,
+                    tableFailed.getTable(), ex.getMessage());
             }
         }
     }
@@ -433,7 +435,8 @@ public class SliceCheckResultManager {
                 final List deleteRepairs = feignClient.buildRepairStatementDeleteDml(Endpoint.SOURCE, delete);
                 appendLogFile(repairFile, deleteRepairs);
             } catch (Exception ex) {
-                log.error("{}build table {} delete repair file {}", ErrorCode.BUILD_DIFF_STATEMENT, tableFailed.getTable(), ex.getMessage());
+                log.error("{}build table {} delete repair file {}", ErrorCode.BUILD_DIFF_STATEMENT,
+                    tableFailed.getTable(), ex.getMessage());
             }
         }
     }
diff --git a/datachecker-check/src/main/java/org/opengauss/datachecker/check/slice/SliceCheckWorker.java b/datachecker-check/src/main/java/org/opengauss/datachecker/check/slice/SliceCheckWorker.java
index b9dc25b..01abaf5 100644
--- a/datachecker-check/src/main/java/org/opengauss/datachecker/check/slice/SliceCheckWorker.java
+++ b/datachecker-check/src/main/java/org/opengauss/datachecker/check/slice/SliceCheckWorker.java
@@ -141,9 +141,9 @@ public class SliceCheckWorker implements Runnable {
             } else {
                 compareNoMerkleTree(sourceTuple, sinkTuple);
             }
-        } catch (Exception ignore) {
-            LogUtils.error(LOGGER, "{}check table has some error,", ErrorCode.CHECK_SLICE_EXCEPTION, ignore);
-            errorMsg = ignore.getMessage();
+        } catch (Exception ex) {
+            LogUtils.error(LOGGER, "{}check table has some error,", ErrorCode.CHECK_SLICE_EXCEPTION, ex);
+            errorMsg = ex.getMessage();
         } finally {
             try {
                 refreshSliceCheckProgress();
@@ -151,7 +151,8 @@ public class SliceCheckWorker implements Runnable {
                 cleanCheckThreadEnvironment();
                 finishedSliceCheck(slice);
             } catch (Exception exception) {
-                LogUtils.error(LOGGER, "{}refresh check {} error:",ErrorCode.CHECK_SLICE_EXCEPTION, slice.getName(), exception);
+                LogUtils.error(LOGGER, "{}refresh check {} error:", ErrorCode.CHECK_SLICE_EXCEPTION, slice.getName(),
+                    exception);
             }
             LogUtils.info(LOGGER, "check slice of {} end.", slice.getName());
         }
diff --git a/datachecker-check/src/main/java/org/opengauss/datachecker/check/slice/TableCheckWorker.java b/datachecker-check/src/main/java/org/opengauss/datachecker/check/slice/TableCheckWorker.java
index 5217a3e..36ecc40 100644
--- a/datachecker-check/src/main/java/org/opengauss/datachecker/check/slice/TableCheckWorker.java
+++ b/datachecker-check/src/main/java/org/opengauss/datachecker/check/slice/TableCheckWorker.java
@@ -106,7 +106,7 @@ public class TableCheckWorker implements Runnable {
             log.info("check table of {}", slice.getName());
             checkedTableSliceByTopicPartition(source, sink);
         } catch (Exception ex) {
-            log.error("{}check table has some error,", ErrorCode.CHECK_TABLE_EXCEPTION,ex);
+            log.error("{}check table has some error,", ErrorCode.CHECK_TABLE_EXCEPTION, ex);
             errorMsg = ex.getMessage();
         } finally {
             refreshSliceCheckProgress();
diff --git a/datachecker-common/src/main/java/org/opengauss/datachecker/common/thread/CheckUncaughtExceptionHandler.java b/datachecker-common/src/main/java/org/opengauss/datachecker/common/thread/CheckUncaughtExceptionHandler.java
index dbc2ade..e048864 100644
--- a/datachecker-common/src/main/java/org/opengauss/datachecker/common/thread/CheckUncaughtExceptionHandler.java
+++ b/datachecker-common/src/main/java/org/opengauss/datachecker/common/thread/CheckUncaughtExceptionHandler.java
@@ -39,7 +39,7 @@ public class CheckUncaughtExceptionHandler implements Thread.UncaughtExceptionHa
 
     @Override
     public void uncaughtException(Thread t, Throwable e) {
-        String msg = String.format("getException from thread: %s,exceptionName:%s", t.getName(), e.getMessage());
-        logger.error("{}"+msg, ErrorCode.UNKNOWN, e);
+        String msg = String.format("{}getException from thread: %s,exceptionName:%s", t.getName(), e.getMessage());
+        logger.error(msg, ErrorCode.UNKNOWN, e);
     }
 }
diff --git a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/data/csv/CsvReaderListener.java b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/data/csv/CsvReaderListener.java
index 91abee3..921ad04 100644
--- a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/data/csv/CsvReaderListener.java
+++ b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/data/csv/CsvReaderListener.java
@@ -95,7 +95,8 @@ public class CsvReaderListener implements CsvListener {
                     MapUtils.put(readerSliceMap, slice.getTable(), slice);
                     LogUtils.debug(log,"reader add log : {}", line);
                 } catch (Exception ex) {
-                    LogUtils.error(log,"{}reader log listener error : " , ErrorCode.CSV_READER_LISTENER, ex.getMessage());
+                    LogUtils.error(log, "{}reader log listener error : ", ErrorCode.CSV_READER_LISTENER,
+                        ex.getMessage());
                 }
             }
         }, ConfigCache.getCsvLogMonitorInterval(), false);
diff --git a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/debezium/DebeziumWorker.java b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/debezium/DebeziumWorker.java
index 9775317..6e5e5cc 100644
--- a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/debezium/DebeziumWorker.java
+++ b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/debezium/DebeziumWorker.java
@@ -90,7 +90,8 @@ public class DebeziumWorker implements Runnable {
                 try {
                     debeziumConsumerListener.listen(record);
                 } catch (Exception ex) {
-                    log.error("{}DebeziumWorker unknown error, message,{},{}", ErrorCode.DEBEZIUM_WORKER, record.toString(), ex);
+                    log.error("{}DebeziumWorker unknown error, message,{},{}", ErrorCode.DEBEZIUM_WORKER,
+                        record.toString(), ex);
                 }
             }
             POLL_BATCH_COUNT.addAndGet(records.count());
diff --git a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/debezium/IncrementDataAnalysisService.java b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/debezium/IncrementDataAnalysisService.java
index 0e79702..0ed6315 100644
--- a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/debezium/IncrementDataAnalysisService.java
+++ b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/debezium/IncrementDataAnalysisService.java
@@ -121,9 +121,9 @@ public class IncrementDataAnalysisService {
             } catch (FeignException ex) {
                 log.error("{}check service has an error occurred. {}", ErrorCode.INCREMENT_LISTENER, ex.getMessage());
             } catch (ExtractException ex) {
-                log.error("{}peek debezium topic record offset has an error occurred,",ErrorCode.INCREMENT_LISTENER, ex);
+                log.error("{}peek debezium topic record has an error occurred", ErrorCode.INCREMENT_LISTENER, ex);
             } catch (Exception ex) {
-                log.error("{}unkown error occurred,", ErrorCode.INCREMENT_LISTENER,ex);
+                log.error("{}unkown error occurred,", ErrorCode.INCREMENT_LISTENER, ex);
             }
         };
     }
diff --git a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/slice/process/CsvTableProcessor.java b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/slice/process/CsvTableProcessor.java
index 2271be6..fabd8b7 100644
--- a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/slice/process/CsvTableProcessor.java
+++ b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/slice/process/CsvTableProcessor.java
@@ -132,7 +132,7 @@ public class CsvTableProcessor extends AbstractTableProcessor {
                         minOffsetList.add(getMinOffset(offsetList));
                         maxOffsetList.add(getMaxOffset(offsetList));
                     } catch (Exception ex) {
-                        log.error("{}csvTranslateAndSend error: ",ErrorCode.CSV_TABLE_PROCESSOR, ex);
+                        log.error("{}csvTranslateAndSend error: ", ErrorCode.CSV_TABLE_PROCESSOR, ex);
                     }
                     tableRowCount += rowCount;
                     log.info("finish [{}-{}] {} , [{} : {}]", tableFileCount, i, slicePath, rowCount, tableRowCount);
@@ -145,7 +145,7 @@ public class CsvTableProcessor extends AbstractTableProcessor {
             tableSliceExtend.setEndOffset(getMaxMaxOffset(maxOffsetList));
             tableSliceExtend.setCount(tableRowCount);
         } catch (Exception ex) {
-            log.error("{}jdbc query  {} error : {}",ErrorCode.CSV_TABLE_PROCESSOR,table, ex.getMessage());
+            log.error("{}jdbc query  {} error : {}", ErrorCode.CSV_TABLE_PROCESSOR, table, ex.getMessage());
             throw new ExtractDataAccessException();
         } finally {
             log.info("query slice [{}] cost [{}] milliseconds", table,
diff --git a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/slice/process/JdbcSliceProcessor.java b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/slice/process/JdbcSliceProcessor.java
index 4194a37..276978c 100644
--- a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/slice/process/JdbcSliceProcessor.java
+++ b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/slice/process/JdbcSliceProcessor.java
@@ -99,7 +99,8 @@ public class JdbcSliceProcessor extends AbstractSliceProcessor {
             }
         } catch (Exception | Error ex) {
             sliceExtend.setStatus(-1);
-            LogUtils.error(log, "{}table slice [{}] is error", ErrorCode.EXECUTE_SLICE_PROCESSOR,slice.toSimpleString(), ex);
+            LogUtils.error(log, "{}table slice [{}] is error", ErrorCode.EXECUTE_SLICE_PROCESSOR,
+                slice.toSimpleString(), ex);
         } finally {
             LogUtils.info(log, "table slice [{} count {}] is finally   ", slice.toSimpleString(), rowCount.get());
             feedbackStatus(sliceExtend);
@@ -218,8 +219,8 @@ public class JdbcSliceProcessor extends AbstractSliceProcessor {
             // 数据发送到异步处理线程中,关闭ps与rs
             ConnectionMgr.close(null, ps, resultSet);
         } catch (Exception ex) {
-            log.error("{}execute query {}  executionStage: {} error,retry cause : {}", ErrorCode.EXECUTE_SLICE_QUERY, slice.toSimpleString(),
-                executionStage, ex.getMessage());
+            log.error("{}execute query {}  executionStage: {} error,retry cause : {}", ErrorCode.EXECUTE_SLICE_QUERY,
+                slice.toSimpleString(), executionStage, ex.getMessage());
             if (Objects.equals(executionStage, ExecutionStage.CLOSE)) {
                 ConnectionMgr.close(null, ps, resultSet);
             } else {
@@ -230,8 +231,8 @@ public class JdbcSliceProcessor extends AbstractSliceProcessor {
                     queryParameters.resultSetIdx = rsIdx;
                     startOffset = statementQuery(pageStatement, connection, sliceSender, asyncHandler, queryParameters);
                 } else {
-                    log.error("{}execute query {} retry {} times error ,cause by ", ErrorCode.EXECUTE_QUERY_SQL, maskQuery(pageStatement),
-                        queryParameters.getRetryTimes(), ex);
+                    log.error("{}execute query {} retry {} times error ,cause by ", ErrorCode.EXECUTE_QUERY_SQL,
+                        maskQuery(pageStatement), queryParameters.getRetryTimes(), ex);
                     throw new ExtractDataAccessException(
                         "execute query " + maskQuery(pageStatement) + " retry " + MAX_RETRY_TIMES
                             + " times error ,cause by " + ex.getMessage());
@@ -275,7 +276,7 @@ public class JdbcSliceProcessor extends AbstractSliceProcessor {
             waitToStopAsyncHandlerAndResources(asyncHandler);
             updateExtendSliceOffsetAndCount(sliceExtend, rowCount.get(), offsetList);
         } catch (Exception ex) {
-            LogUtils.error(log, "{}slice [{}] has exception :",ErrorCode.EXECUTE_SLICE_QUERY, slice.getName(), ex);
+            LogUtils.error(log, "{}slice [{}] has exception :", ErrorCode.EXECUTE_SLICE_QUERY, slice.getName(), ex);
             throw new ExtractDataAccessException(ex.getMessage());
         } finally {
             ConnectionMgr.close(connection, ps, resultSet);
-- 
Gitee


From 31bc1f61317fe5309c6e82e08f7aea5f8b96236e Mon Sep 17 00:00:00 2001
From: mystarry-sky 
Date: Tue, 24 Dec 2024 16:17:27 +0800
Subject: [PATCH 4/4] =?UTF-8?q?Datakit=E6=A0=A1=E9=AA=8C=E9=80=82=E9=85=8D?=
 =?UTF-8?q?=E5=BC=82=E5=B8=B8=E6=94=B6=E9=9B=86=E9=9C=80=E6=B1=82?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

---
 .../opengauss/datachecker/common/service/ProcessLogService.java | 2 +-
 .../datachecker/extract/slice/process/JdbcSliceProcessor.java   | 2 +-
 2 files changed, 2 insertions(+), 2 deletions(-)

diff --git a/datachecker-common/src/main/java/org/opengauss/datachecker/common/service/ProcessLogService.java b/datachecker-common/src/main/java/org/opengauss/datachecker/common/service/ProcessLogService.java
index 1d3b3c8..078a52a 100644
--- a/datachecker-common/src/main/java/org/opengauss/datachecker/common/service/ProcessLogService.java
+++ b/datachecker-common/src/main/java/org/opengauss/datachecker/common/service/ProcessLogService.java
@@ -1,5 +1,5 @@
 /*
- * Copyright (c) 2022-2022 Huawei Technologies Co.,Ltd.
+ * Copyright (c) Huawei Technologies Co., Ltd. 2024-2024. All rights reserved.
  *
  * openGauss is licensed under Mulan PSL v2.
  * You can use this software according to the terms and conditions of the Mulan PSL v2.
diff --git a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/slice/process/JdbcSliceProcessor.java b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/slice/process/JdbcSliceProcessor.java
index 276978c..539704e 100644
--- a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/slice/process/JdbcSliceProcessor.java
+++ b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/slice/process/JdbcSliceProcessor.java
@@ -169,7 +169,7 @@ public class JdbcSliceProcessor extends AbstractSliceProcessor {
             updateExtendSliceOffsetAndCount(sliceExtend, rowCount.get(), offsetList);
             log.info("executeSliceQueryStatementPage : {} async send end", slice.getName());
         } catch (Exception ex) {
-            LogUtils.error(log, "{}slice [{}] has exception :",ErrorCode.EXECUTE_SLICE_QUERY, slice.getName(), ex);
+            LogUtils.error(log, "{}slice [{}] has exception :", ErrorCode.EXECUTE_SLICE_QUERY, slice.getName(), ex);
             throw new ExtractDataAccessException(ex.getMessage());
         } finally {
             if (sliceSender != null) {
-- 
Gitee