From c3f654421e448b23c98c70703c930ffc797b17dc Mon Sep 17 00:00:00 2001 From: mystarry-sky Date: Fri, 30 Aug 2024 16:58:56 +0800 Subject: [PATCH] =?UTF-8?q?=E5=A2=9E=E5=8A=A0time=EF=BC=8Cvarbinary?= =?UTF-8?q?=E7=B1=BB=E5=9E=8B=E4=B8=BB=E9=94=AE=E6=A0=A1=E9=AA=8C?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../service/DataExtractServiceImpl.java | 26 +++++++++------- .../slice/ExtractPointSwapManager.java | 16 +++++----- .../extract/util/MetaDataUtil.java | 31 ++++++++++--------- 3 files changed, 40 insertions(+), 33 deletions(-) diff --git a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/service/DataExtractServiceImpl.java b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/service/DataExtractServiceImpl.java index abb4e2c..5c9955d 100644 --- a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/service/DataExtractServiceImpl.java +++ b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/service/DataExtractServiceImpl.java @@ -447,7 +447,7 @@ public class DataExtractServiceImpl implements DataExtractService { new Thread(() -> { checkPointManager = new ExtractPointSwapManager(kafkaTemplate, kafkaConsumerConfig); checkPointManager.setCheckPointSwapTopicName(ConfigCache.getValue(ConfigConstants.PROCESS_NO)); - LogUtils.info(log, "tableRegisterCheckPoint start pollSwapPoint thread"); + LogUtils.info(log, "start pollSwapPoint thread to register CheckPoint taskSize=" + taskList.size()); checkPointManager.pollSwapPoint(tableCheckPointCache); Endpoint endpoint = ConfigCache.getEndPoint(); CountDownLatch countDownLatch = new CountDownLatch(taskList.size()); @@ -474,17 +474,21 @@ public class DataExtractServiceImpl implements DataExtractService { } private void registerCheckPoint(ExtractTask task, Endpoint endpoint) { - String tableName = task.getTableName(); - LogUtils.info(log, "register check point [{}][{}]", endpoint, tableName); - CheckPoint checkPoint = new CheckPoint(dataAccessService); - List checkPointList = getCheckPoint(checkPoint, task.getTableMetadata()); - if (checkPointList == null || checkPointList.size() <= 2) { - checkPointList = List.of(); - tableCheckPointCache.put(tableName, checkPointList); + try { + String tableName = task.getTableName(); + LogUtils.info(log, "register check point [{}][{}]", endpoint, tableName); + CheckPoint checkPoint = new CheckPoint(dataAccessService); + List checkPointList = getCheckPoint(checkPoint, task.getTableMetadata()); + if (checkPointList == null || checkPointList.size() <= 2) { + checkPointList = List.of(); + tableCheckPointCache.put(tableName, checkPointList); + } + checkPointManager.send(new CheckPointData().setTableName(tableName) + .setDigit(checkPoint.checkPkNumber(task.getTableMetadata())) + .setCheckPointList(checkPointList)); + } catch (Exception e) { + log.error("register check point failed ", e); } - checkPointManager.send(new CheckPointData().setTableName(tableName) - .setDigit(checkPoint.checkPkNumber(task.getTableMetadata())) - .setCheckPointList(checkPointList)); } private String sliceTaskNameBuilder(@NonNull String tableName, int index) { diff --git a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/slice/ExtractPointSwapManager.java b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/slice/ExtractPointSwapManager.java index 158c083..ffdd607 100644 --- a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/slice/ExtractPointSwapManager.java +++ b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/slice/ExtractPointSwapManager.java @@ -56,7 +56,7 @@ public class ExtractPointSwapManager { private KafkaConsumerConfig kafkaConsumerConfig; public ExtractPointSwapManager(KafkaTemplate kafkaTemplate, - KafkaConsumerConfig kafkaConsumerConfig) { + KafkaConsumerConfig kafkaConsumerConfig) { this.kafkaTemplate = kafkaTemplate; this.endpoint = ConfigCache.getEndPoint(); this.endpoint = ConfigCache.getEndPoint(); @@ -87,7 +87,7 @@ public class ExtractPointSwapManager { tableCheckPointCache.put(pointData.getTableName(), translateDigitPoint(pointData)); deliveredCount.getAndIncrement(); LogUtils.info(log, "swap summarized checkpoint of table [{}]:[{}] ", deliveredCount, - pointData); + pointData); } }); } else { @@ -102,17 +102,17 @@ public class ExtractPointSwapManager { } } LogUtils.warn(log, "close check point swap consumer {} :{}", checkPointSwapTopicName, - kafkaConsumer.groupMetadata() - .groupId()); + kafkaConsumer.groupMetadata() + .groupId()); kafkaConsumerConfig.closeConsumer(kafkaConsumer); }); } private List translateDigitPoint(CheckPointData pointData) { return pointData.isDigit() ? pointData.getCheckPointList() - .stream() - .map(obj -> Long.parseLong((String) obj)) - .collect(Collectors.toList()) : pointData.getCheckPointList(); + .stream() + .map(obj -> Long.parseLong((String) obj)) + .collect(Collectors.toList()) : pointData.getCheckPointList(); } private void trySubscribe() { @@ -130,6 +130,7 @@ public class ExtractPointSwapManager { kafkaConsumer.subscribe(List.of(checkPointSwapTopicName)); Map> listTopics = kafkaConsumer.listTopics(); isSubscribe = listTopics.containsKey(checkPointSwapTopicName); + LogUtils.info(log, "subscribe check point swap topic {} ", checkPointSwapTopicName); } catch (Exception ex) { LogUtils.warn(log, "subscribe {} failed", checkPointSwapTopicName); } @@ -138,7 +139,6 @@ public class ExtractPointSwapManager { public void setCheckPointSwapTopicName(String process) { this.checkPointSwapTopicName = String.format(Constants.SWAP_POINT_TOPIC_TEMP, process); - LogUtils.info(log, "check point swap topic {}", checkPointSwapTopicName); } public void close() { diff --git a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/util/MetaDataUtil.java b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/util/MetaDataUtil.java index dd95305..29fc2d4 100644 --- a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/util/MetaDataUtil.java +++ b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/util/MetaDataUtil.java @@ -36,19 +36,19 @@ import java.util.stream.Collectors; */ public class MetaDataUtil { private static final List numberDataTypes = - List.of("integer", "int", "uint1", "uint2", "uint4", "uint8", "long", "decimal", "numeric", "smallint", - "NUMBER", "tinyint", "mediumint", "bigint"); + List.of("integer", "int", "uint1", "uint2", "uint4", "uint8", "long", "decimal", "numeric", + "smallint", "NUMBER", "tinyint", "mediumint", "bigint"); private static final List dataTypes = - List.of("integer", "int", "uint1", "uint2", "uint4", "uint8", "long", "decimal", "numeric", "NUMBER", - "VARCHAR2", "smallint", "tinyint", "mediumint", "bigint", "character", "char", "varchar", - "character varying", "CHAR"); + List.of("integer", "int", "uint1", "uint2", "uint4", "uint8", "long", "decimal", "numeric", + "NUMBER", "VARCHAR2", "smallint", "tinyint", "mediumint", "bigint", "character", "char", "varchar", + "character varying", "CHAR", "time without time zone", "\"varbinary\"", "varbinary", "time"); private static final List digitalDataTypes = - List.of("integer", "int", "uint1", "uint2", "uint4", "uint8", "long", "decimal", "numeric", "smallint", - "number", "tinyint", "mediumint", "bigint", "double", "float"); + List.of("integer", "int", "uint1", "uint2", "uint4", "uint8", "long", "decimal", "numeric", "smallint", + "number", "tinyint", "mediumint", "bigint", "double", "float"); private static final List LARGE_DIGITAL_TYPES = - List.of("uint8", "long", "decimal", "numeric", "number", "bigint", "double", "float"); + List.of("uint8", "long", "decimal", "numeric", "number", "bigint", "double", "float"); /** * getTableColumns @@ -87,9 +87,9 @@ public class MetaDataUtil { return emptyList(); } return columnsMetas.stream() - .sorted(Comparator.comparing(ColumnsMetaData::getOrdinalPosition)) - .map(ColumnsMetaData::getColumnName) - .collect(Collectors.toUnmodifiableList()); + .sorted(Comparator.comparing(ColumnsMetaData::getOrdinalPosition)) + .map(ColumnsMetaData::getColumnName) + .collect(Collectors.toUnmodifiableList()); } /** @@ -123,7 +123,7 @@ public class MetaDataUtil { */ public static boolean isDigitKey(ColumnsMetaData columnKey) { return digitalDataTypes.contains(columnKey.getDataType() - .toLowerCase(Locale.getDefault())); + .toLowerCase(Locale.getDefault())); } public static boolean isDigitKey(String dataType) { @@ -138,13 +138,16 @@ public class MetaDataUtil { */ public static boolean isLargeDigitalTypeKey(ColumnsMetaData primaryKey) { return LARGE_DIGITAL_TYPES.contains(primaryKey.getDataType() - .toLowerCase(Locale.getDefault())); + .toLowerCase(Locale.getDefault())); } public static boolean isInvalidPrimaryKey(ColumnsMetaData primaryKey) { if (primaryKey.getColumnKey() != ColumnKey.PRI) { return false; } - return !dataTypes.contains(primaryKey.getDataType()); + return dataTypes.stream() + .filter(dataType -> dataType.equalsIgnoreCase(primaryKey.getDataType())) + .findAny() + .isEmpty(); } } -- Gitee