From 3a66034d312cec08051c2dfc1aa59d011550127c Mon Sep 17 00:00:00 2001 From: mystarry-sky Date: Fri, 23 Feb 2024 16:49:15 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BF=AE=E5=A4=8DCSV=E5=9C=BA=E6=99=AF?= =?UTF-8?q?=E7=9B=AE=E6=A0=87=E7=AB=AF=E5=B0=91=E8=A1=A8=EF=BC=8C=E5=A4=9A?= =?UTF-8?q?=E8=A1=A8=EF=BC=8C=E4=BB=A5=E5=8F=8ACSVdouble=EF=BC=8C=E6=97=B6?= =?UTF-8?q?=E9=97=B4=E7=B1=BB=E5=9E=8B=E6=A0=A1=E9=AA=8C=E4=B8=8D=E5=87=86?= =?UTF-8?q?=E7=A1=AE=E9=97=AE=E9=A2=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../check/client/ExtractFallbackFactory.java | 3 +- .../check/client/ExtractFeignClient.java | 9 +- .../check/client/FeignClientService.java | 17 +-- .../check/controller/CheckCsvController.java | 19 ++++ .../check/load/CheckStartCsvLoader.java | 3 +- .../report/SliceCheckResultManager.java | 6 +- .../check/service/TaskRegisterCenter.java | 104 ++++++++++++++++-- .../check/slice/SliceCheckEventHandler.java | 37 ++++++- .../common/util/DateTimeFormatterMap.java | 60 ++++++++++ .../datachecker/common/util/FileUtils.java | 12 ++ .../extract/client/CheckingFeignClient.java | 11 ++ .../controller/CheckCsvController.java | 11 ++ .../extract/data/BaseDataService.java | 1 + .../extract/data/csv/CsvListener.java | 8 ++ .../extract/data/csv/CsvReaderListener.java | 8 ++ .../extract/data/csv/CsvWriterListener.java | 6 + .../service/BaseManagementService.java | 25 ----- .../extract/service/CsvManagementService.java | 22 +++- .../extract/slice/SliceDispatcher.java | 22 ++-- .../slice/common/SliceResultSetSender.java | 50 ++++++++- .../extract/task/MysqlResultSetHandler.java | 33 +++++- .../task/OpenGaussResultSetHandler.java | 38 ++++++- .../extract/task/ResultSetHandler.java | 8 +- .../task/functional/MysqlTypeHandler.java | 40 +++++++ .../functional/OpgsTypeHandler.java} | 37 +++---- .../extract/util/MetaDataUtil.java | 19 +++- .../extract/dao/MysqlResultSetTest.java | 6 + .../extract/dao/OpenGaussResultSetTest.java | 6 + .../resources/mysql_opgs/expect/t_time.json | 47 ++++++++ .../resources/mysql_opgs/sql/init_t_time.sql | 19 ++++ 30 files changed, 588 insertions(+), 99 deletions(-) create mode 100644 datachecker-common/src/main/java/org/opengauss/datachecker/common/util/DateTimeFormatterMap.java delete mode 100644 datachecker-extract/src/main/java/org/opengauss/datachecker/extract/service/BaseManagementService.java create mode 100644 datachecker-extract/src/main/java/org/opengauss/datachecker/extract/task/functional/MysqlTypeHandler.java rename datachecker-extract/src/main/java/org/opengauss/datachecker/extract/{controller/ExtractBaseController.java => task/functional/OpgsTypeHandler.java} (43%) create mode 100644 datachecker-extract/src/test/resources/mysql_opgs/expect/t_time.json create mode 100644 datachecker-extract/src/test/resources/mysql_opgs/sql/init_t_time.sql diff --git a/datachecker-check/src/main/java/org/opengauss/datachecker/check/client/ExtractFallbackFactory.java b/datachecker-check/src/main/java/org/opengauss/datachecker/check/client/ExtractFallbackFactory.java index 3d20752..352e376 100644 --- a/datachecker-check/src/main/java/org/opengauss/datachecker/check/client/ExtractFallbackFactory.java +++ b/datachecker-check/src/main/java/org/opengauss/datachecker/check/client/ExtractFallbackFactory.java @@ -196,13 +196,12 @@ public class ExtractFallbackFactory implements FallbackFactory fetchCheckTableCount() { + public Result fetchCsvCheckTableCount() { return null; } @Override public void dispatcherTables(List list) { - } } } diff --git a/datachecker-check/src/main/java/org/opengauss/datachecker/check/client/ExtractFeignClient.java b/datachecker-check/src/main/java/org/opengauss/datachecker/check/client/ExtractFeignClient.java index 7306e2e..71eacec 100644 --- a/datachecker-check/src/main/java/org/opengauss/datachecker/check/client/ExtractFeignClient.java +++ b/datachecker-check/src/main/java/org/opengauss/datachecker/check/client/ExtractFeignClient.java @@ -227,8 +227,13 @@ public interface ExtractFeignClient { @PostMapping("/start/csv/service") Result enableCsvExtractService(); - @GetMapping("/fetch/check/table/count") - Result fetchCheckTableCount(); + /** + * fetchCsvCheckTableCount + * + * @return csv table count + */ + @GetMapping("/fetch/csv/check/table/count") + Result fetchCsvCheckTableCount(); /** * csv dispatcher tables diff --git a/datachecker-check/src/main/java/org/opengauss/datachecker/check/client/FeignClientService.java b/datachecker-check/src/main/java/org/opengauss/datachecker/check/client/FeignClientService.java index d68c78f..76794d2 100644 --- a/datachecker-check/src/main/java/org/opengauss/datachecker/check/client/FeignClientService.java +++ b/datachecker-check/src/main/java/org/opengauss/datachecker/check/client/FeignClientService.java @@ -349,15 +349,16 @@ public class FeignClientService { getClient(Endpoint.SINK).enableCsvExtractService(); } - public int fetchCheckTableCount() { - int source = fetchCheckTableCount(Endpoint.SOURCE); - int sink = fetchCheckTableCount(Endpoint.SINK); - return Math.max(source, sink); - } - - public int fetchCheckTableCount(Endpoint endpoint) { + /** + * fetchCsvCheckTableCount + * source table count + * + * @param endpoint endpoint + * @return table count + */ + public int fetchCsvCheckTableCount(Endpoint endpoint) { try { - Result result = getClient(endpoint).fetchCheckTableCount(); + Result result = getClient(endpoint).fetchCsvCheckTableCount(); if (result.isSuccess()) { return result.getData(); } else { diff --git a/datachecker-check/src/main/java/org/opengauss/datachecker/check/controller/CheckCsvController.java b/datachecker-check/src/main/java/org/opengauss/datachecker/check/controller/CheckCsvController.java index 7bd5cc1..70113ff 100644 --- a/datachecker-check/src/main/java/org/opengauss/datachecker/check/controller/CheckCsvController.java +++ b/datachecker-check/src/main/java/org/opengauss/datachecker/check/controller/CheckCsvController.java @@ -16,12 +16,16 @@ package org.opengauss.datachecker.check.controller; import org.opengauss.datachecker.check.service.CsvProcessManagement; +import org.opengauss.datachecker.check.service.TaskRegisterCenter; +import org.opengauss.datachecker.common.entry.enums.Endpoint; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.PostMapping; import org.springframework.web.bind.annotation.RequestBody; import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RequestParam; import org.springframework.web.bind.annotation.RestController; +import javax.annotation.Resource; import java.util.List; /** @@ -34,6 +38,8 @@ import java.util.List; public class CheckCsvController { @Autowired private CsvProcessManagement csvProcessManagement; + @Resource + private TaskRegisterCenter registerCenter; /** * csv task dispatcher @@ -44,4 +50,17 @@ public class CheckCsvController { public void notifyTableIndexCompleted(@RequestBody List completedTableList) { csvProcessManagement.taskDispatcher(completedTableList); } + + /** + * notifyCheckIgnoreTable + * + * @param endpoint endpoint + * @param table table + * @param reason reason + */ + @PostMapping("/notify/check/csv/ignore") + void notifyCheckIgnoreTable(@RequestParam("endpoint") Endpoint endpoint, @RequestParam("table") String table, + @RequestParam("reason") String reason) { + registerCenter.addCheckIgnoreTable(endpoint, table, reason); + } } diff --git a/datachecker-check/src/main/java/org/opengauss/datachecker/check/load/CheckStartCsvLoader.java b/datachecker-check/src/main/java/org/opengauss/datachecker/check/load/CheckStartCsvLoader.java index ce2c611..259bed7 100644 --- a/datachecker-check/src/main/java/org/opengauss/datachecker/check/load/CheckStartCsvLoader.java +++ b/datachecker-check/src/main/java/org/opengauss/datachecker/check/load/CheckStartCsvLoader.java @@ -22,6 +22,7 @@ import org.opengauss.datachecker.check.service.CsvProcessManagement; import org.opengauss.datachecker.check.service.TaskRegisterCenter; import org.opengauss.datachecker.common.config.ConfigCache; import org.opengauss.datachecker.common.entry.enums.CheckMode; +import org.opengauss.datachecker.common.entry.enums.Endpoint; import org.opengauss.datachecker.common.util.SpringUtil; import org.opengauss.datachecker.common.util.ThreadUtil; import org.springframework.core.annotation.Order; @@ -55,7 +56,7 @@ public class CheckStartCsvLoader extends AbstractCheckLoader { if (Objects.equals(CheckMode.CSV, ConfigCache.getCheckMode())) { log.info("start data check csv"); sliceProgressService.startProgressing(); - int count = feignClient.fetchCheckTableCount(); + int count = feignClient.fetchCsvCheckTableCount(Endpoint.SOURCE); sliceProgressService.updateTotalTableCount(count); csvProcessManagement.startTaskDispatcher(); feignClient.enableCsvExtractService(); diff --git a/datachecker-check/src/main/java/org/opengauss/datachecker/check/modules/report/SliceCheckResultManager.java b/datachecker-check/src/main/java/org/opengauss/datachecker/check/modules/report/SliceCheckResultManager.java index 4a4b783..d513b0a 100644 --- a/datachecker-check/src/main/java/org/opengauss/datachecker/check/modules/report/SliceCheckResultManager.java +++ b/datachecker-check/src/main/java/org/opengauss/datachecker/check/modules/report/SliceCheckResultManager.java @@ -46,7 +46,6 @@ import org.springframework.stereotype.Service; import javax.annotation.Resource; import javax.validation.constraints.NotEmpty; -import java.io.File; import java.time.Duration; import java.time.LocalDateTime; import java.util.LinkedList; @@ -189,8 +188,7 @@ public class SliceCheckResultManager { private void notifyCsvShardingCompleted(CheckDiffResult checkDiffResult, boolean immediately) { if (isCsvMode && immediately) { String csvDataPath = ConfigCache.getCsvData(); - File file = new File(csvDataPath, checkDiffResult.getFileName()); - if (file.renameTo(new File(csvDataPath, checkDiffResult.getFileName() + ".check"))) { + if (FileUtils.renameTo(csvDataPath, checkDiffResult.getFileName())) { log.info("rename csv sharding completed [{}]", checkDiffResult.getFileName()); } else { log.warn("rename csv sharding false [{}]", checkDiffResult.getFileName()); @@ -207,7 +205,7 @@ public class SliceCheckResultManager { } /** - * refresg summary log + * refresh summary log */ public void refreshSummary() { CheckSummary checkSummary = new CheckSummary(); diff --git a/datachecker-check/src/main/java/org/opengauss/datachecker/check/service/TaskRegisterCenter.java b/datachecker-check/src/main/java/org/opengauss/datachecker/check/service/TaskRegisterCenter.java index 3afac29..f09eb91 100644 --- a/datachecker-check/src/main/java/org/opengauss/datachecker/check/service/TaskRegisterCenter.java +++ b/datachecker-check/src/main/java/org/opengauss/datachecker/check/service/TaskRegisterCenter.java @@ -18,18 +18,26 @@ package org.opengauss.datachecker.check.service; import org.apache.logging.log4j.Logger; import org.opengauss.datachecker.check.slice.SliceCheckEvent; import org.opengauss.datachecker.check.slice.SliceCheckEventHandler; +import org.opengauss.datachecker.common.config.ConfigCache; import org.opengauss.datachecker.common.entry.enums.Endpoint; import org.opengauss.datachecker.common.entry.extract.SliceExtend; import org.opengauss.datachecker.common.entry.extract.SliceVo; +import org.opengauss.datachecker.common.util.FileUtils; import org.opengauss.datachecker.common.util.LogUtils; import org.opengauss.datachecker.common.util.MapUtils; import org.springframework.stereotype.Component; import javax.annotation.Resource; +import java.util.HashMap; +import java.util.List; import java.util.Map; +import java.util.Map.Entry; +import java.util.Objects; +import java.util.Optional; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.locks.ReentrantLock; +import java.util.stream.Collectors; /** * TaskRegisterCenter @@ -41,17 +49,20 @@ import java.util.concurrent.locks.ReentrantLock; @SuppressWarnings("AlibabaConstantFieldShouldBeUpperCase") @Component public class TaskRegisterCenter { - private static final Logger log = LogUtils.getLogger(); protected static final AtomicInteger sliceTotalCount = new AtomicInteger(); protected static final AtomicInteger tableCount = new AtomicInteger(); - protected static final Map center = new ConcurrentHashMap<>(); protected static final Map sliceTableCounter = new ConcurrentHashMap<>(); protected static final Map> sliceExtendMap = new ConcurrentHashMap<>(); - private static final int status_updated_all = 3; + + private static final Logger log = LogUtils.getLogger(); + private static final int STATUS_UPDATED_ALL = 3; + private final ReentrantLock lock = new ReentrantLock(); @Resource private SliceCheckEventHandler sliceCheckEventHandler; + private Map sourceIgnoreMap = new HashMap<>(); + private Map sinkIgnoreMap = new HashMap<>(); /** * register slice info to register center @@ -105,7 +116,7 @@ public class TaskRegisterCenter { Endpoint endpoint = sliceExt.getEndpoint(); MapUtils.put(sliceExtendMap, sliceName, endpoint, sliceExt); log.info("{} update slice [{}] status [{}->{}]", endpoint, sliceName, oldStatus, curStatus); - if (curStatus == status_updated_all) { + if (curStatus == STATUS_UPDATED_ALL) { notifySliceCheckHandle(slice); } } @@ -122,12 +133,47 @@ public class TaskRegisterCenter { remove(slice); } + private void notifyIgnoreSliceCheckHandle(String table) { + removeIgnoreTables(table); + List> tableSliceList = center.entrySet() + .stream() + .filter(entry -> entry.getValue() + .getTable() + .equals(table)) + .collect(Collectors.toList()); + Optional.of(tableSliceList) + .ifPresent(list -> { + Entry firstSlice = list.get(0); + String sliceName = firstSlice.getValue() + .getName(); + SliceExtend source = MapUtils.get(sliceExtendMap, sliceName, Endpoint.SOURCE); + SliceExtend sink = MapUtils.get(sliceExtendMap, sliceName, Endpoint.SINK); + sliceCheckEventHandler.handleIgnoreTable(firstSlice.getValue(), source, sink); + String csvDataPath = ConfigCache.getCsvData(); + list.forEach(entry -> { + if (FileUtils.renameTo(csvDataPath, entry.getKey())) { + log.debug("rename csv sharding completed [{}] by {}", entry.getKey(), "table miss"); + } + remove(entry.getKey()); + }); + }); + } + + /** + * remove center and sliceExtend cache + * + * @param sliceVo sliceVo + */ public void remove(SliceVo sliceVo) { + remove(sliceVo.getName()); + } + + private void remove(String sliceName) { lock.lock(); try { - center.remove(sliceVo.getName()); - sliceExtendMap.remove(sliceVo.getName()); - log.info("drop slice [{}] due to had notified , release [{}]", sliceVo.getName(), center.size()); + center.remove(sliceName); + sliceExtendMap.remove(sliceName); + log.info("drop slice [{}] due to had notified , release [{}]", sliceName, center.size()); } finally { lock.unlock(); } @@ -149,6 +195,18 @@ public class TaskRegisterCenter { * @return boolean true | false */ public boolean checkCompletedAll(int tableCount) { + // 处理csv场景已经忽略的表 + if (!(sourceIgnoreMap.isEmpty() && sinkIgnoreMap.isEmpty())) { + sliceTableCounter.entrySet() + .stream() + .filter(tableEntry -> tableEntry.getValue() > 0 && ( + sourceIgnoreMap.containsKey(tableEntry.getKey()) || sinkIgnoreMap.containsKey( + tableEntry.getKey()))) + .forEach(ignoreTable -> { + notifyIgnoreSliceCheckHandle(ignoreTable.getKey()); + log.warn("ignore table {} ===add ignore table to result===", ignoreTable.getKey()); + }); + } return sliceTableCounter.values() .stream() .allMatch(count -> count == 0) && sliceTableCounter.size() == tableCount; @@ -177,4 +235,36 @@ public class TaskRegisterCenter { lock.unlock(); } } + + /** + * addCheckIgnoreTable + * + * @param endpoint endpoint + * @param table table + * @param reason reason + */ + public void addCheckIgnoreTable(Endpoint endpoint, String table, String reason) { + if (Objects.equals(endpoint, Endpoint.SOURCE)) { + sourceIgnoreMap.put(table, reason); + } else { + sinkIgnoreMap.put(table, reason); + } + refreshIgnoreTables(table); + } + + private void refreshIgnoreTables(String table) { + if (sourceIgnoreMap.containsKey(table) && sinkIgnoreMap.containsKey(table)) { + sourceIgnoreMap.remove(table); + sinkIgnoreMap.remove(table); + } + } + + private void removeIgnoreTables(String table) { + if (sourceIgnoreMap.containsKey(table)) { + sourceIgnoreMap.remove(table); + } + if (sinkIgnoreMap.containsKey(table)) { + sinkIgnoreMap.remove(table); + } + } } diff --git a/datachecker-check/src/main/java/org/opengauss/datachecker/check/slice/SliceCheckEventHandler.java b/datachecker-check/src/main/java/org/opengauss/datachecker/check/slice/SliceCheckEventHandler.java index 6fd79c2..de0792a 100644 --- a/datachecker-check/src/main/java/org/opengauss/datachecker/check/slice/SliceCheckEventHandler.java +++ b/datachecker-check/src/main/java/org/opengauss/datachecker/check/slice/SliceCheckEventHandler.java @@ -21,15 +21,15 @@ import org.opengauss.datachecker.check.modules.check.CheckDiffResult; import org.opengauss.datachecker.check.service.TaskRegisterCenter; import org.opengauss.datachecker.common.config.ConfigCache; import org.opengauss.datachecker.common.constant.ConfigConstants; +import org.opengauss.datachecker.common.entry.enums.Endpoint; import org.opengauss.datachecker.common.entry.extract.SliceExtend; import org.opengauss.datachecker.common.entry.extract.SliceVo; import org.opengauss.datachecker.common.service.DynamicThreadPoolManager; import org.opengauss.datachecker.common.util.LogUtils; import org.springframework.stereotype.Component; -import javax.annotation.Resource; - import java.time.LocalDateTime; +import java.util.Objects; import java.util.concurrent.ThreadPoolExecutor; import static org.opengauss.datachecker.common.constant.DynamicTpConstant.CHECK_EXECUTOR; @@ -67,7 +67,8 @@ public class SliceCheckEventHandler { */ public void handle(SliceCheckEvent checkEvent) { if (checkTableStructure(checkEvent)) { - log.info("slice check event {} is dispatched, and checked level=[isTableLevel={}]", checkEvent.getCheckName(),checkEvent.isTableLevel()); + log.info("slice check event {} is dispatched, and checked level=[isTableLevel={}]", + checkEvent.getCheckName(), checkEvent.isTableLevel()); if (checkEvent.isTableLevel()) { executor.submit(new TableCheckWorker(checkEvent, sliceCheckContext)); } else { @@ -79,7 +80,8 @@ public class SliceCheckEventHandler { .getTableHash(), checkEvent.getSink() .getTableHash()); handleTableStructureDiff(checkEvent); - registerCenter.refreshCheckedTableCompleted(checkEvent.getSlice().getTable()); + registerCenter.refreshCheckedTableCompleted(checkEvent.getSlice() + .getTable()); } } @@ -113,4 +115,31 @@ public class SliceCheckEventHandler { SliceExtend sink = checkEvent.getSink(); return source.getTableHash() == sink.getTableHash(); } + + /** + * handleIgnoreTable + * + * @param slice slice + * @param source source + * @param sink sink + */ + public void handleIgnoreTable(SliceVo slice, SliceExtend source, SliceExtend sink) { + sliceCheckContext.refreshSliceCheckProgress(slice, 0); + CheckDiffResultBuilder builder = CheckDiffResultBuilder.builder(); + Endpoint existEndpoint = Objects.nonNull(source) && Objects.isNull(sink) ? Endpoint.SOURCE : Endpoint.SINK; + builder.checkMode(ConfigCache.getCheckMode()) + .process(ConfigCache.getValue(ConfigConstants.PROCESS_NO)) + .schema(slice.getSchema()) + .table(slice.getTable()) + .sno(slice.getNo()) + .startTime(LocalDateTime.now()) + .endTime(LocalDateTime.now()) + .isTableStructureEquals(false) + .isExistTableMiss(true, existEndpoint) + .rowCount(0) + .error("table miss"); + CheckDiffResult result = builder.build(); + sliceCheckContext.addTableStructureDiffResult(slice, result); + registerCenter.refreshCheckedTableCompleted(slice.getTable()); + } } diff --git a/datachecker-common/src/main/java/org/opengauss/datachecker/common/util/DateTimeFormatterMap.java b/datachecker-common/src/main/java/org/opengauss/datachecker/common/util/DateTimeFormatterMap.java new file mode 100644 index 0000000..b54f1fd --- /dev/null +++ b/datachecker-common/src/main/java/org/opengauss/datachecker/common/util/DateTimeFormatterMap.java @@ -0,0 +1,60 @@ +/* + * Copyright (c) 2022-2022 Huawei Technologies Co.,Ltd. + * + * openGauss is licensed under Mulan PSL v2. + * You can use this software according to the terms and conditions of the Mulan PSL v2. + * You may obtain a copy of Mulan PSL v2 at: + * + * http://license.coscl.org.cn/MulanPSL2 + * + * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, + * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, + * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. + * See the Mulan PSL v2 for more details. + */ + +package org.opengauss.datachecker.common.util; + +import java.time.format.DateTimeFormatter; +import java.util.LinkedHashMap; +import java.util.Map; + +/** + *
+ * DateTimeFormatterMap
+ * DataTime  "yyyy-MM-dd HH:mm:ss"
+ * "yyyy-MM-dd HH:mm:ss.S"
+ * "yyyy-MM-dd HH:mm:ss.SS"
+ * "yyyy-MM-dd HH:mm:ss.SSS"
+ * "yyyy-MM-dd HH:mm:ss.SSSS"
+ * "yyyy-MM-dd HH:mm:ss.SSSSS"
+ * "yyyy-MM-dd HH:mm:ss.SSSSSS"
+ * 
+ * + * @author :wangchao + * @date :Created in 2024/2/21 + * @since :11 + */ +public class DateTimeFormatterMap { + private static final Map FORMATTER = new LinkedHashMap<>(); + private static final String FORMAT = "yyyy-MM-dd HH:mm:ss"; + private static final String FORMAT_S = "S"; + + /** + * Constructor + */ + public DateTimeFormatterMap() { + FORMATTER.put(0, DateTimeFormatter.ofPattern(FORMAT)); + } + + /** + * get DateTimeFormatter by nano second length + * + * @param length format nano second length + * @return DateTimeFormatter + */ + public DateTimeFormatter get(Integer length) { + return FORMATTER.computeIfAbsent(length, + numberOfTimes -> DateTimeFormatter.ofPattern(FORMAT + "." + FORMAT_S.repeat(numberOfTimes))); + } +} diff --git a/datachecker-common/src/main/java/org/opengauss/datachecker/common/util/FileUtils.java b/datachecker-common/src/main/java/org/opengauss/datachecker/common/util/FileUtils.java index 417607f..487b791 100644 --- a/datachecker-common/src/main/java/org/opengauss/datachecker/common/util/FileUtils.java +++ b/datachecker-common/src/main/java/org/opengauss/datachecker/common/util/FileUtils.java @@ -164,4 +164,16 @@ public class FileUtils { } return StringUtils.EMPTY; } + + /** + * rename csv data file to filename.csv.check + * + * @param csvDataPath csv data path + * @param oldFileName csv file name + * @return rename result boolean + */ + public static boolean renameTo(String csvDataPath, String oldFileName) { + File file = new File(csvDataPath, oldFileName); + return file.exists() && file.renameTo(new File(csvDataPath, oldFileName + ".check")); + } } diff --git a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/client/CheckingFeignClient.java b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/client/CheckingFeignClient.java index 24ed0a5..ed0f8a0 100644 --- a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/client/CheckingFeignClient.java +++ b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/client/CheckingFeignClient.java @@ -145,4 +145,15 @@ public interface CheckingFeignClient { */ @PostMapping("/load/metadata/completed") void refreshLoadMetadataCompleted(@RequestParam("endpoint") Endpoint endpoint); + + /** + * notifyCheckIgnoreTable + * + * @param endpoint endpoint + * @param table table + * @param reason reason + */ + @PostMapping("/notify/check/csv/ignore") + void notifyCheckIgnoreTable(@RequestParam("endpoint") Endpoint endpoint, @RequestParam("table") String table, + @RequestParam("reason") String reason); } \ No newline at end of file diff --git a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/controller/CheckCsvController.java b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/controller/CheckCsvController.java index 732079e..dc12d6b 100644 --- a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/controller/CheckCsvController.java +++ b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/controller/CheckCsvController.java @@ -19,6 +19,7 @@ import org.opengauss.datachecker.common.config.ConfigCache; import org.opengauss.datachecker.common.constant.ConfigConstants; import org.opengauss.datachecker.common.web.Result; import org.opengauss.datachecker.extract.service.CsvManagementService; +import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.PostMapping; import org.springframework.web.bind.annotation.RequestBody; import org.springframework.web.bind.annotation.RequestMapping; @@ -65,4 +66,14 @@ public class CheckCsvController { public void dispatcherTables(@RequestBody List list) { csvManagementService.dispatcherTables(list); } + + /** + * fetch table count + * + * @return table count + */ + @GetMapping("/fetch/csv/check/table/count") + public Result fetchCheckTableCount() { + return Result.success(csvManagementService.fetchCheckTableCount()); + } } diff --git a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/data/BaseDataService.java b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/data/BaseDataService.java index ac2f91e..be1cc63 100644 --- a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/data/BaseDataService.java +++ b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/data/BaseDataService.java @@ -89,6 +89,7 @@ public class BaseDataService { /** * query check table metadata list, and use rule.table + * it is not have any columns and primary columns * * @return table name list */ diff --git a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/data/csv/CsvListener.java b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/data/csv/CsvListener.java index da83cd8..1545d0a 100644 --- a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/data/csv/CsvListener.java +++ b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/data/csv/CsvListener.java @@ -94,4 +94,12 @@ public interface CsvListener { * @param table table */ void releaseSliceCache(String table); + + /** + * notifyCheckIgnoreTable + * + * @param tableName tableName + * @param reason reason + */ + void notifyCheckIgnoreTable(String tableName, String reason); } diff --git a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/data/csv/CsvReaderListener.java b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/data/csv/CsvReaderListener.java index 9a5b97c..abb96c5 100644 --- a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/data/csv/CsvReaderListener.java +++ b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/data/csv/CsvReaderListener.java @@ -21,6 +21,7 @@ import org.apache.commons.io.input.TailerListenerAdapter; import org.apache.commons.lang3.StringUtils; import org.apache.logging.log4j.Logger; import org.opengauss.datachecker.common.config.ConfigCache; +import org.opengauss.datachecker.common.entry.enums.Endpoint; import org.opengauss.datachecker.common.entry.extract.SliceVo; import org.opengauss.datachecker.common.util.LogUtils; import org.opengauss.datachecker.common.util.MapUtils; @@ -48,10 +49,12 @@ public class CsvReaderListener implements CsvListener { private volatile Set logDuplicateCheck = new HashSet<>(); private Tailer tailer; private boolean isTailEnd = false; + private CheckingFeignClient checkingClient; @Override public void initCsvListener(CheckingFeignClient checkingClient) { log.info("csv reader listener is starting ."); + this.checkingClient = checkingClient; // creates and starts a Tailer for read writer logs in real time tailer = Tailer.create(new File(ConfigCache.getReader()), new TailerListenerAdapter() { @Override @@ -100,6 +103,11 @@ public class CsvReaderListener implements CsvListener { readerSliceMap.remove(table); } + @Override + public void notifyCheckIgnoreTable(String tableName, String reason) { + checkingClient.notifyCheckIgnoreTable(Endpoint.SOURCE, tableName, reason); + } + @Override public boolean isFinished() { return isTailEnd && readerSliceMap.isEmpty(); diff --git a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/data/csv/CsvWriterListener.java b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/data/csv/CsvWriterListener.java index 8ba1fe4..b8d044c 100644 --- a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/data/csv/CsvWriterListener.java +++ b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/data/csv/CsvWriterListener.java @@ -25,6 +25,7 @@ import org.apache.logging.log4j.Logger; import org.opengauss.datachecker.common.config.ConfigCache; import org.opengauss.datachecker.common.constant.ConfigConstants; import org.opengauss.datachecker.common.entry.csv.SliceIndexVo; +import org.opengauss.datachecker.common.entry.enums.Endpoint; import org.opengauss.datachecker.common.entry.enums.SliceIndexStatus; import org.opengauss.datachecker.common.entry.enums.SliceLogType; import org.opengauss.datachecker.common.entry.extract.SliceVo; @@ -175,6 +176,11 @@ public class CsvWriterListener implements CsvListener { } } + @Override + public void notifyCheckIgnoreTable(String tableName, String reason) { + checkingClient.notifyCheckIgnoreTable(Endpoint.SINK, tableName, reason); + } + @Override public boolean isFinished() { return isTailEnd && writerSliceMap.isEmpty(); diff --git a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/service/BaseManagementService.java b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/service/BaseManagementService.java deleted file mode 100644 index cc82a07..0000000 --- a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/service/BaseManagementService.java +++ /dev/null @@ -1,25 +0,0 @@ -package org.opengauss.datachecker.extract.service; - -import org.opengauss.datachecker.extract.data.BaseDataService; -import org.springframework.stereotype.Service; - -import javax.annotation.Resource; -import java.util.List; - -/** - * BaseManagementService - * - * @author :wangchao - * @date :Created in 2023/7/18 - * @since :11 - */ -@Service -public class BaseManagementService { - @Resource - private BaseDataService baseDataService; - - public Integer fetchCheckTableCount() { - List nameList = baseDataService.bdsQueryTableNameList(); - return nameList.size(); - } -} diff --git a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/service/CsvManagementService.java b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/service/CsvManagementService.java index 13099e7..ab65d3a 100644 --- a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/service/CsvManagementService.java +++ b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/service/CsvManagementService.java @@ -19,6 +19,7 @@ import org.apache.logging.log4j.Logger; import org.opengauss.datachecker.common.config.ConfigCache; import org.opengauss.datachecker.common.constant.ConfigConstants; import org.opengauss.datachecker.common.entry.enums.Endpoint; +import org.opengauss.datachecker.common.entry.extract.TableMetadata; import org.opengauss.datachecker.common.service.DynamicThreadPoolManager; import org.opengauss.datachecker.common.util.LogUtils; import org.opengauss.datachecker.extract.client.CheckingFeignClient; @@ -33,6 +34,7 @@ import org.springframework.stereotype.Service; import javax.annotation.Resource; import java.util.List; +import java.util.Map; import java.util.Objects; /** @@ -49,6 +51,8 @@ public class CsvManagementService { @Resource private BaseDataService baseDataService; @Resource + private MetaDataService metaDataService; + @Resource private DynamicThreadPoolManager dynamicThreadPoolManager; @Resource private SliceRegister sliceRegister; @@ -56,6 +60,7 @@ public class CsvManagementService { private CheckingFeignClient checkingClient; private CsvListener listener; private SliceDispatcher sliceDispatcher = null; + private Endpoint currentEndpoint; /** * start csv process. @@ -65,15 +70,14 @@ public class CsvManagementService { public void startCsvProcess() { // init dynamic thread pool monitor dynamicThreadPoolManager.dynamicThreadPoolMonitor(); - + currentEndpoint = ConfigCache.getValue(ConfigConstants.ENDPOINT, Endpoint.class); // start listener of reader or writer logs - if (Objects.equals(Endpoint.SOURCE, ConfigCache.getValue(ConfigConstants.ENDPOINT, Endpoint.class))) { + if (Objects.equals(Endpoint.SOURCE, currentEndpoint)) { // load check table list listener = new CsvReaderListener(); } else { listener = new CsvWriterListener(); } - baseDataService.bdsQueryTableMetadataList(); listener.initCsvListener(checkingClient); // start slice dispatcher core thread sliceDispatcher = new SliceDispatcher(dynamicThreadPoolManager, sliceRegister, baseDataService, listener); @@ -83,7 +87,6 @@ public class CsvManagementService { public void startCsvNoSliceLogProcess() { dynamicThreadPoolManager.dynamicThreadPoolMonitor(); - baseDataService.bdsQueryTableMetadataList(); TableDispatcher tableDispatcher = new TableDispatcher(dynamicThreadPoolManager, sliceRegister, baseDataService); Thread thread = new Thread(tableDispatcher); thread.start(); @@ -102,4 +105,15 @@ public class CsvManagementService { public void dispatcherTables(List list) { sliceDispatcher.addSliceTables(list); } + + /** + * fetchCheckTableCount + * + * @return table count + */ + public int fetchCheckTableCount() { + Map tableMetadataMap = metaDataService.mdsQueryMetaDataOfSchema(); + currentEndpoint = ConfigCache.getValue(ConfigConstants.ENDPOINT, Endpoint.class); + return Objects.equals(Endpoint.SOURCE, currentEndpoint) ? tableMetadataMap.size() : 0; + } } diff --git a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/slice/SliceDispatcher.java b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/slice/SliceDispatcher.java index 0939450..ecea8e7 100644 --- a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/slice/SliceDispatcher.java +++ b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/slice/SliceDispatcher.java @@ -25,6 +25,7 @@ import org.opengauss.datachecker.common.entry.extract.SliceVo; import org.opengauss.datachecker.common.entry.extract.TableMetadata; import org.opengauss.datachecker.common.entry.extract.Topic; import org.opengauss.datachecker.common.service.DynamicThreadPoolManager; +import org.opengauss.datachecker.common.util.FileUtils; import org.opengauss.datachecker.common.util.LogUtils; import org.opengauss.datachecker.common.util.ThreadUtil; import org.opengauss.datachecker.extract.cache.TopicCache; @@ -32,9 +33,9 @@ import org.opengauss.datachecker.extract.data.BaseDataService; import org.opengauss.datachecker.extract.data.csv.CsvListener; import org.opengauss.datachecker.extract.slice.factory.SliceFactory; -import java.io.File; import java.util.List; import java.util.Objects; +import java.util.Optional; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadPoolExecutor; @@ -97,14 +98,14 @@ public class SliceDispatcher implements Runnable { // check table by rule of table if (!baseDataService.checkTableContains(table)) { log.info("distributors ignore [{}] table shards based on table rules", table); - notifyIgnoreCsvName(endPoint, table, "table rules"); + notifyIgnoreCsvName(endPoint, table, "tableNoMatch"); listener.releaseSliceCache(table); continue; } TableMetadata tableMetadata = baseDataService.queryTableMetadata(table); if (!tableMetadata.hasPrimary()) { log.info("distributors ignore [{}] table because of it's no primary", table); - notifyIgnoreCsvName(endPoint, table, "table no primary"); + notifyIgnoreCsvName(endPoint, table, "tableNoPrimary"); listener.releaseSliceCache(table); continue; } @@ -147,16 +148,17 @@ public class SliceDispatcher implements Runnable { } private void notifyIgnoreCsvName(Endpoint endPoint, String ignoreCsvTableName, String reason) { + listener.notifyCheckIgnoreTable(ignoreCsvTableName, reason); if (Objects.equals(Endpoint.SOURCE, endPoint)) { String csvDataPath = ConfigCache.getCsvData(); List tableSliceList = listener.fetchTableSliceList(ignoreCsvTableName); - tableSliceList.forEach(slice -> { - String ignoreCsvName = slice.getName(); - File file = new File(csvDataPath, ignoreCsvName); - if (file.exists() && file.renameTo(new File(csvDataPath, ignoreCsvName + ".check"))) { - log.debug("rename csv sharding completed [{}] by {}", ignoreCsvName, reason); - } - }); + Optional.ofNullable(tableSliceList) + .ifPresent(list -> list.forEach(slice -> { + String ignoreCsvName = slice.getName(); + if (FileUtils.renameTo(csvDataPath, ignoreCsvName)) { + log.debug("rename csv sharding completed [{}] by {}", ignoreCsvName, reason); + } + })); } } diff --git a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/slice/common/SliceResultSetSender.java b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/slice/common/SliceResultSetSender.java index a9ab421..9a67e25 100644 --- a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/slice/common/SliceResultSetSender.java +++ b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/slice/common/SliceResultSetSender.java @@ -26,6 +26,7 @@ import org.springframework.kafka.support.SendResult; import org.springframework.lang.NonNull; import org.springframework.util.concurrent.ListenableFuture; +import java.math.BigDecimal; import java.sql.ResultSet; import java.sql.ResultSetMetaData; import java.util.List; @@ -49,6 +50,20 @@ public class SliceResultSetSender { private final List columnMetas; private final List primary; private final String tableName; + private final ResultParseConsumer, ColumnsMetaData> largeDigitalType = + (value, result, column) -> { + if (isScientificNotation(value)) { + result.put(column.getColumnName(), new BigDecimal(value).toPlainString()); + } else { + result.put(column.getColumnName(), value); + } + }; + private final ResultParseConsumer, ColumnsMetaData> defaultConsumer = + (value, result, column) -> result.put(column.getColumnName(), value); + private final ResultParseConsumer, ColumnsMetaData> binaryAndBlobConsumer = + (value, result, column) -> result.put(column.getColumnName(), value.substring(1)); + private final ResultParseConsumer, ColumnsMetaData> csvNullValueConsumer = + (value, result, column) -> result.put(column.getColumnName(), CSV_NULL_VALUE); /** * constructor @@ -65,6 +80,25 @@ public class SliceResultSetSender { this.kafkaOperate = kafkaOperate; } + /** + * ResultParseConsumer + * + * @param result row original text + * @param parse result map + * @param column metadata + */ + @FunctionalInterface + private interface ResultParseConsumer { + /** + * ResultParseConsumer + * + * @param value result row original text + * @param result parse result map + * @param column column metadata + */ + void accept(String value, Map result, ColumnsMetaData column); + } + /** * resultSetTranslateAndSendRandom * @@ -199,20 +233,28 @@ public class SliceResultSetSender { private void parse(String[] nextLine, Map result) { int idx; + String tmpValue; for (ColumnsMetaData column : columnMetas) { idx = column.getOrdinalPosition() - 1; - if (CSV_NULL_VALUE.equalsIgnoreCase(nextLine[idx])) { - result.put(column.getColumnName(), CSV_NULL_VALUE); + tmpValue = nextLine[idx]; + if (CSV_NULL_VALUE.equalsIgnoreCase(tmpValue)) { + csvNullValueConsumer.accept(tmpValue, result, column); } else { if (isBinaryOrBlob(column.getColumnType())) { - result.put(column.getColumnName(), nextLine[idx].substring(1)); + binaryAndBlobConsumer.accept(tmpValue, result, column); + } else if (MetaDataUtil.isLargeDigitalTypeKey(column)) { + largeDigitalType.accept(tmpValue, result, column); } else { - result.put(column.getColumnName(), nextLine[idx]); + defaultConsumer.accept(tmpValue, result, column); } } } } + private boolean isScientificNotation(String value) { + return value.contains("E") || value.contains("e"); + } + /** * 判断当前类型是否是binary(binary/varbinary) 或者 blob(blob,tinyblob,blob,mediumblob,longblob) 类型 * diff --git a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/task/MysqlResultSetHandler.java b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/task/MysqlResultSetHandler.java index d4bbb5f..315fa13 100644 --- a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/task/MysqlResultSetHandler.java +++ b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/task/MysqlResultSetHandler.java @@ -21,11 +21,19 @@ import org.opengauss.datachecker.common.util.HexUtil; import java.sql.ResultSet; import java.sql.ResultSetMetaData; import java.sql.SQLException; +import java.sql.Timestamp; +import java.time.format.DateTimeFormatter; +import java.util.Calendar; import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.TimeZone; import java.util.concurrent.ConcurrentHashMap; +import com.mysql.cj.result.Field; +import org.opengauss.datachecker.extract.task.functional.MysqlTypeHandler; +import org.springframework.lang.NonNull; + /** * MysqlResultSetHandler * @@ -41,6 +49,7 @@ public class MysqlResultSetHandler extends ResultSetHandler { private static final List doubleTypeList = List.of(MysqlType.DOUBLE, MysqlType.DOUBLE_UNSIGNED); private static final List decimalTypeList = List.of(MysqlType.DECIMAL, MysqlType.DECIMAL_UNSIGNED); private final Map typeHandlers = new ConcurrentHashMap<>(); + private final Map mysqlTypeHandlers = new ConcurrentHashMap<>(); { TypeHandler binaryToString = (rs, columnLabel) -> bytesToString(rs.getBytes(columnLabel)); @@ -60,10 +69,11 @@ public class MysqlResultSetHandler extends ResultSetHandler { // date time timestamp typeHandlers.put(MysqlType.DATE, this::getDateFormat); - typeHandlers.put(MysqlType.DATETIME, this::getTimestampFormat); typeHandlers.put(MysqlType.TIME, this::getTimeFormat); - typeHandlers.put(MysqlType.TIMESTAMP, this::getTimestampFormat); typeHandlers.put(MysqlType.YEAR, this::getYearFormat); + + mysqlTypeHandlers.put(MysqlType.TIMESTAMP, this::getMysqlTimestampFormat); + mysqlTypeHandlers.put(MysqlType.DATETIME, this::getMysqlTimestampFormat); } private String bitToString(ResultSet resultSet, String columnLabel, int precision) throws SQLException { @@ -82,6 +92,7 @@ public class MysqlResultSetHandler extends ResultSetHandler { String columnLabel = rsmd.getColumnLabel(columnIdx); String columnTypeName = rsmd.getColumnTypeName(columnIdx); final MysqlType mysqlType = MysqlType.getByName(columnTypeName); + Field[] fields = getResultSetFields(rsmd); if (MysqlType.BIT.equals(mysqlType)) { int precision = rsmd.getPrecision(columnIdx); return bitToString(resultSet, columnLabel, precision); @@ -112,12 +123,30 @@ public class MysqlResultSetHandler extends ResultSetHandler { } else if (typeHandlers.containsKey(mysqlType)) { return typeHandlers.get(mysqlType) .convert(resultSet, columnLabel); + } else if (mysqlTypeHandlers.containsKey(mysqlType)) { + return mysqlTypeHandlers.get(mysqlType) + .convert(resultSet, columnLabel, fields[columnIdx - 1]); } else { Object object = resultSet.getObject(columnLabel); return Objects.isNull(object) ? NULL : object.toString(); } } + private Field[] getResultSetFields(ResultSetMetaData rsmd) { + if (rsmd instanceof com.mysql.cj.jdbc.result.ResultSetMetaData) { + return ((com.mysql.cj.jdbc.result.ResultSetMetaData) rsmd).getFields(); + } + return new Field[0]; + } + + private String getMysqlTimestampFormat(@NonNull ResultSet resultSet, String columnLabel, + com.mysql.cj.result.Field field) throws SQLException { + final Timestamp timestamp = + resultSet.getTimestamp(columnLabel, Calendar.getInstance(TimeZone.getTimeZone("GMT+8"))); + DateTimeFormatter dateTimeFormatter = TIMESTAMP_MAPPER.get(field.getDecimals()); + return Objects.nonNull(timestamp) ? dateTimeFormatter.format(timestamp.toLocalDateTime()) : NULL; + } + /** * mysqlType is Float Double * diff --git a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/task/OpenGaussResultSetHandler.java b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/task/OpenGaussResultSetHandler.java index 52b71af..f660720 100644 --- a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/task/OpenGaussResultSetHandler.java +++ b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/task/OpenGaussResultSetHandler.java @@ -15,15 +15,23 @@ package org.opengauss.datachecker.extract.task; +import org.opengauss.core.Field; import org.opengauss.datachecker.common.util.HexUtil; +import org.opengauss.datachecker.extract.task.functional.OpgsTypeHandler; +import org.opengauss.jdbc.PgResultSetMetaData; +import org.springframework.lang.NonNull; import java.sql.ResultSet; import java.sql.ResultSetMetaData; import java.sql.SQLException; +import java.sql.Timestamp; +import java.time.format.DateTimeFormatter; +import java.util.Calendar; import java.util.List; import java.util.Locale; import java.util.Map; import java.util.Objects; +import java.util.TimeZone; import java.util.concurrent.ConcurrentHashMap; /** @@ -35,6 +43,7 @@ import java.util.concurrent.ConcurrentHashMap; */ public class OpenGaussResultSetHandler extends ResultSetHandler { private static final Map typeHandlers = new ConcurrentHashMap<>(); + private static final Map opgsTypeHandlers = new ConcurrentHashMap<>(); { // byte binary blob @@ -75,8 +84,9 @@ public class OpenGaussResultSetHandler extends ResultSetHandler { // date time timestamp typeHandlers.put(OpenGaussType.DATE, this::getDateFormat); typeHandlers.put(OpenGaussType.TIME, this::getTimeFormat); - typeHandlers.put(OpenGaussType.TIMESTAMP, this::getTimestampFormat); - typeHandlers.put(OpenGaussType.TIMESTAMPTZ, this::getTimestampFormat); + + opgsTypeHandlers.put(OpenGaussType.TIMESTAMP, this::getOpgsTimestampFormat); + opgsTypeHandlers.put(OpenGaussType.TIMESTAMPTZ, this::getOpgsTimestampFormat); } /** @@ -95,6 +105,14 @@ public class OpenGaussResultSetHandler extends ResultSetHandler { super(supplyZero); } + private String getOpgsTimestampFormat(@NonNull ResultSet resultSet, String columnLabel, Field field) + throws SQLException { + final Timestamp timestamp = + resultSet.getTimestamp(columnLabel, Calendar.getInstance(TimeZone.getTimeZone("GMT+8"))); + DateTimeFormatter dateTimeFormatter = TIMESTAMP_MAPPER.get(field.getMod()); + return Objects.nonNull(timestamp) ? dateTimeFormatter.format(timestamp.toLocalDateTime()) : NULL; + } + /** * binaryToString * @@ -167,12 +185,23 @@ public class OpenGaussResultSetHandler extends ResultSetHandler { } else if (typeHandlers.containsKey(columnTypeName)) { return typeHandlers.get(columnTypeName) .convert(resultSet, columnLabel); + } else if (opgsTypeHandlers.containsKey(columnTypeName)) { + return opgsTypeHandlers.get(columnTypeName) + .convert(resultSet, columnLabel, getResultSetField(columnIdx, rsmd)); } else { Object object = resultSet.getObject(columnLabel); return Objects.isNull(object) ? NULL : object.toString(); } } + private Field getResultSetField(int columnIdx, ResultSetMetaData rsmd) throws SQLException { + if (rsmd instanceof PgResultSetMetaData) { + return ((PgResultSetMetaData) rsmd).getField(columnIdx); + } else { + return null; + } + } + private String convertNumericToString(ResultSetMetaData rsmd, ResultSet resultSet, int columnIdx) throws SQLException { int precision = rsmd.getPrecision(columnIdx); @@ -346,6 +375,11 @@ public class OpenGaussResultSetHandler extends ResultSetHandler { */ String TIME = "time"; + /** + * year + */ + String YEAR = "year"; + /** * opengauss data type : timestamp */ diff --git a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/task/ResultSetHandler.java b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/task/ResultSetHandler.java index 8138acc..31c29ae 100644 --- a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/task/ResultSetHandler.java +++ b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/task/ResultSetHandler.java @@ -16,6 +16,7 @@ package org.opengauss.datachecker.extract.task; import org.apache.logging.log4j.Logger; +import org.opengauss.datachecker.common.util.DateTimeFormatterMap; import org.opengauss.datachecker.common.util.HexUtil; import org.opengauss.datachecker.common.util.LogUtils; import org.springframework.lang.NonNull; @@ -56,6 +57,11 @@ public abstract class ResultSetHandler { protected static final DateTimeFormatter TIMESTAMP_NANOS = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSSSSS"); protected static final DateTimeFormatter TIMESTAMP = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"); + + /** + * TIMESTAMP_MAPPER {@link DateTimeFormatterMap} + */ + protected static final DateTimeFormatterMap TIMESTAMP_MAPPER = new DateTimeFormatterMap(); protected static final String EMPTY = ""; protected static final String NULL = null; @@ -363,7 +369,7 @@ public abstract class ResultSetHandler { * * @param resultSet resultSet * @param columnLabel columnLabel - * @return result + * @return result result * @throws SQLException SQLException */ String convert(ResultSet resultSet, String columnLabel) throws SQLException; diff --git a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/task/functional/MysqlTypeHandler.java b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/task/functional/MysqlTypeHandler.java new file mode 100644 index 0000000..13c71dc --- /dev/null +++ b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/task/functional/MysqlTypeHandler.java @@ -0,0 +1,40 @@ +/* + * Copyright (c) 2022-2022 Huawei Technologies Co.,Ltd. + * + * openGauss is licensed under Mulan PSL v2. + * You can use this software according to the terms and conditions of the Mulan PSL v2. + * You may obtain a copy of Mulan PSL v2 at: + * + * http://license.coscl.org.cn/MulanPSL2 + * + * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, + * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, + * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. + * See the Mulan PSL v2 for more details. + */ + +package org.opengauss.datachecker.extract.task.functional; + +import java.sql.ResultSet; +import java.sql.SQLException; + +/** + * MysqlTypeHandler + * + * @author :wangchao + * @date :Created in 2024/2/26 + * @since :11 + */ +@FunctionalInterface +public interface MysqlTypeHandler { + /** + * result convert to string + * + * @param resultSet resultSet + * @param columnLabel columnLabel + * @param field field + * @return result result + * @throws SQLException SQLException + */ + String convert(ResultSet resultSet, String columnLabel, com.mysql.cj.result.Field field) throws SQLException; +} diff --git a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/controller/ExtractBaseController.java b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/task/functional/OpgsTypeHandler.java similarity index 43% rename from datachecker-extract/src/main/java/org/opengauss/datachecker/extract/controller/ExtractBaseController.java rename to datachecker-extract/src/main/java/org/opengauss/datachecker/extract/task/functional/OpgsTypeHandler.java index 3334ffb..259f1f9 100644 --- a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/controller/ExtractBaseController.java +++ b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/task/functional/OpgsTypeHandler.java @@ -13,36 +13,31 @@ * See the Mulan PSL v2 for more details. */ -package org.opengauss.datachecker.extract.controller; +package org.opengauss.datachecker.extract.task.functional; -import io.swagger.v3.oas.annotations.tags.Tag; -import org.opengauss.datachecker.common.web.Result; -import org.opengauss.datachecker.extract.service.BaseManagementService; -import org.springframework.web.bind.annotation.GetMapping; -import org.springframework.web.bind.annotation.RestController; +import org.opengauss.core.Field; -import javax.annotation.Resource; +import java.sql.ResultSet; +import java.sql.SQLException; /** - * data base service + * OpgsTypeHandler + * result convert to string functional interface * * @author :wangchao - * @date :Created in 2022/6/23 + * @date :Created in 2024/2/26 * @since :11 */ -@Tag(name = "data base service") -@RestController -public class ExtractBaseController { - @Resource - private BaseManagementService baseManagementService; - +@FunctionalInterface +public interface OpgsTypeHandler { /** - * fetch table count + * result convert to string * - * @return table count + * @param resultSet resultSet + * @param columnLabel columnLabel + * @param field field + * @return result result + * @throws SQLException SQLException */ - @GetMapping("/fetch/check/table/count") - public Result fetchCheckTableCount() { - return Result.success(baseManagementService.fetchCheckTableCount()); - } + String convert(ResultSet resultSet, String columnLabel, Field field) throws SQLException; } diff --git a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/util/MetaDataUtil.java b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/util/MetaDataUtil.java index 0dd28d8..2081d6b 100644 --- a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/util/MetaDataUtil.java +++ b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/util/MetaDataUtil.java @@ -39,13 +39,17 @@ public class MetaDataUtil { List.of("integer", "int", "uint1", "uint2", "uint4", "uint8", "long", "decimal", "numeric", "smallint", "NUMBER", "tinyint", "mediumint", "bigint"); private static final List dataTypes = - List.of("integer", "int", "uint1", "uint2", "uint4", "uint8", "long", "decimal", "numeric", "NUMBER","VARCHAR2", - "smallint", "tinyint", "mediumint", "bigint", "character", "char", "varchar", "character varying", "CHAR"); + List.of("integer", "int", "uint1", "uint2", "uint4", "uint8", "long", "decimal", "numeric", "NUMBER", + "VARCHAR2", "smallint", "tinyint", "mediumint", "bigint", "character", "char", "varchar", + "character varying", "CHAR"); private static final List digitalDataTypes = List.of("integer", "int", "uint1", "uint2", "uint4", "uint8", "long", "decimal", "numeric", "smallint", "number", "tinyint", "mediumint", "bigint", "double", "float"); + private static final List LARGE_DIGITAL_TYPES = + List.of("uint8", "long", "decimal", "numeric", "number", "bigint", "double", "float"); + /** * getTableColumns * @@ -116,6 +120,17 @@ public class MetaDataUtil { .toLowerCase(Locale.getDefault())); } + /** + * 大数字类型,结果可能为科学计数表示 + * + * @param primaryKey primaryKey + * @return boolean + */ + public static boolean isLargeDigitalTypeKey(ColumnsMetaData primaryKey) { + return LARGE_DIGITAL_TYPES.contains(primaryKey.getDataType() + .toLowerCase(Locale.getDefault())); + } + public static boolean isInvalidPrimaryKey(ColumnsMetaData primaryKey) { if (primaryKey.getColumnKey() != ColumnKey.PRI) { return false; diff --git a/datachecker-extract/src/test/java/org/opengauss/datachecker/extract/dao/MysqlResultSetTest.java b/datachecker-extract/src/test/java/org/opengauss/datachecker/extract/dao/MysqlResultSetTest.java index a46ebd1..7c54f1c 100644 --- a/datachecker-extract/src/test/java/org/opengauss/datachecker/extract/dao/MysqlResultSetTest.java +++ b/datachecker-extract/src/test/java/org/opengauss/datachecker/extract/dao/MysqlResultSetTest.java @@ -79,6 +79,12 @@ public class MysqlResultSetTest extends BaseMysqlMapper { dataResultSetHandler.testTable(schema, "t_text", "init_t_text.sql"); } + @DisplayName("test mysql t_time") + @Test + public void testTime() { + dataResultSetHandler.testTable(schema, "t_time", "init_t_time.sql"); + } + @AfterAll void dropTestDatabaseT() { super.dropTestDb(testDatabaseInitScript); diff --git a/datachecker-extract/src/test/java/org/opengauss/datachecker/extract/dao/OpenGaussResultSetTest.java b/datachecker-extract/src/test/java/org/opengauss/datachecker/extract/dao/OpenGaussResultSetTest.java index 01017ab..de637f4 100644 --- a/datachecker-extract/src/test/java/org/opengauss/datachecker/extract/dao/OpenGaussResultSetTest.java +++ b/datachecker-extract/src/test/java/org/opengauss/datachecker/extract/dao/OpenGaussResultSetTest.java @@ -80,6 +80,12 @@ public class OpenGaussResultSetTest extends BaseOpenGaussMapper { dataResultSetHandler.testTable(schema, "t_text", "init_t_text_og.sql"); } + @DisplayName("test openGauss t_time") + @Test + public void testTime() { + dataResultSetHandler.testTable(schema, "t_time", "init_t_time.sql"); + } + @AfterAll void dropTestDatabaseT() { super.dropTestDb(testDatabaseInitScript); diff --git a/datachecker-extract/src/test/resources/mysql_opgs/expect/t_time.json b/datachecker-extract/src/test/resources/mysql_opgs/expect/t_time.json new file mode 100644 index 0000000..fb25a2d --- /dev/null +++ b/datachecker-extract/src/test/resources/mysql_opgs/expect/t_time.json @@ -0,0 +1,47 @@ +[ + { + "c_time1": "18:34:43", + "c_timestamp": "2022-12-16 11:29:52", + "c_datetime": "2024-02-17 18:34:46", + "c_date": "2024-02-17", + "c_time": "2022-12-16 11:04:50.004300", + "c_year": "2024", + "id": "1" + }, + { + "c_time1": "18:34:43", + "c_timestamp": "2022-12-16 11:04:50", + "c_datetime": "2024-02-17 18:34:46", + "c_date": "2024-02-17", + "c_time": "2022-12-16 11:04:50.004300", + "c_year": "2024", + "id": "2" + }, + { + "c_time1": "18:34:44", + "c_timestamp": "2022-12-16 11:39:52", + "c_datetime": "2024-02-17 18:34:47", + "c_date": "2024-02-17", + "c_time": "2022-12-16 11:14:50.004300", + "c_year": "2024", + "id": "3" + }, + { + "c_time1": "18:34:45", + "c_timestamp": "2022-12-16 11:39:52", + "c_datetime": "2024-02-17 18:34:48", + "c_date": "2024-02-17", + "c_time": "2022-12-16 11:14:50.004300", + "c_year": "2024", + "id": "4" + }, + { + "c_time1": "16:09:09", + "c_timestamp": "2024-02-20 16:09:05", + "c_datetime": "2024-02-20 16:09:10", + "c_date": "2024-02-20", + "c_time": "2024-02-20 16:09:03", + "c_year": "2024", + "id": "5" + } +] \ No newline at end of file diff --git a/datachecker-extract/src/test/resources/mysql_opgs/sql/init_t_time.sql b/datachecker-extract/src/test/resources/mysql_opgs/sql/init_t_time.sql new file mode 100644 index 0000000..a037d85 --- /dev/null +++ b/datachecker-extract/src/test/resources/mysql_opgs/sql/init_t_time.sql @@ -0,0 +1,19 @@ +use test_schema; + +DROP TABLE IF EXISTS `t_time`; +CREATE TABLE `t_time` ( + `id` INT(11) NOT NULL AUTO_INCREMENT, + `c_time` TIMESTAMP(6) NULL DEFAULT NULL, + `c_timestamp` TIMESTAMP NULL DEFAULT NULL, + `c_year` YEAR NULL DEFAULT NULL, + `c_date` DATE NULL DEFAULT NULL, + `c_time1` TIME NULL DEFAULT NULL, + `c_datetime` DATETIME NULL DEFAULT NULL, + PRIMARY KEY (`id`) USING BTREE +); + +INSERT INTO `t_time` VALUES (1, '2022-12-16 11:04:50.004300', '2022-12-16 11:29:52', '2024', '2024-02-17', '18:34:43', '2024-02-17 18:34:46'); +INSERT INTO `t_time` VALUES (2, '2022-12-16 11:04:50.004300', '2022-12-16 11:04:50', '2024', '2024-02-17', '18:34:43', '2024-02-17 18:34:46'); +INSERT INTO `t_time` VALUES (3, '2022-12-16 11:14:50.004300', '2022-12-16 11:39:52', '2024', '2024-02-17', '18:34:44', '2024-02-17 18:34:47'); +INSERT INTO `t_time` VALUES (4, '2022-12-16 11:14:50.004300', '2022-12-16 11:39:52', '2024', '2024-02-17', '18:34:45', '2024-02-17 18:34:48'); +INSERT INTO `t_time` VALUES (5, '2024-02-20 16:09:03.000000', '2024-02-20 16:09:05', '2024', '2024-02-20', '16:09:09', '2024-02-20 16:09:10'); -- Gitee