diff --git a/datachecker-check/src/main/java/org/opengauss/datachecker/check/service/TaskRegisterCenter.java b/datachecker-check/src/main/java/org/opengauss/datachecker/check/service/TaskRegisterCenter.java index f68a9f0f7e0fddb583115a492ca8d37896f4860a..3afac29da2a399a9ca43ce6caed1ad01d0fab26d 100644 --- a/datachecker-check/src/main/java/org/opengauss/datachecker/check/service/TaskRegisterCenter.java +++ b/datachecker-check/src/main/java/org/opengauss/datachecker/check/service/TaskRegisterCenter.java @@ -166,8 +166,10 @@ public class TaskRegisterCenter { int tableReleaseSize = 0; if (sliceTableCounter.containsKey(tableName)) { tableReleaseSize = sliceTableCounter.get(tableName); - tableReleaseSize--; - sliceTableCounter.put(tableName, tableReleaseSize); + if (tableReleaseSize > 0) { + tableReleaseSize--; + sliceTableCounter.put(tableName, tableReleaseSize); + } log.debug("table [{}] slice release {}", tableName, tableReleaseSize); } return tableReleaseSize == 0; diff --git a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/controller/ExtractCleanController.java b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/controller/ExtractCleanController.java index 8788be2a3e570650582eba3cce64df00a8305860..b75c22e050c4dab8c6ddee8e9d93154acf5b08a5 100644 --- a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/controller/ExtractCleanController.java +++ b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/controller/ExtractCleanController.java @@ -21,6 +21,7 @@ import org.opengauss.datachecker.common.web.Result; import org.opengauss.datachecker.extract.kafka.KafkaManagerService; import org.opengauss.datachecker.extract.service.CsvManagementService; import org.opengauss.datachecker.extract.service.DataExtractService; +import org.opengauss.datachecker.extract.slice.SliceProcessorContext; import org.springframework.web.bind.annotation.PostMapping; import org.springframework.web.bind.annotation.RequestBody; import org.springframework.web.bind.annotation.RequestParam; @@ -45,6 +46,9 @@ public class ExtractCleanController { private ShutdownService shutdownService; @Resource private CsvManagementService csvManagementService; + @Resource + private SliceProcessorContext sliceProcessorContext; + /** * clear the endpoint information and reinitialize the environment. * @@ -68,6 +72,7 @@ public class ExtractCleanController { @PostMapping("/extract/shutdown") Result shutdown(@RequestBody String message) { csvManagementService.close(); + sliceProcessorContext.shutdownSliceStatusFeedbackService(); shutdownService.shutdown(message); return Result.success(); } diff --git a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/load/StartLoadRunner.java b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/load/StartLoadRunner.java index 6b6fb42100281e679b79abf58d3385e2b3f9a4fd..02628fcc61e4fa2755c28a2413f680bbaa6cc748 100644 --- a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/load/StartLoadRunner.java +++ b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/load/StartLoadRunner.java @@ -25,6 +25,7 @@ import org.opengauss.datachecker.extract.config.DataSourceConfig; import org.opengauss.datachecker.extract.config.DruidDataSourceConfig; import org.opengauss.datachecker.extract.resource.ResourceManager; import org.opengauss.datachecker.extract.service.ConfigManagement; +import org.opengauss.datachecker.extract.slice.SliceProcessorContext; import org.opengauss.datachecker.extract.task.ExtractThreadSupport; import org.springframework.boot.ApplicationArguments; import org.springframework.boot.ApplicationRunner; @@ -48,6 +49,8 @@ public class StartLoadRunner implements ApplicationRunner { private ResourceManager resourceManager; @Resource private MemoryManagerService memoryManagerService; + @Resource + private SliceProcessorContext sliceProcessorContext; @Override @@ -58,6 +61,7 @@ public class StartLoadRunner implements ApplicationRunner { resourceManager.initMaxConnectionCount(); initExtractContextDataSource(); + sliceProcessorContext.startSliceStatusFeedbackService(); } private void initExtractContextDataSource() { diff --git a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/slice/SliceProcessorContext.java b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/slice/SliceProcessorContext.java index 123f65981eed6818df662f07a0921995753c0760..8f945e5c6ad4ced2b067e5235479cfecf53b6b26 100644 --- a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/slice/SliceProcessorContext.java +++ b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/slice/SliceProcessorContext.java @@ -41,6 +41,7 @@ import org.springframework.stereotype.Component; import javax.annotation.Resource; import javax.sql.DataSource; +import java.util.Objects; /** * SliceProcessorContext @@ -66,6 +67,7 @@ public class SliceProcessorContext { private CheckingFeignClient checkingFeignClient; @Resource private TopicCache topicCache; + private SliceStatusFeedbackService sliceStatusFeedbackService; /** * create slice kafka agents @@ -140,9 +142,8 @@ public class SliceProcessorContext { * * @param sliceExtend slice extend */ - @Retryable(maxAttempts = 3) public void feedbackStatus(SliceExtend sliceExtend) { - checkingFeignClient.refreshRegisterSlice(sliceExtend); + sliceStatusFeedbackService.addFeedbackStatus(sliceExtend); } /** @@ -153,4 +154,15 @@ public class SliceProcessorContext { public MemoryOperations getMemoryDataOperations() { return new MemoryOperations(resourceManager); } + + public void startSliceStatusFeedbackService() { + sliceStatusFeedbackService = new SliceStatusFeedbackService(checkingFeignClient); + sliceStatusFeedbackService.feedback(); + } + + public void shutdownSliceStatusFeedbackService() { + if (Objects.nonNull(sliceStatusFeedbackService)) { + sliceStatusFeedbackService.stop(); + } + } } diff --git a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/slice/SliceStatusFeedbackService.java b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/slice/SliceStatusFeedbackService.java new file mode 100644 index 0000000000000000000000000000000000000000..25a623bbe7be7038e179b4750995a1363ea15720 --- /dev/null +++ b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/slice/SliceStatusFeedbackService.java @@ -0,0 +1,97 @@ +/* + * Copyright (c) 2022-2022 Huawei Technologies Co.,Ltd. + * + * openGauss is licensed under Mulan PSL v2. + * You can use this software according to the terms and conditions of the Mulan PSL v2. + * You may obtain a copy of Mulan PSL v2 at: + * + * http://license.coscl.org.cn/MulanPSL2 + * + * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, + * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, + * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. + * See the Mulan PSL v2 for more details. + */ + +package org.opengauss.datachecker.extract.slice; + +import org.apache.logging.log4j.Logger; +import org.opengauss.datachecker.common.entry.extract.SliceExtend; +import org.opengauss.datachecker.common.util.LogUtils; +import org.opengauss.datachecker.common.util.ThreadUtil; +import org.opengauss.datachecker.extract.client.CheckingFeignClient; + +import java.util.Objects; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; + +/** + * SliceStatusFeedbackService + * + * @author :wangchao + * @date :Created in 2023/8/8 + * @since :11 + */ +public class SliceStatusFeedbackService { + private static final Logger log = LogUtils.getLogger(); + private static final Lock lock = new ReentrantLock(); + public static final String FEEDBACK_THREAD_NAME = "status-feedback-service"; + private final BlockingQueue feedbackQueue = new LinkedBlockingQueue<>(); + private final ExecutorService feedbackSender; + private final CheckingFeignClient checkingClient; + private boolean isCompleted = false; + + public SliceStatusFeedbackService(CheckingFeignClient checkingClient) { + this.checkingClient = checkingClient; + this.feedbackSender = ThreadUtil.newSingleThreadExecutor(); + } + + /** + * add slice status task for queue + * + * @param sliceExtend sliceExtend + */ + public void addFeedbackStatus(SliceExtend sliceExtend) { + lock.lock(); + try { + feedbackQueue.add(sliceExtend); + } finally { + lock.unlock(); + } + } + + /** + * close slice status feedback thread + */ + public void stop() { + this.isCompleted = true; + this.feedbackSender.shutdownNow(); + } + + /** + * start slice status feedback thread + */ + public void feedback() { + feedbackSender.submit(() -> { + Thread.currentThread() + .setName(FEEDBACK_THREAD_NAME); + SliceExtend sliceExt = null; + while (!isCompleted) { + try { + sliceExt = feedbackQueue.poll(); + if (Objects.isNull(sliceExt) && !isCompleted) { + ThreadUtil.sleepHalfSecond(); + continue; + } + checkingClient.refreshRegisterSlice(sliceExt); + log.info("feedback slice status of table [{}]", sliceExt); + } catch (Exception ex) { + log.error("feedback slice status error {}", sliceExt, ex); + } + } + }); + } +} diff --git a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/slice/common/SliceResultSetSender.java b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/slice/common/SliceResultSetSender.java index 2c9ce93419f427919f8310206c14ab0983ae1088..29181b63c184d429d9a106a3d6957c4028a56bac 100644 --- a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/slice/common/SliceResultSetSender.java +++ b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/slice/common/SliceResultSetSender.java @@ -189,5 +189,6 @@ public class SliceResultSetSender { result.put(column.getColumnName(), nextLine[idx]); } } +// log.info("data:{}", result); } } diff --git a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/slice/process/CsvSliceProcessor.java b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/slice/process/CsvSliceProcessor.java index e26e106670a3a02a0fb5b2d355a20ba2891b0fcf..1b97c4b71ffb8f56003c8d8ebd68d90ad8b25cb8 100644 --- a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/slice/process/CsvSliceProcessor.java +++ b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/slice/process/CsvSliceProcessor.java @@ -61,11 +61,7 @@ public class CsvSliceProcessor extends AbstractSliceProcessor { log.info("csv slice processor start , [{}]", slice.toSimpleString()); TableMetadata tableMetadata = context.getTableMetaData(slice.getTable()); sliceExtend = createSliceExtend(tableMetadata.getTableHash()); - if (!slice.isEmptyTable()) { - executeQueryStatement(tableMetadata, sliceExtend); - } else { - log.info("table slice [{}] is empty ", slice.getName()); - } + executeQueryStatement(tableMetadata, sliceExtend); } catch (Exception ex) { log.error("csv slice processor , [{}] : ", slice.toSimpleString(), ex); } finally { diff --git a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/task/OpenGaussCsvResultSetHandler.java b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/task/OpenGaussCsvResultSetHandler.java new file mode 100644 index 0000000000000000000000000000000000000000..c027502c432ef4da3169822e93a5569c7183913a --- /dev/null +++ b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/task/OpenGaussCsvResultSetHandler.java @@ -0,0 +1,44 @@ +/* + * Copyright (c) 2022-2022 Huawei Technologies Co.,Ltd. + * + * openGauss is licensed under Mulan PSL v2. + * You can use this software according to the terms and conditions of the Mulan PSL v2. + * You may obtain a copy of Mulan PSL v2 at: + * + * http://license.coscl.org.cn/MulanPSL2 + * + * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, + * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, + * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. + * See the Mulan PSL v2 for more details. + */ + +package org.opengauss.datachecker.extract.task; + +import org.springframework.lang.NonNull; + +import java.math.BigDecimal; +import java.sql.ResultSet; +import java.sql.SQLException; + +/** + * OpenGaussCsvResultSetHandler + * + * @author :wangchao + * @date :Created in 2022/9/19 + * @since :11 + */ +public class OpenGaussCsvResultSetHandler extends OpenGaussResultSetHandler { + + @Override + protected String floatNumberToString(@NonNull ResultSet resultSet, String columnLabel) throws SQLException { + String floatValue = resultSet.getString(columnLabel); + if (resultSet.wasNull()) { + return NULL; + } + if (isScientificNotation(floatValue)) { + return new BigDecimal(floatValue).toPlainString(); + } + return floatValue; + } +} diff --git a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/task/OpenGaussResultSetHandler.java b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/task/OpenGaussResultSetHandler.java index 5ca555814bfbab6e524080897cefeaee0d7a4cb7..7ba79bb008d5c201ba60bc651485a779e6a9a9fd 100644 --- a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/task/OpenGaussResultSetHandler.java +++ b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/task/OpenGaussResultSetHandler.java @@ -34,7 +34,7 @@ import java.util.concurrent.ConcurrentHashMap; * @since :11 */ public class OpenGaussResultSetHandler extends ResultSetHandler { - private final Map typeHandlers = new ConcurrentHashMap<>(); + protected final Map typeHandlers = new ConcurrentHashMap<>(); { TypeHandler byteaToString = (rs, columnLabel) -> bytesToString(rs.getBytes(columnLabel)); @@ -88,7 +88,7 @@ public class OpenGaussResultSetHandler extends ResultSetHandler { super(supplyZero); } - private String intToString(ResultSet rs, String columnLabel) throws SQLException { + protected String intToString(ResultSet rs, String columnLabel) throws SQLException { return rs.getString(columnLabel); } @@ -135,7 +135,7 @@ public class OpenGaussResultSetHandler extends ResultSetHandler { } } - private String getPgColumnTypeName(ResultSetMetaData rsmd, int columnIdx) throws SQLException { + protected String getPgColumnTypeName(ResultSetMetaData rsmd, int columnIdx) throws SQLException { String columnTypeName = rsmd.getColumnTypeName(columnIdx); if (columnTypeName.contains(OpenGaussType.pg_catalog)) { columnTypeName = rsmd.getColumnTypeName(columnIdx) diff --git a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/task/ResultSetHandler.java b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/task/ResultSetHandler.java index 23fec51a030541b2582c10ace5bc6b509ffed7e4..a2383b50ee0c29db64cc4c32fb48ac421b9d4a40 100644 --- a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/task/ResultSetHandler.java +++ b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/task/ResultSetHandler.java @@ -99,6 +99,7 @@ public abstract class ResultSetHandler { } catch (SQLException ex) { log.error(" parse data metadata information exception", ex); } +// log.info("data:{}", values); return values; } @@ -188,7 +189,7 @@ public abstract class ResultSetHandler { return value; } - private boolean isScientificNotation(String value) { + protected boolean isScientificNotation(String value) { return value.contains("E") || value.contains("e"); } diff --git a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/task/ResultSetHandlerFactory.java b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/task/ResultSetHandlerFactory.java index 110c08e5c3c804b4000c85f2e7af7388672fbc29..70213ad0c935a54b136c834f2267465cd61fd7bf 100644 --- a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/task/ResultSetHandlerFactory.java +++ b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/task/ResultSetHandlerFactory.java @@ -17,6 +17,7 @@ package org.opengauss.datachecker.extract.task; import org.opengauss.datachecker.common.config.ConfigCache; import org.opengauss.datachecker.common.constant.ConfigConstants; +import org.opengauss.datachecker.common.entry.enums.CheckMode; import org.opengauss.datachecker.common.entry.enums.DataBaseType; import java.util.Objects; @@ -37,10 +38,15 @@ public class ResultSetHandlerFactory { */ public ResultSetHandler createHandler(DataBaseType databaseType) { Boolean supplyZero = ConfigCache.getBooleanValue(ConfigConstants.FLOATING_POINT_DATA_SUPPLY_ZERO); + CheckMode checkMode = ConfigCache.getCheckMode(); if (Objects.equals(databaseType, DataBaseType.MS)) { return new MysqlResultSetHandler(); } else if (Objects.equals(databaseType, DataBaseType.OG)) { - return new OpenGaussResultSetHandler(supplyZero); + if (Objects.equals(CheckMode.CSV, checkMode)) { + return new OpenGaussCsvResultSetHandler(); + } else { + return new OpenGaussResultSetHandler(supplyZero); + } } else if (Objects.equals(databaseType, DataBaseType.O)) { return new OracleResultSetHandler(); } else { diff --git a/datachecker-extract/src/main/resources/application-sink.yml b/datachecker-extract/src/main/resources/application-sink.yml index ee323704535117a931110611a373e84869119f52..33378e39718c6113c25adff0922d3be750d9a0a6 100644 --- a/datachecker-extract/src/main/resources/application-sink.yml +++ b/datachecker-extract/src/main/resources/application-sink.yml @@ -10,7 +10,7 @@ spring: extract: schema: jack databaseType: OG #OG opengauss - object-size-expansion-factor: 4 + object-size-expansion-factor: 1 endpoint: SINK dataLoadMode: jdbc # jdbc or csv query-dop: 1 # jdbc Parallel Query config diff --git a/datachecker-extract/src/main/resources/application-source.yml b/datachecker-extract/src/main/resources/application-source.yml index a60dc286b9fcd51768fa068eaf11e4ca924cc627..e6b382f34fb28b994f2276f2520bdda95d3fe9cb 100644 --- a/datachecker-extract/src/main/resources/application-source.yml +++ b/datachecker-extract/src/main/resources/application-source.yml @@ -10,7 +10,7 @@ spring: extract: schema: test databaseType: MS # MS mysql - object-size-expansion-factor: 4 + object-size-expansion-factor: 1 endpoint: SOURCE dataLoadMode: jdbc # jdbc or csv query-dop: 1 # jdbc Parallel Query config