From 8b3847869ac523384a903a4ebac8efe6578aa4f6 Mon Sep 17 00:00:00 2001 From: mystarry-sky Date: Thu, 6 Mar 2025 10:33:06 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BC=98=E5=8C=96=E5=A2=9E=E9=87=8F=E6=A0=A1?= =?UTF-8?q?=E9=AA=8C=E6=95=B0=E6=8D=AE=E6=9F=A5=E8=AF=A2=E6=9C=BA=E5=88=B6?= =?UTF-8?q?=EF=BC=8C=E4=BB=A5=E5=8F=8A=E6=95=B0=E6=8D=AE=E6=9F=A5=E8=AF=A2?= =?UTF-8?q?=E6=96=B9=E5=BC=8F?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../check/client/ExtractFallbackFactory.java | 36 +-- .../check/client/ExtractFeignClient.java | 40 +-- .../modules/check/IncrementCheckThread.java | 2 +- .../modules/check/QueryRowDataWapper.java | 113 +------ .../service/IncrementManagerService.java | 2 +- .../extract/controller/ExtractController.java | 52 ++-- .../extract/service/DataExtractService.java | 26 +- .../service/DataExtractServiceImpl.java | 94 ++++-- .../extract/task/DataManipulationService.java | 146 +++++---- .../service/DataExtractServiceImplTest.java | 278 ------------------ pom.xml | 4 +- 11 files changed, 249 insertions(+), 544 deletions(-) delete mode 100644 datachecker-extract/src/test/java/org/opengauss/datachecker/extract/service/DataExtractServiceImplTest.java diff --git a/datachecker-check/src/main/java/org/opengauss/datachecker/check/client/ExtractFallbackFactory.java b/datachecker-check/src/main/java/org/opengauss/datachecker/check/client/ExtractFallbackFactory.java index 3d43385..85cbb98 100644 --- a/datachecker-check/src/main/java/org/opengauss/datachecker/check/client/ExtractFallbackFactory.java +++ b/datachecker-check/src/main/java/org/opengauss/datachecker/check/client/ExtractFallbackFactory.java @@ -170,46 +170,36 @@ public class ExtractFallbackFactory implements FallbackFactory> querySourceCheckRowData(SourceDataLog dataLog) { + public Result queryCheckRowDataAsync(SourceDataLog dataLog) { return Result.error("Remote call failed"); } /** - * querySinkCheckRowData + * query check row data async status * - * @param dataLog data Log - * @return error + * @param queryId query id + * @return query status */ @Override - public Result> querySinkCheckRowData(SourceDataLog dataLog) { + public Result queryCheckRowDataAsyncStatus(String queryId) { return Result.error("Remote call failed"); } /** - * querySourceSecondaryCheckRowData + * query check row data async data * - * @param dataLog data Log - * @return error - */ - @Override - public Result> querySourceSecondaryCheckRowData(SourceDataLog dataLog) { - return Result.error("Remote call failed"); - } - - /** - * querySinkSecondaryCheckRowData - * - * @param dataLog data Log - * @return error + * @param queryId query id + * @return row data list */ @Override - public Result> querySinkSecondaryCheckRowData(SourceDataLog dataLog) { + public Result> queryCheckRowDataAsyncData(String queryId) { return Result.error("Remote call failed"); } diff --git a/datachecker-check/src/main/java/org/opengauss/datachecker/check/client/ExtractFeignClient.java b/datachecker-check/src/main/java/org/opengauss/datachecker/check/client/ExtractFeignClient.java index 31abbbd..a1640fa 100644 --- a/datachecker-check/src/main/java/org/opengauss/datachecker/check/client/ExtractFeignClient.java +++ b/datachecker-check/src/main/java/org/opengauss/datachecker/check/client/ExtractFeignClient.java @@ -185,40 +185,32 @@ public interface ExtractFeignClient { Result querySinkTableMetadataHash(@RequestParam(name = "tableName") String tableName); /** - * Extract incremental log data records + * Query the hash value of the row data of the table; + * query is async, and the result is returned query id * - * @param dataLog data Log - * @return Return extraction results + * @param dataLog dataLog + * @return query id */ - @PostMapping("/extract/source/data/row/hash") - Result> querySourceCheckRowData(@RequestBody SourceDataLog dataLog); + @PostMapping("/extract/data/row/hash/async") + Result queryCheckRowDataAsync(@RequestBody SourceDataLog dataLog); /** - * Extract incremental log data records + * query check row data async status * - * @param dataLog data Log - * @return Return extraction results + * @param queryId query id + * @return query status */ - @PostMapping("/extract/sink/data/row/hash") - Result> querySinkCheckRowData(@RequestBody SourceDataLog dataLog); + @PostMapping("/extract/data/row/hash/async/status") + Result queryCheckRowDataAsyncStatus(@RequestParam(name = "queryId") String queryId); /** - * Extract incremental log data records + * query check row data async data * - * @param dataLog data Log - * @return Return extraction results + * @param queryId query id + * @return row data list */ - @PostMapping("/extract/source/secondary/data/row/hash") - Result> querySourceSecondaryCheckRowData(@RequestBody SourceDataLog dataLog); - - /** - * Extract incremental log data records - * - * @param dataLog data Log - * @return Return extraction results - */ - @PostMapping("/extract/sink/secondary/data/row/hash") - Result> querySinkSecondaryCheckRowData(@RequestBody SourceDataLog dataLog); + @PostMapping("/extract/data/row/hash/async/data") + Result> queryCheckRowDataAsyncData(@RequestParam(name = "queryId") String queryId); /** * Get the current endpoint configuration information diff --git a/datachecker-check/src/main/java/org/opengauss/datachecker/check/modules/check/IncrementCheckThread.java b/datachecker-check/src/main/java/org/opengauss/datachecker/check/modules/check/IncrementCheckThread.java index f44f463..1b396fb 100644 --- a/datachecker-check/src/main/java/org/opengauss/datachecker/check/modules/check/IncrementCheckThread.java +++ b/datachecker-check/src/main/java/org/opengauss/datachecker/check/modules/check/IncrementCheckThread.java @@ -438,7 +438,7 @@ public class IncrementCheckThread extends Thread { private void buildSecondaryCheckBucket(Endpoint endpoint, SourceDataLog dataLog, List bucketList) { StopWatch rowDataSecQueryWatch = new StopWatch("check sec query row data " + endpoint); rowDataSecQueryWatch.start(dataLog.getTableName() + " " + dataLog.getCompositePrimaryValues().size()); - List dataList = queryRowDataWapper.querySecondaryCheckRowData(endpoint, dataLog); + List dataList = queryRowDataWapper.queryCheckRowData(endpoint, dataLog); rowDataSecQueryWatch.stop(); LogUtils.debug(log, "query sec row data cost: {}", rowDataSecQueryWatch.shortSummary()); buildBucket(dataList, endpoint, bucketList); diff --git a/datachecker-check/src/main/java/org/opengauss/datachecker/check/modules/check/QueryRowDataWapper.java b/datachecker-check/src/main/java/org/opengauss/datachecker/check/modules/check/QueryRowDataWapper.java index 12a7289..d20c05a 100644 --- a/datachecker-check/src/main/java/org/opengauss/datachecker/check/modules/check/QueryRowDataWapper.java +++ b/datachecker-check/src/main/java/org/opengauss/datachecker/check/modules/check/QueryRowDataWapper.java @@ -15,18 +15,21 @@ package org.opengauss.datachecker.check.modules.check; +import cn.hutool.core.thread.ThreadUtil; + import org.apache.commons.collections4.CollectionUtils; import org.opengauss.datachecker.check.client.ExtractFeignClient; import org.opengauss.datachecker.check.client.FeignClientService; import org.opengauss.datachecker.common.entry.enums.Endpoint; import org.opengauss.datachecker.common.entry.extract.RowDataHash; import org.opengauss.datachecker.common.entry.extract.SourceDataLog; +import org.opengauss.datachecker.common.exception.CheckingException; +import org.opengauss.datachecker.common.web.Result; import org.springframework.util.Assert; import java.util.ArrayList; import java.util.List; import java.util.Objects; -import java.util.concurrent.atomic.AtomicInteger; /** * QueryRowDataWapper @@ -36,7 +39,7 @@ import java.util.concurrent.atomic.AtomicInteger; * @since :11 */ public class QueryRowDataWapper { - private static final int MAX_QUERY_PAGE_SIZE = 100; + private static final int MAX_WAIT_TIMES = 100; private final FeignClientService feignClient; @@ -53,7 +56,7 @@ public class QueryRowDataWapper { * Query incremental data * * @param endpoint endpoint - * @param dataLog dataLog + * @param dataLog dataLog * @return result */ public List queryCheckRowData(Endpoint endpoint, SourceDataLog dataLog) { @@ -62,102 +65,16 @@ public class QueryRowDataWapper { } ExtractFeignClient client = feignClient.getClient(endpoint); Assert.isTrue(Objects.nonNull(client), endpoint + " feign client is null"); - - final List compositeKeys = dataLog.getCompositePrimaryValues(); - List result = new ArrayList<>(); - if (compositeKeys.size() > MAX_QUERY_PAGE_SIZE) { - AtomicInteger cnt = new AtomicInteger(0); - List tempCompositeKeys = new ArrayList<>(); - compositeKeys.forEach(key -> { - tempCompositeKeys.add(key); - if (cnt.incrementAndGet() % MAX_QUERY_PAGE_SIZE == 0) { - SourceDataLog pageDataLog = getPageDataLog(dataLog, tempCompositeKeys); - if (Endpoint.SOURCE.equals(endpoint)) { - result.addAll(client.querySourceCheckRowData(pageDataLog).getData()); - } else { - result.addAll(client.querySinkCheckRowData(pageDataLog).getData()); - } - tempCompositeKeys.clear(); - } - }); - if (CollectionUtils.isNotEmpty(tempCompositeKeys)) { - SourceDataLog pageDataLog = getPageDataLog(dataLog, tempCompositeKeys); - if (Endpoint.SOURCE.equals(endpoint)) { - result.addAll(client.querySourceCheckRowData(pageDataLog).getData()); - } else { - result.addAll(client.querySinkCheckRowData(pageDataLog).getData()); - } - tempCompositeKeys.clear(); - } - } else { - if (Endpoint.SOURCE.equals(endpoint)) { - result.addAll(client.querySourceCheckRowData(dataLog).getData()); - } else { - result.addAll(client.querySinkCheckRowData(dataLog).getData()); - } + String queryId = client.queryCheckRowDataAsync(dataLog).getData(); + int waitTimes = 0; + while (!client.queryCheckRowDataAsyncStatus(queryId).getData() && waitTimes < MAX_WAIT_TIMES) { + ThreadUtil.safeSleep(1000); + waitTimes++; } - return result; - } - - /** - * Query incremental data - * - * @param endpoint endpoint - * @param dataLog dataLog - * @return result - */ - public List querySecondaryCheckRowData(Endpoint endpoint, SourceDataLog dataLog) { - if (dataLog == null || CollectionUtils.isEmpty(dataLog.getCompositePrimaryValues())) { - return new ArrayList<>(); - } - ExtractFeignClient client = feignClient.getClient(endpoint); - Assert.isTrue(Objects.nonNull(client), endpoint + " feign client is null"); - - final List compositeKeys = dataLog.getCompositePrimaryValues(); - List result = new ArrayList<>(); - if (compositeKeys.size() > MAX_QUERY_PAGE_SIZE) { - AtomicInteger cnt = new AtomicInteger(0); - List tempCompositeKeys = new ArrayList<>(); - compositeKeys.forEach(key -> { - tempCompositeKeys.add(key); - if (cnt.incrementAndGet() % MAX_QUERY_PAGE_SIZE == 0) { - SourceDataLog pageDataLog = getPageDataLog(dataLog, tempCompositeKeys); - if (Endpoint.SOURCE.equals(endpoint)) { - result.addAll(client.querySourceSecondaryCheckRowData(pageDataLog).getData()); - } else { - result.addAll(client.querySinkSecondaryCheckRowData(pageDataLog).getData()); - } - tempCompositeKeys.clear(); - } - }); - if (CollectionUtils.isNotEmpty(tempCompositeKeys)) { - SourceDataLog pageDataLog = getPageDataLog(dataLog, tempCompositeKeys); - if (Endpoint.SOURCE.equals(endpoint)) { - result.addAll(client.querySourceSecondaryCheckRowData(pageDataLog).getData()); - } else { - result.addAll(client.querySinkSecondaryCheckRowData(pageDataLog).getData()); - } - tempCompositeKeys.clear(); - } - } else { - if (Endpoint.SOURCE.equals(endpoint)) { - result.addAll(client.querySourceSecondaryCheckRowData(dataLog).getData()); - } else { - result.addAll(client.querySinkSecondaryCheckRowData(dataLog).getData()); - } - } - return result; - } - - private SourceDataLog getPageDataLog(SourceDataLog dataLog, List tempCompositeKeys) { - SourceDataLog pageDataLog = new SourceDataLog(); - pageDataLog.setTableName(dataLog.getTableName()); - pageDataLog.setCompositePrimarys(dataLog.getCompositePrimarys()); - if (Objects.nonNull(tempCompositeKeys)) { - pageDataLog.setCompositePrimaryValues(tempCompositeKeys); - } else { - pageDataLog.setCompositePrimaryValues(dataLog.getCompositePrimaryValues()); + if (waitTimes >= MAX_WAIT_TIMES) { + throw new CheckingException("async query check row data wait timeout | queryId:" + queryId); } - return pageDataLog; + Result> listResult = client.queryCheckRowDataAsyncData(queryId); + return listResult.getData(); } } diff --git a/datachecker-check/src/main/java/org/opengauss/datachecker/check/service/IncrementManagerService.java b/datachecker-check/src/main/java/org/opengauss/datachecker/check/service/IncrementManagerService.java index 5852bf8..a2e58c6 100644 --- a/datachecker-check/src/main/java/org/opengauss/datachecker/check/service/IncrementManagerService.java +++ b/datachecker-check/src/main/java/org/opengauss/datachecker/check/service/IncrementManagerService.java @@ -155,7 +155,7 @@ public class IncrementManagerService { if (INC_LOG_QUEUE.isEmpty()) { feignClientService.resumeIncrementMonitor(); LogUtils.warn(log, "resume increment monitor, because the inc-log-queue is empty !"); - ThreadUtil.sleepSecond(60); + ThreadUtil.sleepSecond(5); } else { feignClientService.pauseIncrementMonitor(); LogUtils.warn(log, "pause increment monitor, because the inc-log-queue is not empty !"); diff --git a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/controller/ExtractController.java b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/controller/ExtractController.java index 7eed491..b5c0deb 100644 --- a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/controller/ExtractController.java +++ b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/controller/ExtractController.java @@ -218,51 +218,37 @@ public class ExtractController { } /** - * queries data corresponding to a specified primary key value in a table - * and performs hash for verification data query. + * Query the hash value of the row data of the table; + * query is async, and the result is returned query id * - * @param dataLog data change logs - * @return row data hash + * @param dataLog dataLog + * @return query id */ - @PostMapping("/extract/source/data/row/hash") - Result> querySourceCheckRowData(@RequestBody SourceDataLog dataLog) { - return Result.success(dataExtractService.querySecondaryCheckRowData(dataLog)); + @PostMapping("/extract/data/row/hash/async") + Result queryCheckRowDataAsync(@RequestBody SourceDataLog dataLog) { + return Result.success(dataExtractService.querySecondaryCheckRowDataAsync(dataLog)); } /** - * queries data corresponding to a specified primary key value in a table - * and performs hash for verification data query. + * query check row data async status * - * @param dataLog data change logs - * @return row data hash + * @param queryId query id + * @return query status */ - @PostMapping("/extract/sink/data/row/hash") - Result> querySinkCheckRowData(@RequestBody SourceDataLog dataLog) { - return Result.success(dataExtractService.querySecondaryCheckRowData(dataLog)); + @PostMapping("/extract/data/row/hash/async/status") + Result queryCheckRowDataAsyncStatus(@RequestParam(name = "queryId") String queryId) { + return Result.success(dataExtractService.querySecondaryCheckRowDataStatusByQueryId(queryId)); } /** - * queries data corresponding to a specified primary key value in a table - * and performs hash for secondary verification data query. + * query check row data async data * - * @param dataLog data change logs - * @return row data hash + * @param queryId query id + * @return row data list */ - @PostMapping("/extract/source/secondary/data/row/hash") - Result> querySourceSecondaryCheckRowData(@RequestBody SourceDataLog dataLog) { - return Result.success(dataExtractService.querySecondaryCheckRowData(dataLog)); - } - - /** - * queries data corresponding to a specified primary key value in a table - * and performs hash for secondary verification data query. - * - * @param dataLog data change logs - * @return row data hash - */ - @PostMapping("/extract/sink/secondary/data/row/hash") - Result> querySinkSecondaryCheckRowData(@RequestBody SourceDataLog dataLog) { - return Result.success(dataExtractService.querySecondaryCheckRowData(dataLog)); + @PostMapping("/extract/data/row/hash/async/data") + Result> queryCheckRowDataAsyncData(@RequestParam(name = "queryId") String queryId) { + return Result.success(dataExtractService.querySecondaryCheckRowDataByQueryId(queryId)); } /** diff --git a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/service/DataExtractService.java b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/service/DataExtractService.java index 488d2d4..00c27c5 100644 --- a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/service/DataExtractService.java +++ b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/service/DataExtractService.java @@ -43,7 +43,7 @@ public interface DataExtractService { * @param processNo processNo * @return Specify the construction extraction task set under processno * @throws ProcessMultipleException The current instance is executing the data extraction service - * and cannot restart the new verification. + * and cannot restart the new verification. */ PageExtract buildExtractTaskAllTables(String processNo) throws ProcessMultipleException; @@ -60,7 +60,7 @@ public interface DataExtractService { * * @param taskList taskList * @throws ProcessMultipleException The current instance is executing the data extraction service - * and cannot restart the new verification. + * and cannot restart the new verification. */ void dispatchSinkExtractTaskPage(List taskList) throws ProcessMultipleException; @@ -88,7 +88,7 @@ public interface DataExtractService { /** * Query table data * - * @param tableName tableName + * @param tableName tableName * @param compositeKeySet compositeKeySet * @return Primary key corresponds to table data */ @@ -106,9 +106,25 @@ public interface DataExtractService { * PK list data is specified in the query table, and hash is used for secondary verification data query * * @param dataLog dataLog - * @return row data hash + * @return queryId + */ + String querySecondaryCheckRowDataAsync(SourceDataLog dataLog); + + /** + * Query the data of the secondary check task by queryId + * + * @param queryId queryId + * @return RowDataHash + */ + List querySecondaryCheckRowDataByQueryId(String queryId); + + /** + * Query the status of the secondary check task by queryId + * + * @param queryId queryId + * @return true or false */ - List querySecondaryCheckRowData(SourceDataLog dataLog); + boolean querySecondaryCheckRowDataStatusByQueryId(String queryId); /** * Get the current endpoint configuration information diff --git a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/service/DataExtractServiceImpl.java b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/service/DataExtractServiceImpl.java index 3f09fd8..6528779 100644 --- a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/service/DataExtractServiceImpl.java +++ b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/service/DataExtractServiceImpl.java @@ -36,6 +36,7 @@ import org.opengauss.datachecker.common.entry.extract.SliceVo; import org.opengauss.datachecker.common.entry.extract.SourceDataLog; import org.opengauss.datachecker.common.entry.extract.TableMetadata; import org.opengauss.datachecker.common.entry.extract.TableMetadataHash; +import org.opengauss.datachecker.common.exception.ExtractException; import org.opengauss.datachecker.common.exception.ProcessMultipleException; import org.opengauss.datachecker.common.exception.TableNotExistException; import org.opengauss.datachecker.common.exception.TaskNotFoundException; @@ -64,9 +65,10 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.lang.NonNull; import org.springframework.scheduling.annotation.Async; +import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import org.springframework.stereotype.Service; -import org.springframework.util.StopWatch; +import javax.annotation.PostConstruct; import javax.annotation.Resource; import java.util.ArrayList; @@ -78,7 +80,11 @@ import java.util.List; import java.util.Map; import java.util.Objects; import java.util.Set; +import java.util.UUID; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutorService; +import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Collectors; @@ -95,7 +101,7 @@ import static org.opengauss.datachecker.common.constant.DynamicTpConstant.EXTRAC @Service public class DataExtractServiceImpl implements DataExtractService { private static final Logger log = LogUtils.getLogger(DataExtractService.class); - + private static final int INC_QUERY_PAGE_SIE = 200; /** * Maximum number of sleeps of threads executing data extraction tasks */ @@ -108,6 +114,9 @@ public class DataExtractServiceImpl implements DataExtractService { private static final String TASK_NAME_PREFIX = "extract_task_"; private static final int SINGLE_SLICE_NUM = 1; + private final Map> rowDataHashMap = new ConcurrentHashMap<>(); + private final BlockingQueue fetchedRowDataQueue = new LinkedBlockingQueue<>(); + /** * After the service is started, the {code atomicProcessNo} attribute will be initialized, *

@@ -136,6 +145,8 @@ public class DataExtractServiceImpl implements DataExtractService { @Resource private DynamicThreadPoolManager dynamicThreadPoolManager; @Resource + private ThreadPoolTaskExecutor sliceSendExecutor; + @Resource private SliceRegister sliceRegister; @Resource private KafkaTemplate kafkaTemplate; @@ -600,28 +611,65 @@ public class DataExtractServiceImpl implements DataExtractService { return dataManipulationService.queryTableMetadataHash(tableName); } - /** - * PK list data is specified in the query table, and hash is used for secondary verification data query - * - * @param dataLog data log - * @return row data hash - */ + @PostConstruct + private void incrementRowDataHashCacheMonitor() { + sliceSendExecutor.submit(() -> { + Thread.currentThread().setName("IncFetchedMonitor"); + String queryId = ""; + try { + queryId = fetchedRowDataQueue.take(); + ThreadUtil.sleep(1000); + rowDataHashMap.remove(queryId); + log.info("queryId:{} rowDataHashCache size:{}", queryId, rowDataHashMap.size()); + } catch (InterruptedException e) { + log.debug("ignore queryId:{} InterruptedException ", queryId); + } + }); + } + @Override - public List querySecondaryCheckRowData(SourceDataLog dataLog) { - final String tableName = dataLog.getTableName(); - StopWatch stopWatch = new StopWatch("endpoint - query row data"); - final List compositeKeys = dataLog.getCompositePrimaryValues(); - stopWatch.start("query " + tableName + " metadata"); - final TableMetadata metadata = metaDataService.getMetaDataOfSchemaByCache(tableName); - stopWatch.stop(); - if (Objects.isNull(metadata)) { - throw new TableNotExistException(tableName); - } - stopWatch.start("query " + tableName + " " + compositeKeys.size()); - List result = dataManipulationService.queryColumnHashValues(tableName, compositeKeys, metadata); - stopWatch.stop(); - log.debug("endpoint - query row data - {}", stopWatch.prettyPrint()); - return result; + public String querySecondaryCheckRowDataAsync(SourceDataLog dataLog) { + String queryId = UUID.randomUUID().toString(); + sliceSendExecutor.submit(() -> { + Thread.currentThread().setName("IncAsyncQuery"); + final String tableName = dataLog.getTableName(); + log.info("queryId:{} start async check data query | table:{}", queryId, tableName); + try { + final List compositeKeys = dataLog.getCompositePrimaryValues(); + final TableMetadata metadata = metaDataService.getMetaDataOfSchemaByCache(tableName); + if (Objects.isNull(metadata)) { + throw new TableNotExistException(tableName); + } + List result = new ArrayList<>(); + for (int i = 0; i < compositeKeys.size(); i += INC_QUERY_PAGE_SIE) { + int toIndex = Math.min(i + INC_QUERY_PAGE_SIE, compositeKeys.size()); + List pageKeys = compositeKeys.subList(i, toIndex); + List pageResult = dataManipulationService.queryColumnHashValues(tableName, pageKeys, + metadata); + result.addAll(pageResult); + log.debug("queryId:{} processed {}/{} records", queryId, toIndex, compositeKeys.size()); + } + rowDataHashMap.put(queryId, result); + } catch (ExtractException e) { + log.error("queryId:[{}] Async check data query failed | table:{} | keys:{} | error: ", queryId, + tableName, dataLog.getCompositePrimaryValues(), e); + } + }); + return queryId; + } + + @Override + public List querySecondaryCheckRowDataByQueryId(String queryId) { + List rowDataHashes = rowDataHashMap.get(queryId); + fetchedRowDataQueue.add(queryId); + log.info("queryId:{} fetched check row data ", queryId); + return rowDataHashes; + } + + @Override + public boolean querySecondaryCheckRowDataStatusByQueryId(String queryId) { + log.info("queryId:{} status {}", queryId, rowDataHashMap.containsKey(queryId)); + return rowDataHashMap.containsKey(queryId); } @Override diff --git a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/task/DataManipulationService.java b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/task/DataManipulationService.java index 3670bee..3853443 100644 --- a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/task/DataManipulationService.java +++ b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/task/DataManipulationService.java @@ -15,6 +15,8 @@ package org.opengauss.datachecker.extract.task; +import cn.hutool.core.util.ArrayUtil; + import org.apache.commons.collections4.CollectionUtils; import org.apache.logging.log4j.Logger; import org.opengauss.datachecker.common.constant.Constants.InitialCapacity; @@ -24,13 +26,14 @@ import org.opengauss.datachecker.common.entry.extract.ColumnsMetaData; import org.opengauss.datachecker.common.entry.extract.RowDataHash; import org.opengauss.datachecker.common.entry.extract.TableMetadata; import org.opengauss.datachecker.common.entry.extract.TableMetadataHash; -import org.opengauss.datachecker.common.exception.ExtractDataAccessException; +import org.opengauss.datachecker.common.exception.ExtractException; import org.opengauss.datachecker.common.util.LogUtils; import org.opengauss.datachecker.common.util.LongHashFunctionWrapper; import org.opengauss.datachecker.extract.config.ExtractProperties; import org.opengauss.datachecker.extract.data.access.DataAccessService; import org.opengauss.datachecker.extract.dml.DmlBuilder; import org.opengauss.datachecker.extract.dml.SelectDmlBuilder; +import org.opengauss.datachecker.extract.resource.ConnectionMgr; import org.opengauss.datachecker.extract.service.MetaDataService; import org.opengauss.datachecker.extract.util.MetaDataUtil; import org.springframework.beans.factory.annotation.Value; @@ -38,11 +41,18 @@ import org.springframework.stereotype.Service; import org.springframework.util.Assert; import javax.annotation.Resource; + +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.util.ArrayList; import java.util.Comparator; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.stream.Collectors; /** * DML Data operation service realizes dynamic query of data @@ -70,32 +80,36 @@ public class DataManipulationService { /** * queryColumnValues * - * @param tableName tableName + * @param tableName tableName * @param compositeKeys compositeKeys * @param tableMetadata tableMetadata * @return query result */ public List queryColumnHashValues(String tableName, List compositeKeys, - TableMetadata tableMetadata) { + TableMetadata tableMetadata) { Assert.isTrue(Objects.nonNull(tableMetadata), "Abnormal table metadata , failed to build select SQL"); final List primaryMetas = tableMetadata.getPrimaryMetas(); - Assert.isTrue(!CollectionUtils.isEmpty(primaryMetas), - "The metadata of the table primary is abnormal, , failed to build select SQL"); + "The metadata of the table primary is abnormal, , failed to build select SQL"); final SelectDmlBuilder dmlBuilder = new SelectDmlBuilder(databaseType, tableMetadata.isOgCompatibilityB()); // Single primary key table data query if (primaryMetas.size() == 1) { final ColumnsMetaData primaryData = primaryMetas.get(0); - String querySql = dmlBuilder.dataBaseType(databaseType).schema(extractProperties.getSchema()) - .columns(tableMetadata.getColumnsMetas()).tableName(tableName) - .conditionPrimary(primaryData).build(); + String querySql = dmlBuilder.dataBaseType(databaseType) + .schema(extractProperties.getSchema()) + .columns(tableMetadata.getColumnsMetas()) + .tableName(tableName) + .conditionPrimary(primaryData) + .build(); return queryColumnValuesSinglePrimaryKey(querySql, compositeKeys, tableMetadata); } else { // Compound primary key table data query - - String querySql = dmlBuilder.dataBaseType(databaseType).schema(extractProperties.getSchema()) - .columns(tableMetadata.getColumnsMetas()).tableName(tableName) - .conditionCompositePrimary(primaryMetas).build(); + String querySql = dmlBuilder.dataBaseType(databaseType) + .schema(extractProperties.getSchema()) + .columns(tableMetadata.getColumnsMetas()) + .tableName(tableName) + .conditionCompositePrimary(primaryMetas) + .build(); List batchParam = dmlBuilder.conditionCompositePrimaryValue(primaryMetas, compositeKeys); return queryColumnValuesByCompositePrimary(querySql, batchParam, tableMetadata); } @@ -104,31 +118,35 @@ public class DataManipulationService { /** * queryColumnValues * - * @param tableName tableName + * @param tableName tableName * @param compositeKeys compositeKeys - * @param metadata tableMetadata + * @param metadata tableMetadata * @return query result */ public List> queryColumnValues(String tableName, List compositeKeys, - TableMetadata metadata) { - Assert.isTrue(Objects.nonNull(metadata), - "Abnormal table metadata information, failed to build select SQL"); + TableMetadata metadata) { + Assert.isTrue(Objects.nonNull(metadata), "Abnormal table metadata information, failed to build select SQL"); final List primaryMetas = metadata.getPrimaryMetas(); - Assert.isTrue(!CollectionUtils.isEmpty(primaryMetas), - "The metadata of the table primary is abnormal, failed to build select SQL"); + "The metadata of the table primary is abnormal, failed to build select SQL"); final SelectDmlBuilder dmlBuilder = new SelectDmlBuilder(databaseType, metadata.isOgCompatibilityB()); List> resultMap; // Single primary key table data query if (primaryMetas.size() == 1) { final ColumnsMetaData primaryData = primaryMetas.get(0); - String querySql = dmlBuilder.schema(extractProperties.getSchema()).columns(metadata.getColumnsMetas()) - .tableName(tableName).conditionPrimary(primaryData).build(); + String querySql = dmlBuilder.schema(extractProperties.getSchema()) + .columns(metadata.getColumnsMetas()) + .tableName(tableName) + .conditionPrimary(primaryData) + .build(); resultMap = queryColumnValuesSinglePrimaryKey(querySql, compositeKeys); } else { // Compound primary key table data query - String querySql = dmlBuilder.schema(extractProperties.getSchema()).columns(metadata.getColumnsMetas()) - .tableName(tableName).conditionCompositePrimary(primaryMetas).build(); + String querySql = dmlBuilder.schema(extractProperties.getSchema()) + .columns(metadata.getColumnsMetas()) + .tableName(tableName) + .conditionCompositePrimary(primaryMetas) + .build(); List batchParam = dmlBuilder.conditionCompositePrimaryValue(primaryMetas, compositeKeys); resultMap = queryColumnValuesByCompositePrimary(querySql, batchParam); } @@ -139,17 +157,17 @@ public class DataManipulationService { /** * Compound primary key table data query * - * @param selectDml Query SQL - * @param batchParam Compound PK query parameters + * @param statement Query SQL + * @param batchParam Compound PK query parameters * @param tableMetadata tableMetadata * @return Query data results */ - private List queryColumnValuesByCompositePrimary(String selectDml, List batchParam, - TableMetadata tableMetadata) { - // Query the current task data and organize the data - HashMap paramMap = new HashMap<>(InitialCapacity.CAPACITY_1); - paramMap.put(DmlBuilder.PRIMARY_KEYS, batchParam); - return queryColumnValues(selectDml, paramMap, tableMetadata); + private List queryColumnValuesByCompositePrimary(String statement, List batchParam, + TableMetadata tableMetadata) { + String compositeKeyValues = batchParam.stream() + .map(arr -> "(" + ArrayUtil.join(arr, ",") + ")") + .collect(Collectors.joining(",")); + return statementQuery(statement.replace(":primaryKeys", compositeKeyValues), tableMetadata); } private void rectifyValue(TableMetadata metadata, List> resultMap) { @@ -180,17 +198,14 @@ public class DataManipulationService { /** * Single primary key table data query * - * @param selectDml Query SQL - * @param primaryKeys Query primary key collection + * @param statement Query SQL + * @param primaryKeys Query primary key collection * @param tableMetadata tableMetadata * @return Query data results */ - private List queryColumnValuesSinglePrimaryKey(String selectDml, List primaryKeys, - TableMetadata tableMetadata) { - // Query the current task data and organize the data - HashMap paramMap = new HashMap<>(InitialCapacity.CAPACITY_1); - paramMap.put(DmlBuilder.PRIMARY_KEYS, primaryKeys); - return queryColumnValues(selectDml, paramMap, tableMetadata); + private List queryColumnValuesSinglePrimaryKey(String statement, List primaryKeys, + TableMetadata tableMetadata) { + return statementQuery(statement.replace(":primaryKeys", String.join(",", primaryKeys)), tableMetadata); } private List> queryColumnValuesSinglePrimaryKey(String selectDml, List primaryKeys) { @@ -200,27 +215,46 @@ public class DataManipulationService { return queryColumnValues(selectDml, paramMap); } + private List statementQuery(String pageStatement, Map paramMap, + TableMetadata tableMetadata) { + Object primaryKeys = paramMap.get("primaryKeys"); + String sqlParam = ""; + try { + if (primaryKeys instanceof List) { + List primaryKeyList = (List) primaryKeys; + sqlParam = String.join(",", primaryKeyList); + } else { + throw new IllegalArgumentException("primaryKeys must be List"); + } + } catch (ClassCastException ex) { + log.error("{}Failed to query data {}", ErrorCode.EXECUTE_QUERY_SQL, pageStatement, ex); + throw new IllegalArgumentException("paramMap must be List"); + } + return statementQuery(pageStatement.replace(":primaryKeys", sqlParam), tableMetadata); + } - /** - * Primary key table data query - * - * @param selectDml Query SQL - * @param paramMap query parameters - * @return query result - */ - private List queryColumnValues(String selectDml, Map paramMap, - TableMetadata tableMetadata) { - // Use JDBC to query the current task to extract data + private List statementQuery(String pageStatement, TableMetadata tableMetadata) { + List result = new ArrayList<>(); + Connection connection = null; + PreparedStatement ps = null; + ResultSet resultSet = null; try { ResultSetHandler handler = resultSetFactory.createHandler(databaseType); - log.debug("row data : {} param {}", selectDml, paramMap); - return dataAccessService.query(selectDml, paramMap, - (rs, rowNum) -> resultSetHashHandler.handler(MetaDataUtil.getTablePrimaryColumns(tableMetadata), - MetaDataUtil.getTableColumns(tableMetadata), handler.putOneResultSetToMap(rs))); - } catch (Exception e) { - log.error("{}Failed to query data", ErrorCode.EXECUTE_QUERY_SQL, e); - throw new ExtractDataAccessException("Failed to query data " + e.getMessage()); + List tablePrimaryColumns = MetaDataUtil.getTablePrimaryColumns(tableMetadata); + List tableColumns = MetaDataUtil.getTableColumns(tableMetadata); + connection = dataAccessService.getDataSource().getConnection(); + ps = connection.prepareStatement(pageStatement); + resultSet = ps.executeQuery(); + while (resultSet.next()) { + Map rowResult = handler.putOneResultSetToMap(resultSet); + result.add(resultSetHashHandler.handler(tablePrimaryColumns, tableColumns, rowResult)); + } + } catch (SQLException | ExtractException ex) { + log.error("execute query error, sql:{}", pageStatement, ex); + } finally { + ConnectionMgr.close(connection, ps, resultSet); } + return result; } private List> queryColumnValues(String selectDml, Map paramMap) { diff --git a/datachecker-extract/src/test/java/org/opengauss/datachecker/extract/service/DataExtractServiceImplTest.java b/datachecker-extract/src/test/java/org/opengauss/datachecker/extract/service/DataExtractServiceImplTest.java deleted file mode 100644 index 052b0e8..0000000 --- a/datachecker-extract/src/test/java/org/opengauss/datachecker/extract/service/DataExtractServiceImplTest.java +++ /dev/null @@ -1,278 +0,0 @@ -/* - * Copyright (c) 2022-2022 Huawei Technologies Co.,Ltd. - * - * openGauss is licensed under Mulan PSL v2. - * You can use this software according to the terms and conditions of the Mulan PSL v2. - * You may obtain a copy of Mulan PSL v2 at: - * - * http://license.coscl.org.cn/MulanPSL2 - * - * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, - * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, - * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. - * See the Mulan PSL v2 for more details. - */ - -package org.opengauss.datachecker.extract.service; - -import org.junit.jupiter.api.BeforeAll; -import org.junit.jupiter.api.DisplayName; -import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.extension.ExtendWith; -import org.mockito.InjectMocks; -import org.mockito.Mock; -import org.mockito.junit.jupiter.MockitoExtension; -import org.opengauss.datachecker.common.entry.enums.DataBaseType; -import org.opengauss.datachecker.common.entry.enums.Endpoint; -import org.opengauss.datachecker.common.entry.extract.Database; -import org.opengauss.datachecker.common.entry.extract.ExtractConfig; -import org.opengauss.datachecker.common.entry.extract.ExtractTask; -import org.opengauss.datachecker.common.entry.extract.PageExtract; -import org.opengauss.datachecker.common.entry.extract.RowDataHash; -import org.opengauss.datachecker.common.entry.extract.SourceDataLog; -import org.opengauss.datachecker.common.entry.extract.TableMetadata; -import org.opengauss.datachecker.common.entry.extract.TableMetadataHash; -import org.opengauss.datachecker.common.exception.ProcessMultipleException; -import org.opengauss.datachecker.common.exception.TaskNotFoundException; -import org.opengauss.datachecker.extract.cache.MetaDataCache; -import org.opengauss.datachecker.extract.config.ExtractProperties; -import org.opengauss.datachecker.extract.task.DataManipulationService; -import org.opengauss.datachecker.extract.task.ExtractTaskBuilder; -import org.opengauss.datachecker.extract.util.TestJsonUtil; - -import java.util.Collections; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -import static org.assertj.core.api.Assertions.assertThat; -import static org.assertj.core.api.Assertions.assertThatThrownBy; -import static org.mockito.Mockito.when; -import static org.opengauss.datachecker.extract.util.TestJsonUtil.KEY_META_DATA_13_TABLE; - -/** - * DataExtractServiceImplTest - * - * @author :wangchao - * @date :Created in 2023/4/24 - * @since :11 - */ -@ExtendWith(MockitoExtension.class) -class DataExtractServiceImplTest { - @Mock - private ExtractTaskBuilder mockExtractTaskBuilder; - @Mock - private ExtractProperties mockExtractProperties; - @Mock - private MetaDataService mockMetaDataService; - @Mock - private DataManipulationService mockDataManipulationService; - @InjectMocks - private DataExtractServiceImpl dataExtractServiceImplUnderTest; - boolean ogCompatibility = false; - - @BeforeAll - static void setUp() { - HashMap result = TestJsonUtil.parseHashMap(KEY_META_DATA_13_TABLE, TableMetadata.class); - MetaDataCache.putMap(result); - } - - @DisplayName("build extract task") - @Test - void testBuildExtractTaskAllTables1() { - when(mockExtractProperties.getEndpoint()).thenReturn(Endpoint.SOURCE); - // Setup - final List extractTaskList = List.of(new ExtractTask()); - when(mockExtractTaskBuilder.builder(MetaDataCache.getAllKeys())).thenReturn(List.of(new ExtractTask())); - // Run the test - final PageExtract result = dataExtractServiceImplUnderTest.buildExtractTaskAllTables("processNo"); - // Verify the results - assertThat(result).isEqualTo(extractTaskList); - } - - @DisplayName("build extract task empty") - @Test - void test_ExtractTaskBuilderReturnsNoItems() { - // Setup - when(mockExtractProperties.getEndpoint()).thenReturn(Endpoint.SOURCE); - when(mockExtractTaskBuilder.builder(MetaDataCache.getAllKeys())).thenReturn(Collections.emptyList()); - // Run the test - final PageExtract result = dataExtractServiceImplUnderTest.buildExtractTaskAllTables("processNo"); - // Verify the results - assertThat(result).isEqualTo(Collections.emptyList()); - } - - @DisplayName("build extract task ProcessMultipleException") - @Test - void test_ThrowsProcessMultipleException() { - // Setup - when(mockExtractProperties.getEndpoint()).thenReturn(Endpoint.SOURCE); - // Configure ExtractTaskBuilder.builder(...). - when(mockExtractTaskBuilder.builder(MetaDataCache.getAllKeys())).thenReturn(List.of(new ExtractTask())); - dataExtractServiceImplUnderTest.buildExtractTaskAllTables("processNo"); - // Run the test - assertThatThrownBy(() -> dataExtractServiceImplUnderTest.buildExtractTaskAllTables("processNo")).isInstanceOf( - ProcessMultipleException.class); - } - - @Test - void testCleanBuildTask() { - // Setup - // Run the test - dataExtractServiceImplUnderTest.cleanBuildTask(); - } - - @Test - void testQueryTableInfo() { - // Setup - // Verify the results - assertThatThrownBy(() -> dataExtractServiceImplUnderTest.queryTableInfo("tableName")).isInstanceOf( - TaskNotFoundException.class); - } - - @Test - void testQueryTableColumnValues() { - // Setup - final List> expectedResult = List.of(Map.ofEntries(Map.entry("value", "value"))); - - // Configure MetaDataService.getMetaDataOfSchemaByCache(...). - String table = "t_test_table_template"; - final TableMetadata tableMetadata = MetaDataCache.get(table); - when(mockMetaDataService.getMetaDataOfSchemaByCache(table)).thenReturn(tableMetadata); - - // Configure DataManipulationService.queryColumnValues(...). - final List> maps = List.of(Map.ofEntries(Map.entry("value", "value"))); - when(mockDataManipulationService.queryColumnValues(table, List.of("value"), tableMetadata)).thenReturn(maps); - - // Run the test - final List> result = - dataExtractServiceImplUnderTest.queryTableColumnValues(table, List.of("value")); - - // Verify the results - assertThat(result).isEqualTo(expectedResult); - } - - @Test - void testQueryTableColumnValues_NoItems() { - // Setup - // Configure MetaDataService.getMetaDataOfSchemaByCache(...). - String table = "t_test_table_template"; - final TableMetadata tableMetadata = MetaDataCache.get(table); - when(mockMetaDataService.getMetaDataOfSchemaByCache(table)).thenReturn(tableMetadata); - when(mockDataManipulationService.queryColumnValues(table, List.of("value"), tableMetadata)).thenReturn( - Collections.emptyList()); - - // Run the test - final List> result = - dataExtractServiceImplUnderTest.queryTableColumnValues(table, List.of("value")); - - // Verify the results - assertThat(result).isEqualTo(Collections.emptyList()); - } - - @Test - void testQueryTableMetadataHash() { - // Setup - final TableMetadataHash expectedResult = new TableMetadataHash(); - expectedResult.setTableName("tableName"); - expectedResult.setTableHash(0L); - - // Configure DataManipulationService.queryTableMetadataHash(...). - final TableMetadataHash tableMetadataHash = new TableMetadataHash(); - tableMetadataHash.setTableName("tableName"); - tableMetadataHash.setTableHash(0L); - when(mockDataManipulationService.queryTableMetadataHash("tableName")).thenReturn(tableMetadataHash); - - // Run the test - final TableMetadataHash result = dataExtractServiceImplUnderTest.queryTableMetadataHash("tableName"); - - // Verify the results - assertThat(result).isEqualTo(expectedResult); - } - - @Test - void testQuerySecondaryCheckRowData() { - // Setup - String table = "t_test_table_template"; - - final SourceDataLog dataLog = new SourceDataLog(); - dataLog.setTableName(table); - dataLog.setBeginOffset(0L); - dataLog.setCompositePrimarys(List.of("value")); - dataLog.setCompositePrimaryValues(List.of("value")); - - final RowDataHash rowDataHash = new RowDataHash(); - rowDataHash.setKey("primaryKey"); - rowDataHash.setKHash(0L); - rowDataHash.setVHash(0L); - final List expectedResult = List.of(rowDataHash); - - // Configure MetaDataService.getMetaDataOfSchemaByCache(...). - - final TableMetadata tableMetadata = MetaDataCache.get(table); - when(mockMetaDataService.getMetaDataOfSchemaByCache(table)).thenReturn(tableMetadata); - - // Configure DataManipulationService.queryColumnHashValues(...). - final RowDataHash rowDataHash1 = new RowDataHash(); - rowDataHash1.setKey("primaryKey"); - rowDataHash1.setKHash(0L); - rowDataHash1.setVHash(0L); - final List rowDataHashes = List.of(rowDataHash1); - when(mockDataManipulationService.queryColumnHashValues(table, List.of("value"), tableMetadata)).thenReturn( - rowDataHashes); - - // Run the test - final List result = dataExtractServiceImplUnderTest.querySecondaryCheckRowData(dataLog); - - // Verify the results - assertThat(result).isEqualTo(expectedResult); - } - - @Test - void testQuerySecondaryCheckRowData_NoItems() { - String table = "t_test_table_template"; - - // Setup - final SourceDataLog dataLog = new SourceDataLog(); - dataLog.setTableName(table); - dataLog.setBeginOffset(0L); - dataLog.setCompositePrimarys(List.of("value")); - dataLog.setCompositePrimaryValues(List.of("value")); - - // Configure MetaDataService.getMetaDataOfSchemaByCache(...). - final TableMetadata tableMetadata = MetaDataCache.get(table); - when(mockMetaDataService.getMetaDataOfSchemaByCache(table)).thenReturn(tableMetadata); - - when(mockDataManipulationService.queryColumnHashValues(table, List.of("value"), tableMetadata)).thenReturn( - Collections.emptyList()); - - // Run the test - final List result = dataExtractServiceImplUnderTest.querySecondaryCheckRowData(dataLog); - - // Verify the results - assertThat(result).isEqualTo(Collections.emptyList()); - } - - @Test - void testGetEndpointConfig() { - // Setup - final ExtractConfig expectedResult = new ExtractConfig(); - expectedResult.setDebeziumEnable(false); - final Database database = new Database(); - database.setSchema("schema"); - database.setDatabaseType(DataBaseType.MS); - database.setEndpoint(Endpoint.SOURCE); - expectedResult.setDatabase(database); - - when(mockExtractProperties.getDatabaseType()).thenReturn(DataBaseType.MS); - when(mockExtractProperties.getSchema()).thenReturn("schema"); - when(mockExtractProperties.getEndpoint()).thenReturn(Endpoint.SOURCE); - when(mockExtractProperties.isDebeziumEnable()).thenReturn(false); - - // Run the test - final ExtractConfig result = dataExtractServiceImplUnderTest.getEndpointConfig(); - - // Verify the results - assertThat(result).isEqualTo(expectedResult); - } -} diff --git a/pom.xml b/pom.xml index 68755c2..a825a4c 100644 --- a/pom.xml +++ b/pom.xml @@ -57,11 +57,11 @@ aliyun - http://maven.aliyun.com/nexus/content/groups/public/ + https://maven.aliyun.com/nexus/content/groups/public/ confluent - http://packages.confluent.io/maven/ + https://packages.confluent.io/maven/ -- Gitee