From 6ce4b885463a566318764acd0198a9636c2c9bbd Mon Sep 17 00:00:00 2001 From: mystarry-sky Date: Fri, 5 Jan 2024 14:46:14 +0800 Subject: [PATCH] =?UTF-8?q?chameleon=E5=8D=8F=E5=90=8C=E6=A0=A1=E9=AA=8C?= =?UTF-8?q?=E5=9C=BA=E6=99=AF=EF=BC=8C=E5=A4=9A=E8=A1=A8=E4=B8=94=E8=A1=A8?= =?UTF-8?q?=E6=95=B0=E6=8D=AE=E8=BE=83=E5=B0=91=E5=9C=BA=E6=99=AF=EF=BC=8C?= =?UTF-8?q?=E4=BF=AE=E5=A4=8D=E5=B9=B6=E5=8F=91=E5=BC=95=E8=B5=B7=E7=9A=84?= =?UTF-8?q?=E7=8A=B6=E6=80=81=E5=8F=8D=E9=A6=88=E6=8E=A5=E5=8F=A3=E6=8A=A5?= =?UTF-8?q?=E9=94=99=E9=97=AE=E9=A2=98=EF=BC=88=E5=81=B6=E7=8E=B0=EF=BC=89?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../check/service/TaskRegisterCenter.java | 6 +- .../controller/ExtractCleanController.java | 5 + .../extract/load/StartLoadRunner.java | 4 + .../extract/slice/SliceProcessorContext.java | 16 ++- .../slice/SliceStatusFeedbackService.java | 97 +++++++++++++++++++ .../slice/common/SliceResultSetSender.java | 1 + .../slice/process/CsvSliceProcessor.java | 6 +- .../task/OpenGaussCsvResultSetHandler.java | 44 +++++++++ .../task/OpenGaussResultSetHandler.java | 6 +- .../extract/task/ResultSetHandler.java | 3 +- .../extract/task/ResultSetHandlerFactory.java | 8 +- .../src/main/resources/application-sink.yml | 2 +- .../src/main/resources/application-source.yml | 2 +- 13 files changed, 184 insertions(+), 16 deletions(-) create mode 100644 datachecker-extract/src/main/java/org/opengauss/datachecker/extract/slice/SliceStatusFeedbackService.java create mode 100644 datachecker-extract/src/main/java/org/opengauss/datachecker/extract/task/OpenGaussCsvResultSetHandler.java diff --git a/datachecker-check/src/main/java/org/opengauss/datachecker/check/service/TaskRegisterCenter.java b/datachecker-check/src/main/java/org/opengauss/datachecker/check/service/TaskRegisterCenter.java index f68a9f0..3afac29 100644 --- a/datachecker-check/src/main/java/org/opengauss/datachecker/check/service/TaskRegisterCenter.java +++ b/datachecker-check/src/main/java/org/opengauss/datachecker/check/service/TaskRegisterCenter.java @@ -166,8 +166,10 @@ public class TaskRegisterCenter { int tableReleaseSize = 0; if (sliceTableCounter.containsKey(tableName)) { tableReleaseSize = sliceTableCounter.get(tableName); - tableReleaseSize--; - sliceTableCounter.put(tableName, tableReleaseSize); + if (tableReleaseSize > 0) { + tableReleaseSize--; + sliceTableCounter.put(tableName, tableReleaseSize); + } log.debug("table [{}] slice release {}", tableName, tableReleaseSize); } return tableReleaseSize == 0; diff --git a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/controller/ExtractCleanController.java b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/controller/ExtractCleanController.java index 8788be2..b75c22e 100644 --- a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/controller/ExtractCleanController.java +++ b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/controller/ExtractCleanController.java @@ -21,6 +21,7 @@ import org.opengauss.datachecker.common.web.Result; import org.opengauss.datachecker.extract.kafka.KafkaManagerService; import org.opengauss.datachecker.extract.service.CsvManagementService; import org.opengauss.datachecker.extract.service.DataExtractService; +import org.opengauss.datachecker.extract.slice.SliceProcessorContext; import org.springframework.web.bind.annotation.PostMapping; import org.springframework.web.bind.annotation.RequestBody; import org.springframework.web.bind.annotation.RequestParam; @@ -45,6 +46,9 @@ public class ExtractCleanController { private ShutdownService shutdownService; @Resource private CsvManagementService csvManagementService; + @Resource + private SliceProcessorContext sliceProcessorContext; + /** * clear the endpoint information and reinitialize the environment. * @@ -68,6 +72,7 @@ public class ExtractCleanController { @PostMapping("/extract/shutdown") Result shutdown(@RequestBody String message) { csvManagementService.close(); + sliceProcessorContext.shutdownSliceStatusFeedbackService(); shutdownService.shutdown(message); return Result.success(); } diff --git a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/load/StartLoadRunner.java b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/load/StartLoadRunner.java index 6b6fb42..02628fc 100644 --- a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/load/StartLoadRunner.java +++ b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/load/StartLoadRunner.java @@ -25,6 +25,7 @@ import org.opengauss.datachecker.extract.config.DataSourceConfig; import org.opengauss.datachecker.extract.config.DruidDataSourceConfig; import org.opengauss.datachecker.extract.resource.ResourceManager; import org.opengauss.datachecker.extract.service.ConfigManagement; +import org.opengauss.datachecker.extract.slice.SliceProcessorContext; import org.opengauss.datachecker.extract.task.ExtractThreadSupport; import org.springframework.boot.ApplicationArguments; import org.springframework.boot.ApplicationRunner; @@ -48,6 +49,8 @@ public class StartLoadRunner implements ApplicationRunner { private ResourceManager resourceManager; @Resource private MemoryManagerService memoryManagerService; + @Resource + private SliceProcessorContext sliceProcessorContext; @Override @@ -58,6 +61,7 @@ public class StartLoadRunner implements ApplicationRunner { resourceManager.initMaxConnectionCount(); initExtractContextDataSource(); + sliceProcessorContext.startSliceStatusFeedbackService(); } private void initExtractContextDataSource() { diff --git a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/slice/SliceProcessorContext.java b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/slice/SliceProcessorContext.java index 123f659..8f945e5 100644 --- a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/slice/SliceProcessorContext.java +++ b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/slice/SliceProcessorContext.java @@ -41,6 +41,7 @@ import org.springframework.stereotype.Component; import javax.annotation.Resource; import javax.sql.DataSource; +import java.util.Objects; /** * SliceProcessorContext @@ -66,6 +67,7 @@ public class SliceProcessorContext { private CheckingFeignClient checkingFeignClient; @Resource private TopicCache topicCache; + private SliceStatusFeedbackService sliceStatusFeedbackService; /** * create slice kafka agents @@ -140,9 +142,8 @@ public class SliceProcessorContext { * * @param sliceExtend slice extend */ - @Retryable(maxAttempts = 3) public void feedbackStatus(SliceExtend sliceExtend) { - checkingFeignClient.refreshRegisterSlice(sliceExtend); + sliceStatusFeedbackService.addFeedbackStatus(sliceExtend); } /** @@ -153,4 +154,15 @@ public class SliceProcessorContext { public MemoryOperations getMemoryDataOperations() { return new MemoryOperations(resourceManager); } + + public void startSliceStatusFeedbackService() { + sliceStatusFeedbackService = new SliceStatusFeedbackService(checkingFeignClient); + sliceStatusFeedbackService.feedback(); + } + + public void shutdownSliceStatusFeedbackService() { + if (Objects.nonNull(sliceStatusFeedbackService)) { + sliceStatusFeedbackService.stop(); + } + } } diff --git a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/slice/SliceStatusFeedbackService.java b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/slice/SliceStatusFeedbackService.java new file mode 100644 index 0000000..25a623b --- /dev/null +++ b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/slice/SliceStatusFeedbackService.java @@ -0,0 +1,97 @@ +/* + * Copyright (c) 2022-2022 Huawei Technologies Co.,Ltd. + * + * openGauss is licensed under Mulan PSL v2. + * You can use this software according to the terms and conditions of the Mulan PSL v2. + * You may obtain a copy of Mulan PSL v2 at: + * + * http://license.coscl.org.cn/MulanPSL2 + * + * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, + * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, + * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. + * See the Mulan PSL v2 for more details. + */ + +package org.opengauss.datachecker.extract.slice; + +import org.apache.logging.log4j.Logger; +import org.opengauss.datachecker.common.entry.extract.SliceExtend; +import org.opengauss.datachecker.common.util.LogUtils; +import org.opengauss.datachecker.common.util.ThreadUtil; +import org.opengauss.datachecker.extract.client.CheckingFeignClient; + +import java.util.Objects; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; + +/** + * SliceStatusFeedbackService + * + * @author :wangchao + * @date :Created in 2023/8/8 + * @since :11 + */ +public class SliceStatusFeedbackService { + private static final Logger log = LogUtils.getLogger(); + private static final Lock lock = new ReentrantLock(); + public static final String FEEDBACK_THREAD_NAME = "status-feedback-service"; + private final BlockingQueue feedbackQueue = new LinkedBlockingQueue<>(); + private final ExecutorService feedbackSender; + private final CheckingFeignClient checkingClient; + private boolean isCompleted = false; + + public SliceStatusFeedbackService(CheckingFeignClient checkingClient) { + this.checkingClient = checkingClient; + this.feedbackSender = ThreadUtil.newSingleThreadExecutor(); + } + + /** + * add slice status task for queue + * + * @param sliceExtend sliceExtend + */ + public void addFeedbackStatus(SliceExtend sliceExtend) { + lock.lock(); + try { + feedbackQueue.add(sliceExtend); + } finally { + lock.unlock(); + } + } + + /** + * close slice status feedback thread + */ + public void stop() { + this.isCompleted = true; + this.feedbackSender.shutdownNow(); + } + + /** + * start slice status feedback thread + */ + public void feedback() { + feedbackSender.submit(() -> { + Thread.currentThread() + .setName(FEEDBACK_THREAD_NAME); + SliceExtend sliceExt = null; + while (!isCompleted) { + try { + sliceExt = feedbackQueue.poll(); + if (Objects.isNull(sliceExt) && !isCompleted) { + ThreadUtil.sleepHalfSecond(); + continue; + } + checkingClient.refreshRegisterSlice(sliceExt); + log.info("feedback slice status of table [{}]", sliceExt); + } catch (Exception ex) { + log.error("feedback slice status error {}", sliceExt, ex); + } + } + }); + } +} diff --git a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/slice/common/SliceResultSetSender.java b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/slice/common/SliceResultSetSender.java index 2c9ce93..29181b6 100644 --- a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/slice/common/SliceResultSetSender.java +++ b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/slice/common/SliceResultSetSender.java @@ -189,5 +189,6 @@ public class SliceResultSetSender { result.put(column.getColumnName(), nextLine[idx]); } } +// log.info("data:{}", result); } } diff --git a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/slice/process/CsvSliceProcessor.java b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/slice/process/CsvSliceProcessor.java index e26e106..1b97c4b 100644 --- a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/slice/process/CsvSliceProcessor.java +++ b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/slice/process/CsvSliceProcessor.java @@ -61,11 +61,7 @@ public class CsvSliceProcessor extends AbstractSliceProcessor { log.info("csv slice processor start , [{}]", slice.toSimpleString()); TableMetadata tableMetadata = context.getTableMetaData(slice.getTable()); sliceExtend = createSliceExtend(tableMetadata.getTableHash()); - if (!slice.isEmptyTable()) { - executeQueryStatement(tableMetadata, sliceExtend); - } else { - log.info("table slice [{}] is empty ", slice.getName()); - } + executeQueryStatement(tableMetadata, sliceExtend); } catch (Exception ex) { log.error("csv slice processor , [{}] : ", slice.toSimpleString(), ex); } finally { diff --git a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/task/OpenGaussCsvResultSetHandler.java b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/task/OpenGaussCsvResultSetHandler.java new file mode 100644 index 0000000..c027502 --- /dev/null +++ b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/task/OpenGaussCsvResultSetHandler.java @@ -0,0 +1,44 @@ +/* + * Copyright (c) 2022-2022 Huawei Technologies Co.,Ltd. + * + * openGauss is licensed under Mulan PSL v2. + * You can use this software according to the terms and conditions of the Mulan PSL v2. + * You may obtain a copy of Mulan PSL v2 at: + * + * http://license.coscl.org.cn/MulanPSL2 + * + * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, + * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, + * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. + * See the Mulan PSL v2 for more details. + */ + +package org.opengauss.datachecker.extract.task; + +import org.springframework.lang.NonNull; + +import java.math.BigDecimal; +import java.sql.ResultSet; +import java.sql.SQLException; + +/** + * OpenGaussCsvResultSetHandler + * + * @author :wangchao + * @date :Created in 2022/9/19 + * @since :11 + */ +public class OpenGaussCsvResultSetHandler extends OpenGaussResultSetHandler { + + @Override + protected String floatNumberToString(@NonNull ResultSet resultSet, String columnLabel) throws SQLException { + String floatValue = resultSet.getString(columnLabel); + if (resultSet.wasNull()) { + return NULL; + } + if (isScientificNotation(floatValue)) { + return new BigDecimal(floatValue).toPlainString(); + } + return floatValue; + } +} diff --git a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/task/OpenGaussResultSetHandler.java b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/task/OpenGaussResultSetHandler.java index 5ca5558..7ba79bb 100644 --- a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/task/OpenGaussResultSetHandler.java +++ b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/task/OpenGaussResultSetHandler.java @@ -34,7 +34,7 @@ import java.util.concurrent.ConcurrentHashMap; * @since :11 */ public class OpenGaussResultSetHandler extends ResultSetHandler { - private final Map typeHandlers = new ConcurrentHashMap<>(); + protected final Map typeHandlers = new ConcurrentHashMap<>(); { TypeHandler byteaToString = (rs, columnLabel) -> bytesToString(rs.getBytes(columnLabel)); @@ -88,7 +88,7 @@ public class OpenGaussResultSetHandler extends ResultSetHandler { super(supplyZero); } - private String intToString(ResultSet rs, String columnLabel) throws SQLException { + protected String intToString(ResultSet rs, String columnLabel) throws SQLException { return rs.getString(columnLabel); } @@ -135,7 +135,7 @@ public class OpenGaussResultSetHandler extends ResultSetHandler { } } - private String getPgColumnTypeName(ResultSetMetaData rsmd, int columnIdx) throws SQLException { + protected String getPgColumnTypeName(ResultSetMetaData rsmd, int columnIdx) throws SQLException { String columnTypeName = rsmd.getColumnTypeName(columnIdx); if (columnTypeName.contains(OpenGaussType.pg_catalog)) { columnTypeName = rsmd.getColumnTypeName(columnIdx) diff --git a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/task/ResultSetHandler.java b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/task/ResultSetHandler.java index 23fec51..a2383b5 100644 --- a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/task/ResultSetHandler.java +++ b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/task/ResultSetHandler.java @@ -99,6 +99,7 @@ public abstract class ResultSetHandler { } catch (SQLException ex) { log.error(" parse data metadata information exception", ex); } +// log.info("data:{}", values); return values; } @@ -188,7 +189,7 @@ public abstract class ResultSetHandler { return value; } - private boolean isScientificNotation(String value) { + protected boolean isScientificNotation(String value) { return value.contains("E") || value.contains("e"); } diff --git a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/task/ResultSetHandlerFactory.java b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/task/ResultSetHandlerFactory.java index 110c08e..70213ad 100644 --- a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/task/ResultSetHandlerFactory.java +++ b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/task/ResultSetHandlerFactory.java @@ -17,6 +17,7 @@ package org.opengauss.datachecker.extract.task; import org.opengauss.datachecker.common.config.ConfigCache; import org.opengauss.datachecker.common.constant.ConfigConstants; +import org.opengauss.datachecker.common.entry.enums.CheckMode; import org.opengauss.datachecker.common.entry.enums.DataBaseType; import java.util.Objects; @@ -37,10 +38,15 @@ public class ResultSetHandlerFactory { */ public ResultSetHandler createHandler(DataBaseType databaseType) { Boolean supplyZero = ConfigCache.getBooleanValue(ConfigConstants.FLOATING_POINT_DATA_SUPPLY_ZERO); + CheckMode checkMode = ConfigCache.getCheckMode(); if (Objects.equals(databaseType, DataBaseType.MS)) { return new MysqlResultSetHandler(); } else if (Objects.equals(databaseType, DataBaseType.OG)) { - return new OpenGaussResultSetHandler(supplyZero); + if (Objects.equals(CheckMode.CSV, checkMode)) { + return new OpenGaussCsvResultSetHandler(); + } else { + return new OpenGaussResultSetHandler(supplyZero); + } } else if (Objects.equals(databaseType, DataBaseType.O)) { return new OracleResultSetHandler(); } else { diff --git a/datachecker-extract/src/main/resources/application-sink.yml b/datachecker-extract/src/main/resources/application-sink.yml index ee32370..33378e3 100644 --- a/datachecker-extract/src/main/resources/application-sink.yml +++ b/datachecker-extract/src/main/resources/application-sink.yml @@ -10,7 +10,7 @@ spring: extract: schema: jack databaseType: OG #OG opengauss - object-size-expansion-factor: 4 + object-size-expansion-factor: 1 endpoint: SINK dataLoadMode: jdbc # jdbc or csv query-dop: 1 # jdbc Parallel Query config diff --git a/datachecker-extract/src/main/resources/application-source.yml b/datachecker-extract/src/main/resources/application-source.yml index a60dc28..e6b382f 100644 --- a/datachecker-extract/src/main/resources/application-source.yml +++ b/datachecker-extract/src/main/resources/application-source.yml @@ -10,7 +10,7 @@ spring: extract: schema: test databaseType: MS # MS mysql - object-size-expansion-factor: 4 + object-size-expansion-factor: 1 endpoint: SOURCE dataLoadMode: jdbc # jdbc or csv query-dop: 1 # jdbc Parallel Query config -- Gitee