From 565ed95a47de00801fdf0c6c4db728c6d24a8e08 Mon Sep 17 00:00:00 2001 From: buter Date: Thu, 18 Aug 2022 18:54:07 +0800 Subject: [PATCH] =?UTF-8?q?=E6=95=B0=E6=8D=AE=E6=A0=A1=E9=AA=8C=E6=95=B0?= =?UTF-8?q?=E6=8D=AE=E6=8A=BD=E5=8F=96=E6=A8=A1=E5=9D=97=E4=BB=A3=E7=A0=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../common/entry/debezium/DebePayload.java | 77 +++++ .../common/entry/debezium/DebeziumData.java | 50 ++++ .../common/entry/debezium/PayloadSource.java | 43 +++ .../DataConsolidationController.java | 82 ++++++ .../debe/DataConsolidationService.java | 67 +++++ .../debe/DataConsolidationServiceImpl.java | 266 ++++++++++++++++++ .../extract/debe/DebeziumDataHandler.java | 59 ++++ .../extract/debe/DebeziumDataLogs.java | 99 +++++++ .../debe/IncrementDataAnalysisService.java | 149 ++++++++++ 9 files changed, 892 insertions(+) create mode 100644 datachecker-common/src/main/java/org/opengauss/datachecker/common/entry/debezium/DebePayload.java create mode 100644 datachecker-common/src/main/java/org/opengauss/datachecker/common/entry/debezium/DebeziumData.java create mode 100644 datachecker-common/src/main/java/org/opengauss/datachecker/common/entry/debezium/PayloadSource.java create mode 100644 datachecker-extract/src/main/java/org/opengauss/datachecker/extract/controller/DataConsolidationController.java create mode 100644 datachecker-extract/src/main/java/org/opengauss/datachecker/extract/debe/DataConsolidationService.java create mode 100644 datachecker-extract/src/main/java/org/opengauss/datachecker/extract/debe/DataConsolidationServiceImpl.java create mode 100644 datachecker-extract/src/main/java/org/opengauss/datachecker/extract/debe/DebeziumDataHandler.java create mode 100644 datachecker-extract/src/main/java/org/opengauss/datachecker/extract/debe/DebeziumDataLogs.java create mode 100644 datachecker-extract/src/main/java/org/opengauss/datachecker/extract/debe/IncrementDataAnalysisService.java diff --git 
/*
 * Copyright (c) 2022-2022 Huawei Technologies Co.,Ltd.
 *
 * openGauss is licensed under Mulan PSL v2.
 * You can use this software according to the terms and conditions of the Mulan PSL v2.
 * You may obtain a copy of Mulan PSL v2 at:
 *
 *          http://license.coscl.org.cn/MulanPSL2
 *
 * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
 * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
 * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
 * See the Mulan PSL v2 for more details.
 */

package org.opengauss.datachecker.common.entry.debezium;

import lombok.Data;

import java.util.List;
import java.util.Map;

/**
 * Payload section of a debezium change-event message.
 * Deserialized from JSON by fastjson, so field names must match the debezium event keys.
 *
 * @author :wangchao
 * @date :Created in 2022/6/30
 * @since :11
 */
@Data
public class DebePayload {
    /** Origin metadata of the change event (connector, table, offsets …). */
    private PayloadSource source;

    /** Row image before the change; keys are column names. Absent (null) for inserts. */
    private Map<String, String> before;

    /** Row image after the change; keys are column names. Absent (null) for deletes. */
    private Map<String, String> after;

    private String databaseName;
    private String schemaName;

    /** DDL statement text for schema-change events. */
    private String ddl;
    private List<PayloadTableChange> tableChanges;
}

/**
 * A single table change entry inside a debezium schema-change payload.
 */
@Data
class PayloadTableChange {
    private String id;
    private String type;
    private PayloadTable table;
}

/**
 * Table structure description carried by a schema-change event.
 */
@Data
class PayloadTable {
    private String defaultCharsetName;
    private List<String> primaryKeyColumnNames;
    // NOTE(review): element types of the four lists below are not visible in this patch —
    // confirm against the debezium schema-change event format before relying on them.
    private List<String> primaryKeyColumnChanges;
    private List<String> foreignKeyColumns;
    private List<String> uniqueColumns;
    private List<String> checkColumns;
    private List<PayloadTableColumns> columns;
    private String comment;
}

/**
 * Column metadata of a table inside a debezium schema-change event.
 */
@Data
class PayloadTableColumns {
    private String name;
    private int jdbcType;
    private String nativeType;
    private String typeName;
    private String typeExpression;
    private String charsetName;
    private int length;
    private int scale;
    private int position;
    private boolean optional;
    private String defaultValueExpression;
    private boolean autoIncremented;
    private boolean generated;
    private String comment;
    private List<String> modifyKeys;
}
+ */ + +package org.opengauss.datachecker.common.entry.debezium; + +import lombok.Data; + +import java.util.List; + +/** + * DebeziumData + * + * @author :wangchao + * @date :Created in 2022/6/24 + * @since :11 + */ +@Data +public class DebeziumData { + private DebePayload payload; +} + +@Data +class DebeSchema { + private String name; + private String type; + private List fields; + private boolean optional; + +} + +@Data +class Field { + private String name; + private String field; + private String type; + private List fields; + private boolean optional; +} \ No newline at end of file diff --git a/datachecker-common/src/main/java/org/opengauss/datachecker/common/entry/debezium/PayloadSource.java b/datachecker-common/src/main/java/org/opengauss/datachecker/common/entry/debezium/PayloadSource.java new file mode 100644 index 0000000..83db532 --- /dev/null +++ b/datachecker-common/src/main/java/org/opengauss/datachecker/common/entry/debezium/PayloadSource.java @@ -0,0 +1,43 @@ +/* + * Copyright (c) 2022-2022 Huawei Technologies Co.,Ltd. + * + * openGauss is licensed under Mulan PSL v2. + * You can use this software according to the terms and conditions of the Mulan PSL v2. + * You may obtain a copy of Mulan PSL v2 at: + * + * http://license.coscl.org.cn/MulanPSL2 + * + * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, + * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, + * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. + * See the Mulan PSL v2 for more details. 
+ */ + +package org.opengauss.datachecker.common.entry.debezium; + +import lombok.Data; + +/** + * PayloadSource + * + * @author :wangchao + * @date :Created in 2022/6/30 + * @since :11 + */ +@Data +public class PayloadSource { + private String version; + private String connector; + private String name; + private long ts_ms; + private boolean snapshot; + private String db; + private String sequence; + private String schema; + private String table; + private String txId; + private String scn; + private String commit_scn; + private String lcr_position; +} + diff --git a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/controller/DataConsolidationController.java b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/controller/DataConsolidationController.java new file mode 100644 index 0000000..aac3532 --- /dev/null +++ b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/controller/DataConsolidationController.java @@ -0,0 +1,82 @@ +/* + * Copyright (c) 2022-2022 Huawei Technologies Co.,Ltd. + * + * openGauss is licensed under Mulan PSL v2. + * You can use this software according to the terms and conditions of the Mulan PSL v2. + * You may obtain a copy of Mulan PSL v2 at: + * + * http://license.coscl.org.cn/MulanPSL2 + * + * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, + * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, + * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. + * See the Mulan PSL v2 for more details. 
+ */ + +package org.opengauss.datachecker.extract.controller; + +import io.swagger.v3.oas.annotations.Operation; +import io.swagger.v3.oas.annotations.tags.Tag; +import lombok.AllArgsConstructor; +import org.opengauss.datachecker.common.entry.check.IncrementCheckConfig; +import org.opengauss.datachecker.common.entry.check.IncrementCheckTopic; +import org.opengauss.datachecker.common.entry.extract.SourceDataLog; +import org.opengauss.datachecker.common.web.Result; +import org.opengauss.datachecker.extract.debe.DataConsolidationService; +import org.springframework.web.bind.annotation.GetMapping; +import org.springframework.web.bind.annotation.PostMapping; +import org.springframework.web.bind.annotation.RequestBody; +import org.springframework.web.bind.annotation.RequestParam; +import org.springframework.web.bind.annotation.RestController; + +import java.util.List; + +/** + * incremental verification debezium data integration + * + * @author :wangchao + * @date :Created in 2022/6/30 + * @since :11 + */ +@Tag(name = "incremental verification debezium data integration") +@RestController +@AllArgsConstructor +public class DataConsolidationController { + private final DataConsolidationService dataConsolidationService; + + /** + * Querying topic records + * + * @param topicName topicName + * @return topic records + */ + @Operation(summary = "Querying topic records") + @GetMapping("/extract/debezium/topic/records") + Result> getDebeziumTopicRecords(@RequestParam(name = "topicName") String topicName) { + return Result.success(dataConsolidationService.getDebeziumTopicRecords(topicName)); + } + + /** + * queries the topic information of the debezium + * + * @return topic information + */ + @Operation(summary = "queries the topic information of the debezium") + @PostMapping("/extract/debezium/topic/count") + Result getDebeziumTopicRecordCount() { + return Result.success(dataConsolidationService.getDebeziumTopicRecordOffSet()); + } + + /** + * configuring debezium-related 
environment information + * + * @param config config + * @return request result + */ + @Operation(summary = "configuring debezium-related environment information") + @PostMapping("/extract/debezium/topic/config") + Result configIncrementCheckEnvironment(@RequestBody IncrementCheckConfig config) { + dataConsolidationService.configIncrementCheckEnvironment(config); + return Result.success(); + } +} diff --git a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/debe/DataConsolidationService.java b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/debe/DataConsolidationService.java new file mode 100644 index 0000000..4b1d301 --- /dev/null +++ b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/debe/DataConsolidationService.java @@ -0,0 +1,67 @@ +/* + * Copyright (c) 2022-2022 Huawei Technologies Co.,Ltd. + * + * openGauss is licensed under Mulan PSL v2. + * You can use this software according to the terms and conditions of the Mulan PSL v2. + * You may obtain a copy of Mulan PSL v2 at: + * + * http://license.coscl.org.cn/MulanPSL2 + * + * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, + * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, + * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. + * See the Mulan PSL v2 for more details. 
+ */ + +package org.opengauss.datachecker.extract.debe; + +import org.opengauss.datachecker.common.entry.check.IncrementCheckConfig; +import org.opengauss.datachecker.common.entry.check.IncrementCheckTopic; +import org.opengauss.datachecker.common.entry.extract.SourceDataLog; + +import java.util.List; + +/** + * Debezium incremental log data merge service + * + * @author :wangchao + * @date :Created in 2022/7/1 + * @since :11 + */ +public interface DataConsolidationService { + /** + * Get the topic records of debezium, and analyze and merge the topic records + * + * @param topicName topic name + * @return topic records + */ + List getDebeziumTopicRecords(String topicName); + + /** + * Get the message record information of the topic corresponding to the debrizum listening table + * + * @return Return message consumption record + */ + IncrementCheckTopic getDebeziumTopicRecordOffSet(); + + /** + * Get the debezium listening table and record the offset information of the message corresponding to the topic + * + * @return return offset + */ + long getDebeziumTopicRecordEndOffSet(); + + /** + * Check whether the current extraction end is the source end + * + * @return Is it the source end + */ + boolean isSourceEndpoint(); + + /** + * Configure incremental verification (debezium configuration) + * + * @param config Incremental verification (debezium configuration) + */ + void configIncrementCheckEnvironment(IncrementCheckConfig config); +} diff --git a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/debe/DataConsolidationServiceImpl.java b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/debe/DataConsolidationServiceImpl.java new file mode 100644 index 0000000..4835d82 --- /dev/null +++ b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/debe/DataConsolidationServiceImpl.java @@ -0,0 +1,266 @@ +/* + * Copyright (c) 2022-2022 Huawei Technologies Co.,Ltd. + * + * openGauss is licensed under Mulan PSL v2. 
+ * You can use this software according to the terms and conditions of the Mulan PSL v2. + * You may obtain a copy of Mulan PSL v2 at: + * + * http://license.coscl.org.cn/MulanPSL2 + * + * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, + * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, + * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. + * See the Mulan PSL v2 for more details. + */ + +package org.opengauss.datachecker.extract.debe; + +import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.common.TopicPartition; +import org.opengauss.datachecker.common.entry.check.IncrementCheckConfig; +import org.opengauss.datachecker.common.entry.check.IncrementCheckTopic; +import org.opengauss.datachecker.common.entry.enums.Endpoint; +import org.opengauss.datachecker.common.entry.extract.SourceDataLog; +import org.opengauss.datachecker.common.exception.DebeziumConfigException; +import org.opengauss.datachecker.common.util.IdGenerator; +import org.opengauss.datachecker.extract.cache.MetaDataCache; +import org.opengauss.datachecker.extract.config.ExtractProperties; +import org.opengauss.datachecker.extract.config.KafkaConsumerConfig; +import org.opengauss.datachecker.extract.kafka.KafkaAdminService; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Service; +import org.springframework.util.Assert; +import org.springframework.util.CollectionUtils; + +import javax.annotation.PostConstruct; +import javax.validation.constraints.NotEmpty; +import javax.validation.constraints.NotNull; +import java.time.Duration; +import java.util.ArrayList; +import java.util.List; +import java.util.Objects; +import java.util.Set; +import java.util.stream.Collectors; + +/** + * DataConsolidationServiceImpl + * + * @author :zhangyaozhong + * @date :Created in 2022/6/14 + * 
@since :11 + */ +@Slf4j +@Service +public class DataConsolidationServiceImpl implements DataConsolidationService { + + private static final IncrementCheckConfig INCREMENT_CHECK_CONIFG = new IncrementCheckConfig(); + + private KafkaConsumer debeziumTopicOffSetConsumer = null; + + private final Object lock = new Object(); + + @Autowired + private DebeziumDataHandler debeziumDataHandler; + + @Autowired + private KafkaConsumerConfig consumerConfig; + + @Autowired + private KafkaAdminService kafkaAdminService; + + @Autowired + private ExtractProperties extractProperties; + + @PostConstruct + public void initIncrementConfig() { + if (extractProperties.getDebeziumEnable()) { + INCREMENT_CHECK_CONIFG.setDebeziumTopic(extractProperties.getDebeziumTopic()) + .setDebeziumTables(extractProperties.getDebeziumTables()) + .setPartitions(extractProperties.getDebeziumTopicPartitions()) + .setGroupId(extractProperties.getDebeziumGroupId()); + + getDebeziumTopicRecordOffSet(); + } + } + + /** + * Get the topic records of debezium, and analyze and merge the topic records + * + * @param topicName topic name + * @return topic records + */ + @Override + public List getDebeziumTopicRecords(String topicName) { + checkIncrementCheckEnvironment(); + IncrementCheckTopic topic = getDebeziumTopic(); + topic.setTopic(topicName).setGroupId(IdGenerator.nextId36()); + KafkaConsumer kafkaConsumer = consumerConfig.getDebeziumConsumer(topic); + log.info("kafka debezium topic consumer topic=[{}]", topicName); + // Consume a partition data of a topic + List dataList = new ArrayList<>(); + comsumerAllRecords(kafkaConsumer, dataList); + log.info("kafka consumer topic=[{}] dataList=[{}]", topicName, dataList.size()); + return dataList; + } + + private void comsumerAllRecords(KafkaConsumer kafkaConsumer, List dataList) { + log.debug("kafka Consumer poll"); + DebeziumDataLogs debeziumDataLogs = new DebeziumDataLogs(); + int consumerRecords = getConsumerRecords(kafkaConsumer, debeziumDataLogs); + while 
(consumerRecords > 0) { + consumerRecords = getConsumerRecords(kafkaConsumer, debeziumDataLogs); + } + dataList.addAll(debeziumDataLogs.values()); + log.debug("Consumer data debezium DataHandler"); + } + + /** + * Consume and process Kafka consumer client data + * + * @param kafkaConsumer consumer + * @param debeziumDataLogs Processing results + * @return Number of consumer records + */ + private int getConsumerRecords(KafkaConsumer kafkaConsumer, DebeziumDataLogs debeziumDataLogs) { + ConsumerRecords consumerRecords = kafkaConsumer.poll(Duration.ofMillis(200)); + consumerRecords.forEach(record -> { + try { + debeziumDataHandler.handler(record.value(), debeziumDataLogs); + } catch (Exception e) { + // Abnormal message structure, ignoring the current message + log.error("Abnormal message structure, ignoring the current message,{}", record.value()); + } + }); + return consumerRecords.count(); + } + + /** + * Get the message record information of the topic corresponding to the debrizum listening table + * + * @return Return message consumption record + */ + @Override + public IncrementCheckTopic getDebeziumTopicRecordOffSet() { + checkIncrementCheckEnvironment(); + IncrementCheckTopic topic = getDebeziumTopic(); + final TopicPartition topicPartition = new TopicPartition(topic.getTopic(), 0); + List partitionList = List.of(topicPartition); + debeziumTopicOffSetConsumer = getDebeziumTopicOffSetConsumer(); + + // View topic current message consumption starting position + debeziumTopicOffSetConsumer.seekToBeginning(partitionList); + topic.setBegin(debeziumTopicOffSetConsumer.position(topicPartition)); + + // View topic current message consumption deadline + debeziumTopicOffSetConsumer.seekToEnd(partitionList); + topic.setEnd(debeziumTopicOffSetConsumer.position(topicPartition)); + return topic; + } + + private KafkaConsumer getDebeziumTopicOffSetConsumer() { + if (Objects.nonNull(debeziumTopicOffSetConsumer)) { + return debeziumTopicOffSetConsumer; + } + if 
(Objects.isNull(debeziumTopicOffSetConsumer)) { + synchronized (lock) { + if (Objects.isNull(debeziumTopicOffSetConsumer)) { + IncrementCheckTopic topic = getDebeziumTopic(); + final TopicPartition topicPartition = new TopicPartition(topic.getTopic(), 0); + KafkaConsumer kafkaConsumer = consumerConfig.getDebeziumConsumer(topic); + List partitionList = List.of(topicPartition); + // Set consumption mode as partition + kafkaConsumer.assign(partitionList); + } + } + } + return debeziumTopicOffSetConsumer; + } + + /** + * Get the debezium listening table and record the offset information of the message corresponding to the topic + * + * @return offset + */ + @Override + public long getDebeziumTopicRecordEndOffSet() { + final TopicPartition topicPartition = new TopicPartition(INCREMENT_CHECK_CONIFG.getDebeziumTopic(), 0); + // View topic current message consumption deadline + return debeziumTopicOffSetConsumer.position(topicPartition); + } + + private IncrementCheckTopic getDebeziumTopic() { + return new IncrementCheckTopic().setTopic(INCREMENT_CHECK_CONIFG.getDebeziumTopic()) + .setGroupId(INCREMENT_CHECK_CONIFG.getGroupId()) + .setPartitions(INCREMENT_CHECK_CONIFG.getPartitions()); + } + + @Override + public boolean isSourceEndpoint() { + return Objects.equals(Endpoint.SOURCE, extractProperties.getEndpoint()); + } + + /** + * Is the current service a source service + */ + private void checkSourceEndpoint() { + Assert.isTrue(isSourceEndpoint(), "The current service is not a source-endpoint-service"); + } + + /** + * Check the configuration of the debezium environment for incremental verification + */ + private void checkIncrementCheckEnvironment() { + final Set allKeys = MetaDataCache.getAllKeys(); + // Debezium environmental inspection + checkDebeziumEnvironment(INCREMENT_CHECK_CONIFG.getDebeziumTopic(), INCREMENT_CHECK_CONIFG.getDebeziumTables(), + allKeys); + } + + /** + * Check and configure the incremental verification debezium environment configuration + * + * 
@param config configuration information + */ + @Override + public void configIncrementCheckEnvironment(@NotNull IncrementCheckConfig config) { + final Set allKeys = MetaDataCache.getAllKeys(); + checkDebeziumEnvironment(config.getDebeziumTopic(), config.getDebeziumTables(), allKeys); + INCREMENT_CHECK_CONIFG.setDebeziumTables(config.getDebeziumTables()).setDebeziumTopic(config.getDebeziumTopic()) + .setGroupId(config.getGroupId()).setPartitions(config.getPartitions()); + } + + /** + * debezium Environment check

+ * Check whether the debezium configuration topic exists

+ * Check whether the debezium configuration tables exist

+ * + * @param debeziumTopic Topic to be checked + * @param debeziumTables Debezium configuration table list + * @param allTableSet Source end table set + */ + private void checkDebeziumEnvironment( + @NotEmpty(message = "Debezium configuration topic cannot be empty") String debeziumTopic, + @NotEmpty(message = "Debezium configuration tables cannot be empty") List debeziumTables, + @NotEmpty(message = "Source side table metadata cache exception") Set allTableSet) { + checkSourceEndpoint(); + if (!kafkaAdminService.isTopicExists(debeziumTopic)) { + + // The configuration item debezium topic information does not exist + throw new DebeziumConfigException("The configuration item debezium topic information does not exist"); + } + final List allTableList = allTableSet.stream().map(String::toUpperCase).collect(Collectors.toList()); + + List invalidTables = + debeziumTables.stream().map(String::toUpperCase).filter(table -> !allTableList.contains(table)) + .collect(Collectors.toList()); + if (!CollectionUtils.isEmpty(invalidTables)) { + + // The configuration item debezium tables contains non-existent or black and white list tables + throw new DebeziumConfigException( + "The configuration item debezium-tables contains non-existent or black-and-white list tables:" + + invalidTables.toString()); + } + } +} diff --git a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/debe/DebeziumDataHandler.java b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/debe/DebeziumDataHandler.java new file mode 100644 index 0000000..a6f0a6f --- /dev/null +++ b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/debe/DebeziumDataHandler.java @@ -0,0 +1,59 @@ +/* + * Copyright (c) 2022-2022 Huawei Technologies Co.,Ltd. + * + * openGauss is licensed under Mulan PSL v2. + * You can use this software according to the terms and conditions of the Mulan PSL v2. 
+ * You may obtain a copy of Mulan PSL v2 at: + * + * http://license.coscl.org.cn/MulanPSL2 + * + * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, + * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, + * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. + * See the Mulan PSL v2 for more details. + */ + +package org.opengauss.datachecker.extract.debe; + +import com.alibaba.fastjson.JSONObject; +import lombok.extern.slf4j.Slf4j; +import org.opengauss.datachecker.common.entry.debezium.DebePayload; +import org.opengauss.datachecker.common.entry.debezium.DebeziumData; +import org.opengauss.datachecker.common.entry.debezium.PayloadSource; +import org.springframework.stereotype.Service; + +import javax.validation.constraints.NotNull; +import java.util.Map; + +/** + * DebeziumDataHandler + * + * @author :wangchao + * @date :Created in 2022/6/24 + * @since :11 + */ +@Slf4j +@Service +public class DebeziumDataHandler { + /** + * Debezium message parsing and adding the parsing result to the {@code DebeziumDataLogs.class} result set + * + * @param message message + * @param debeziumDataLogs debeziumDataLogs + */ + public void handler(String message, @NotNull DebeziumDataLogs debeziumDataLogs) { + final DebeziumData debeziumData = JSONObject.parseObject(message, DebeziumData.class); + final DebePayload payload = debeziumData.getPayload(); + final Map before = payload.getBefore(); + final Map after = payload.getAfter(); + final PayloadSource source = payload.getSource(); + // Extract the data and add it to the debezium incremental log object + if (!debeziumDataLogs.addDebeziumDataKey(source.getTable(), after != null ? after : before)) { + // The format of the debezium message is abnormal. + // The corresponding table [{}] of the current message does not exist or is illegal + log.error( + "The debezium message format is abnormal. 
The current message corresponding table [{}] does not exist or is illegal", + source.getTable()); + } + } +} diff --git a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/debe/DebeziumDataLogs.java b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/debe/DebeziumDataLogs.java new file mode 100644 index 0000000..b252f02 --- /dev/null +++ b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/debe/DebeziumDataLogs.java @@ -0,0 +1,99 @@ +/* + * Copyright (c) 2022-2022 Huawei Technologies Co.,Ltd. + * + * openGauss is licensed under Mulan PSL v2. + * You can use this software according to the terms and conditions of the Mulan PSL v2. + * You may obtain a copy of Mulan PSL v2 at: + * + * http://license.coscl.org.cn/MulanPSL2 + * + * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, + * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, + * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. + * See the Mulan PSL v2 for more details. 
+ */ + +package org.opengauss.datachecker.extract.debe; + +import org.opengauss.datachecker.common.entry.extract.ColumnsMetaData; +import org.opengauss.datachecker.common.entry.extract.SourceDataLog; +import org.opengauss.datachecker.common.entry.extract.TableMetadata; +import org.opengauss.datachecker.extract.cache.MetaDataCache; +import org.opengauss.datachecker.extract.constants.ExtConstants; + +import javax.validation.constraints.NotEmpty; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.concurrent.ConcurrentHashMap; +import java.util.stream.Collectors; + +/** + * DebeziumDataLogs + * + * @author :wangchao + * @date :Created in 2022/7/1 + * @since :11 + */ +public class DebeziumDataLogs extends ConcurrentHashMap { + private static final long serialVersionUID = 6477495180190870182L; + + /** + * @param tableName tableName + * @return Table log data encapsulation + */ + public SourceDataLog getOrDefault(@NotEmpty String tableName) { + if (containsKey(tableName)) { + return get(tableName); + } + final SourceDataLog dataLog = buildDefault(tableName); + if (Objects.isNull(dataLog)) { + return null; + } + put(tableName, dataLog); + return get(tableName); + } + + /** + * Build data log encapsulation object according to table name + * + * @param tableName tableName + * @return Data log object + */ + private SourceDataLog buildDefault(String tableName) { + final TableMetadata metadata = MetaDataCache.get(tableName); + if (Objects.isNull(metadata)) { + return null; + } + SourceDataLog dataLog = new SourceDataLog(); + dataLog.setTableName(tableName).setCompositePrimarys( + metadata.getPrimaryMetas().stream().map(ColumnsMetaData::getColumnName).collect(Collectors.toList())) + .setCompositePrimaryValues(new ArrayList<>()); + put(tableName, dataLog); + return dataLog; + } + + /** + * Add the {@code tableName} table log to the table log object + * + * @param tableName tableName + * @param valuesMap All 
field values of the current record + * @return Add the {@code tableName} table log successfully + */ + public boolean addDebeziumDataKey(String tableName, Map valuesMap) { + final SourceDataLog dataLog = getOrDefault(tableName); + if (Objects.nonNull(dataLog)) { + List primaryValues = new ArrayList<>(); + final List primarys = dataLog.getCompositePrimarys(); + primarys.forEach(primary -> { + if (valuesMap.containsKey(primary)) { + primaryValues.add(valuesMap.get(primary)); + } + }); + dataLog.getCompositePrimaryValues().add(String.join(ExtConstants.PRIMARY_DELIMITER, primaryValues)); + return true; + } + return false; + } +} diff --git a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/debe/IncrementDataAnalysisService.java b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/debe/IncrementDataAnalysisService.java new file mode 100644 index 0000000..307db8f --- /dev/null +++ b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/debe/IncrementDataAnalysisService.java @@ -0,0 +1,149 @@ +/* + * Copyright (c) 2022-2022 Huawei Technologies Co.,Ltd. + * + * openGauss is licensed under Mulan PSL v2. + * You can use this software according to the terms and conditions of the Mulan PSL v2. + * You may obtain a copy of Mulan PSL v2 at: + * + * http://license.coscl.org.cn/MulanPSL2 + * + * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, + * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, + * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. + * See the Mulan PSL v2 for more details. 
+ */ + +package org.opengauss.datachecker.extract.debe; + +import lombok.RequiredArgsConstructor; +import org.opengauss.datachecker.common.entry.check.IncrementCheckTopic; +import org.opengauss.datachecker.common.entry.extract.SourceDataLog; +import org.opengauss.datachecker.extract.client.CheckingFeignClient; +import org.opengauss.datachecker.extract.config.ExtractProperties; +import org.springframework.stereotype.Service; +import org.springframework.util.Assert; +import org.springframework.util.CollectionUtils; + +import javax.annotation.PostConstruct; +import java.util.List; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; + +/** + * IncrementDataAnalysisService + * + * @author :wangchao + * @date :Created in 2022/7/4 + * @since :11 + */ +@RequiredArgsConstructor +@Service +public class IncrementDataAnalysisService { + /** + * Single thread scheduled task - execute check polling thread + */ + private static final ScheduledExecutorService scheduledExecutor = Executors.newSingleThreadScheduledExecutor(); + + private final ExtractProperties extractProperties; + private final DataConsolidationService consolidationService; + private final CheckingFeignClient checkingFeignClient; + + /** + * Used to record the offset of the last consumption of the incremental verification topic data, + * which is the starting point of the next data consumption + */ + private volatile AtomicLong lastOffSetAtomic = new AtomicLong(0L); + /** + * It is used to record the last execution time of the incremental verification topic data, + * which is the starting point of the execution cycle of the next data consumption task + */ + private volatile AtomicLong lastTimestampAtomic = new AtomicLong(0L); + + @PostConstruct + public void startIncrDataAnalysis() { + if (extractProperties.getDebeziumEnable() && consolidationService.isSourceEndpoint()) { + 
verificationConfiguration(); + IncrementCheckTopic topicRecordOffSet = consolidationService.getDebeziumTopicRecordOffSet(); + // Start the initialization load to verify the topic offset + lastOffSetAtomic.set(topicRecordOffSet.getBegin()); + setLastTimestampAtomicCurrentTime(); + dataAnslysis(); + } + } + + private void verificationConfiguration() { + final int debeziumTimePeriod = extractProperties.getDebeziumTimePeriod(); + final int debeziumNumPeriod = extractProperties.getDebeziumNumPeriod(); + Assert.isTrue(debeziumTimePeriod > 0, + "Debezium incremental migration verification, the time period should be greater than 0"); + Assert.isTrue(debeziumNumPeriod > 100, "Debezium incremental migration verification statistics:" + + "the threshold value of the number of incremental change records should be greater than 100"); + } + + /** + * Incremental log data record extraction scheduling task + */ + public void dataAnslysis() { + scheduledExecutor + .scheduleWithFixedDelay(peekDebeziumTopicRecordOffset(), DataNumAnalysisThreadConstant.initialDelay, + DataNumAnalysisThreadConstant.delay, TimeUnit.SECONDS); + } + + /** + * @return Incremental log data record extraction scheduling task thread + */ + private Runnable peekDebeziumTopicRecordOffset() { + return () -> { + Thread.currentThread().setName(DataNumAnalysisThreadConstant.name); + dataNumAnalysis(); + dataTimeAnslysis(); + }; + } + + /** + * Incremental log data extraction and time latitude management + */ + public void dataTimeAnslysis() { + long time = System.currentTimeMillis(); + if ((time - lastTimestampAtomic.get()) >= extractProperties.getDebeziumTimePeriod()) { + final List debeziumTopicRecords = + consolidationService.getDebeziumTopicRecords(extractProperties.getDebeziumTopic()); + if (!CollectionUtils.isEmpty(debeziumTopicRecords)) { + checkingFeignClient.notifySourceIncrementDataLogs(debeziumTopicRecords); + lastOffSetAtomic.addAndGet(debeziumTopicRecords.size()); + } + } + // Set the start calculation 
time point of the next time execution cycle + lastTimestampAtomic.set(time); + } + + /** + * Incremental log data extraction, quantity and latitude management + */ + public void dataNumAnalysis() { + final long offset = consolidationService.getDebeziumTopicRecordEndOffSet(); + // Verify whether the data volume threshold dimension scenario trigger conditions are met + if ((offset - lastOffSetAtomic.get()) >= extractProperties.getDebeziumNumPeriod()) { + // When the data volume threshold is reached, + // the data is extracted and pushed to the verification service. + final List debeziumTopicRecords = + consolidationService.getDebeziumTopicRecords(extractProperties.getDebeziumTopic()); + checkingFeignClient.notifySourceIncrementDataLogs(debeziumTopicRecords); + lastOffSetAtomic.addAndGet(debeziumTopicRecords.size()); + // Trigger data volume threshold dimension scenario - update time threshold + setLastTimestampAtomicCurrentTime(); + } + } + + private void setLastTimestampAtomicCurrentTime() { + lastTimestampAtomic.set(System.currentTimeMillis()); + } + + interface DataNumAnalysisThreadConstant { + String name = "DataAnalysisThread"; + long initialDelay = 0; + long delay = 1; + } +} -- Gitee