diff --git a/config/application-sink.yml b/config/application-sink.yml index e201312c1aadab92f4125ff6b02695a6fde007f3..3003894e126a9aa484cca5fda9948ac3effa04cb 100644 --- a/config/application-sink.yml +++ b/config/application-sink.yml @@ -5,11 +5,11 @@ logging: spring: check: server-uri: http://127.0.0.1:9000 - core-pool-size: 5 - maximum-pool-size: 10 + core-pool-size: 3 + maximum-pool-size: 3 maximum-topic-size: 1 - maximum-table-slice-size: 100000 - extend-maximum-pool-size: 5 + maximum-table-slice-size: 10000 + extend-maximum-pool-size: 3 extract: schema: test databaseType: OG @@ -27,7 +27,7 @@ spring: # driver-class-name: com.mysql.cj.jdbc.Driver # url: jdbc:mysql://127.0.0.1:3306/test?useSSL=false&useUnicode=true&characterEncoding=utf-8&serverTimezone=UTC&allowPublicKeyRetrieval=true driver-class-name: org.opengauss.Driver - url: jdbc:opengauss://127.0.0.1:5432/postgres?useSSL=false&useUnicode=true&characterEncoding=utf-8&serverTimezone=UTC&bitToString=true + url: jdbc:opengauss://127.0.0.1:5432/postgres?useSSL=false&useUnicode=true&characterEncoding=utf-8&serverTimezone=UTC&bitToString=true&loggerLevel=OFF&autocommit=false # driver-class-name: oracle.jdbc.OracleDriver # url: jdbc:oracle:thin:@127.0.0.1:1521/TEST username: diff --git a/config/application-source.yml b/config/application-source.yml index a6ea15b177697be7dedd35d912a01c0e7392d4bf..2c3d136f2564d3b29c8c04645cc5abf43577ffc6 100644 --- a/config/application-source.yml +++ b/config/application-source.yml @@ -6,11 +6,11 @@ logging: spring: check: server-uri: http://127.0.0.1:9000 - core-pool-size: 5 - maximum-pool-size: 10 + core-pool-size: 3 + maximum-pool-size: 3 maximum-topic-size: 1 - maximum-table-slice-size: 100000 - extend-maximum-pool-size: 5 + maximum-table-slice-size: 10000 + extend-maximum-pool-size: 3 extract: schema: test databaseType: MS # For MySQL @@ -32,7 +32,7 @@ spring: bootstrap-servers: localhost:9092 datasource: driver-class-name: com.mysql.cj.jdbc.Driver - url: jdbc:mysql://127.0.0.1:3306/mysql?useSSL=false&useUnicode=true&characterEncoding=utf-8&serverTimezone=UTC&allowPublicKeyRetrieval=true + url: jdbc:mysql://127.0.0.1:3306/mysql?useSSL=false&useUnicode=true&characterEncoding=utf-8&serverTimezone=UTC&allowPublicKeyRetrieval=true&dontAutoCommit=true # driver-class-name: org.opengauss.Driver # For openGauss # url: # jdbc:opengauss://127.0.0.1:5432/postgres?useSSL=false&useUnicode=true&characterEncoding=utf-8&serverTimezone=UTC&bitToString=true # For openGauss # driver-class-name: oracle.jdbc.OracleDriver diff --git a/config/application.yml b/config/application.yml index da778ee1c4a690a701c0fa30985882bdbeb9b200..eb627d0703111bcd138e689f8dcf1772c9118c34 100644 --- a/config/application.yml +++ b/config/application.yml @@ -6,8 +6,8 @@ spring: kafka: bootstrap-servers: localhost:9092 check: - core-pool-size: 5 - maximum-pool-size: 10 + core-pool-size: 3 + maximum-pool-size: 3 maximum-topic-size: 1 data: check: diff --git a/datachecker-check/src/main/java/org/opengauss/datachecker/check/modules/check/AbstractCheckDiffResultBuilder.java b/datachecker-check/src/main/java/org/opengauss/datachecker/check/modules/check/AbstractCheckDiffResultBuilder.java index 024f503c9cba710b0e89b28acbd3f4decf35b97a..18cced281bb0bab56735c4c3bd7ea3a121b91727 100644 --- a/datachecker-check/src/main/java/org/opengauss/datachecker/check/modules/check/AbstractCheckDiffResultBuilder.java +++ b/datachecker-check/src/main/java/org/opengauss/datachecker/check/modules/check/AbstractCheckDiffResultBuilder.java @@ -16,6 +16,7 @@ package 
org.opengauss.datachecker.check.modules.check; import lombok.Getter; + import org.apache.logging.log4j.Logger; import org.opengauss.datachecker.common.entry.check.Difference; import org.opengauss.datachecker.common.entry.enums.CheckMode; @@ -40,7 +41,8 @@ import java.util.stream.Collectors; * @since :11 */ @Getter -public abstract class AbstractCheckDiffResultBuilder> { +public abstract class AbstractCheckDiffResultBuilder> { private static final Logger log = LogUtils.getLogger(); private static final int MAX_DIFF_REPAIR_SIZE = 5000; @@ -173,7 +175,7 @@ public abstract class AbstractCheckDiffResultBuilder insert, List update, List delete) { - this.keyInsert.addAll(insert); - this.keyUpdate.addAll(update); - this.keyDelete.addAll(delete); - this.keyInsertSet.addAll(insert.stream().map(Difference::getKey).collect(Collectors.toSet())); - this.keyUpdateSet.addAll(update.stream().map(Difference::getKey).collect(Collectors.toSet())); - this.keyDeleteSet.addAll(delete.stream().map(Difference::getKey).collect(Collectors.toSet())); + if (Objects.nonNull(insert)) { + this.keyInsert.addAll(insert); + this.keyInsertSet.addAll(insert.stream().map(Difference::getKey).collect(Collectors.toSet())); + } + if (Objects.nonNull(update)) { + this.keyUpdate.addAll(update); + this.keyUpdateSet.addAll(update.stream().map(Difference::getKey).collect(Collectors.toSet())); + } + if (Objects.nonNull(delete)) { + this.keyDelete.addAll(delete); + this.keyDeleteSet.addAll(delete.stream().map(Difference::getKey).collect(Collectors.toSet())); + } diffSort.sort(this.keyInsert); diffSort.sort(this.keyUpdate); diffSort.sort(this.keyDelete); diff --git a/datachecker-check/src/main/java/org/opengauss/datachecker/check/slice/SliceCheckEventHandler.java b/datachecker-check/src/main/java/org/opengauss/datachecker/check/slice/SliceCheckEventHandler.java index 36d7218a03e68c0a71140cde2551e74adeefaf16..d96d8f870eb0951870cdf1201f66aca3c2dba41e 100644 --- a/datachecker-check/src/main/java/org/opengauss/datachecker/check/slice/SliceCheckEventHandler.java +++ b/datachecker-check/src/main/java/org/opengauss/datachecker/check/slice/SliceCheckEventHandler.java @@ -77,58 +77,59 @@ public class SliceCheckEventHandler { } } else { LogUtils.info(log, "slice check event , table structure diff [{}][{} : {}]", checkEvent.getCheckName(), - checkEvent.getSource() - .getTableHash(), checkEvent.getSink() - .getTableHash()); + checkEvent.getSource().getTableHash(), checkEvent.getSink().getTableHash()); handleTableStructureDiff(checkEvent); - registerCenter.refreshCheckedTableCompleted(checkEvent.getSlice() - .getTable()); + registerCenter.refreshCheckedTableCompleted(checkEvent.getSlice().getTable()); } } /** * 添加校验失败分片事件处理流程 * - * @param checkEvent + * @param checkEvent checkEvent */ public void handleFailed(SliceCheckEvent checkEvent) { LogUtils.warn(log, "slice check event , table slice has unknown error [{}][{} : {}]", checkEvent.getCheckName(), - checkEvent.getSource(), checkEvent.getSink()); + checkEvent.getSource(), checkEvent.getSink()); long count = getCheckSliceCount(checkEvent); sliceCheckContext.refreshSliceCheckProgress(checkEvent.getSlice(), count); - CheckDiffResult result = buildSliceDiffResult(checkEvent.getSlice(), (int) count, true, "slice has unknown error"); + CheckDiffResult result = buildSliceDiffResult(checkEvent.getSlice(), (int) count, true, + "slice has unknown error"); sliceCheckContext.addCheckResult(checkEvent.getSlice(), result); - registerCenter.refreshCheckedTableCompleted(checkEvent.getSlice() - .getTable()); + 
registerCenter.refreshCheckedTableCompleted(checkEvent.getSlice().getTable()); } private static long getCheckSliceCount(SliceCheckEvent checkEvent) { SliceExtend source = checkEvent.getSource(); SliceExtend sink = checkEvent.getSink(); - long count = Math.max(source.getCount(), sink.getCount()); - return count; + if (Objects.nonNull(sink) && Objects.nonNull(source)) { + return Math.max(source.getCount(), sink.getCount()); + } else { + return Objects.nonNull(sink) ? sink.getCount() : Objects.nonNull(source) ? source.getCount() : 0; + } } private void handleTableStructureDiff(SliceCheckEvent checkEvent) { long count = getCheckSliceCount(checkEvent); sliceCheckContext.refreshSliceCheckProgress(checkEvent.getSlice(), count); - CheckDiffResult result = buildSliceDiffResult(checkEvent.getSlice(), (int) count, false, "table structure diff"); + CheckDiffResult result = buildSliceDiffResult(checkEvent.getSlice(), (int) count, false, + "table structure diff"); sliceCheckContext.addTableStructureDiffResult(checkEvent.getSlice(), result); } private CheckDiffResult buildSliceDiffResult(SliceVo slice, int count, boolean isTableStructure, String message) { CheckDiffResultBuilder builder = CheckDiffResultBuilder.builder(); builder.checkMode(ConfigCache.getCheckMode()) - .process(ConfigCache.getValue(ConfigConstants.PROCESS_NO)) - .schema(slice.getSchema()) - .table(slice.getTable()) - .sno(slice.getNo()) - .startTime(LocalDateTime.now()) - .endTime(LocalDateTime.now()) - .isTableStructureEquals(isTableStructure) - .isExistTableMiss(false, null) - .rowCount(count) - .error(message); + .process(ConfigCache.getValue(ConfigConstants.PROCESS_NO)) + .schema(slice.getSchema()) + .table(slice.getTable()) + .sno(slice.getNo()) + .startTime(LocalDateTime.now()) + .endTime(LocalDateTime.now()) + .isTableStructureEquals(isTableStructure) + .isExistTableMiss(false, null) + .rowCount(count) + .error(message); return builder.build(); } @@ -141,25 +142,25 @@ public class SliceCheckEventHandler { /** * handleIgnoreTable * - * @param slice slice + * @param slice slice * @param source source - * @param sink sink + * @param sink sink */ public void handleIgnoreTable(SliceVo slice, SliceExtend source, SliceExtend sink) { sliceCheckContext.refreshSliceCheckProgress(slice, 0); CheckDiffResultBuilder builder = CheckDiffResultBuilder.builder(); Endpoint existEndpoint = Objects.nonNull(source) && Objects.isNull(sink) ? 
Endpoint.SOURCE : Endpoint.SINK; builder.checkMode(ConfigCache.getCheckMode()) - .process(ConfigCache.getValue(ConfigConstants.PROCESS_NO)) - .schema(slice.getSchema()) - .table(slice.getTable()) - .sno(slice.getNo()) - .startTime(LocalDateTime.now()) - .endTime(LocalDateTime.now()) - .isTableStructureEquals(false) - .isExistTableMiss(true, existEndpoint) - .rowCount(0) - .error("table miss"); + .process(ConfigCache.getValue(ConfigConstants.PROCESS_NO)) + .schema(slice.getSchema()) + .table(slice.getTable()) + .sno(slice.getNo()) + .startTime(LocalDateTime.now()) + .endTime(LocalDateTime.now()) + .isTableStructureEquals(false) + .isExistTableMiss(true, existEndpoint) + .rowCount(0) + .error("table miss"); CheckDiffResult result = builder.build(); sliceCheckContext.addTableStructureDiffResult(slice, result); registerCenter.refreshCheckedTableCompleted(slice.getTable()); diff --git a/datachecker-common/src/main/java/org/opengauss/datachecker/common/entry/common/CheckPointData.java b/datachecker-common/src/main/java/org/opengauss/datachecker/common/entry/common/CheckPointData.java index 2cdfba15da0f34f4ebb46ee7e2b79c28f5e9f54f..2bbde1c93d5344ec69eb0aa9daa0eba39bcadcdf 100644 --- a/datachecker-common/src/main/java/org/opengauss/datachecker/common/entry/common/CheckPointData.java +++ b/datachecker-common/src/main/java/org/opengauss/datachecker/common/entry/common/CheckPointData.java @@ -17,6 +17,7 @@ package org.opengauss.datachecker.common.entry.common; import lombok.Data; import lombok.experimental.Accessors; + import org.opengauss.datachecker.common.entry.enums.Endpoint; import java.util.List; @@ -33,4 +34,10 @@ public class CheckPointData { private String tableName; private boolean isDigit; private List checkPointList; + + @Override + public String toString() { + return "endpoint=" + endpoint + ", tableName=" + tableName + ", isDigit=" + isDigit + ", checkPointList=" + + checkPointList.size(); + } } diff --git a/datachecker-common/src/main/java/org/opengauss/datachecker/common/entry/extract/SliceVo.java b/datachecker-common/src/main/java/org/opengauss/datachecker/common/entry/extract/SliceVo.java index 7eab24bcf7227a9be69bdfb956bc276a0a8e4881..9392c3c322b32eaf0d95a27e93ef81a6afab0726 100644 --- a/datachecker-common/src/main/java/org/opengauss/datachecker/common/entry/extract/SliceVo.java +++ b/datachecker-common/src/main/java/org/opengauss/datachecker/common/entry/extract/SliceVo.java @@ -78,6 +78,6 @@ public class SliceVo extends BaseSlice { return super.getName() + " total=" + super.getTotal() + " no=" + super.getNo() + ", [ fetch full ]"; } return super.getName() + " total=" + super.getTotal() + " no=" + super.getNo() + ", [" + super.getBeginIdx() - + " , " + super.getEndIdx() + " ]" + " fetchSize=" + super.getFetchSize(); + + " , " + super.getEndIdx() + " ]"; } } diff --git a/datachecker-common/src/main/java/org/opengauss/datachecker/common/entry/extract/TableMetadata.java b/datachecker-common/src/main/java/org/opengauss/datachecker/common/entry/extract/TableMetadata.java index 8842669cafa4d5d082f00e0f6e5ca82fbf62aa7c..7283fc966e2605bf41070debc3f79d976c45d0a1 100644 --- a/datachecker-common/src/main/java/org/opengauss/datachecker/common/entry/extract/TableMetadata.java +++ b/datachecker-common/src/main/java/org/opengauss/datachecker/common/entry/extract/TableMetadata.java @@ -18,6 +18,7 @@ package org.opengauss.datachecker.common.entry.extract; import lombok.Data; import lombok.ToString; import lombok.experimental.Accessors; + import 
org.opengauss.datachecker.common.entry.enums.DataBaseType; import org.opengauss.datachecker.common.entry.enums.Endpoint; import org.springframework.util.CollectionUtils; @@ -84,8 +85,7 @@ public class TableMetadata { if (primaryMetas == null || primaryMetas.size() != 1) { return false; } - return primaryMetas.get(0) - .isAutoIncrementColumn(); + return primaryMetas.get(0).isAutoIncrementColumn(); } /** @@ -97,6 +97,20 @@ public class TableMetadata { return !CollectionUtils.isEmpty(primaryMetas) && primaryMetas.size() == 1; } + /** + * judge if this table is a union (multi-column) primary key table. + * + * @return true if the primary key is a union primary key + */ + public boolean isUnionPrimary() { + return !CollectionUtils.isEmpty(primaryMetas) && primaryMetas.size() > 1; + } + + /** + * judge if this table is a single-column primary key table. + * + * @return the single primary key column metadata if present, otherwise null + */ public ColumnsMetaData getSinglePrimary() { if (hasPrimary()) { return primaryMetas.get(0); } @@ -134,8 +148,6 @@ public class TableMetadata { public static TableMetadata parse(ResultSet rs, String schema, Endpoint endpoint, DataBaseType databaseType) throws SQLException { - return parse(rs).setSchema(schema) - .setEndpoint(endpoint) - .setDataBaseType(databaseType); + return parse(rs).setSchema(schema).setEndpoint(endpoint).setDataBaseType(databaseType); } } diff --git a/datachecker-common/src/main/java/org/opengauss/datachecker/common/service/ShutdownService.java b/datachecker-common/src/main/java/org/opengauss/datachecker/common/service/ShutdownService.java index 7daa050b07a05f341c5be671e85deb2070dadd07..261fe1588fbf7fb837ca8d29bba71d26255fd851 100644 --- a/datachecker-common/src/main/java/org/opengauss/datachecker/common/service/ShutdownService.java +++ b/datachecker-common/src/main/java/org/opengauss/datachecker/common/service/ShutdownService.java @@ -35,6 +35,8 @@ public class ShutdownService { @Resource private DynamicThreadPoolManager dynamicThreadPoolManager; @Resource + private ThreadPoolTaskExecutor sliceSendExecutor; + @Resource private ProcessLogService processLogService; @Async @@ -50,6 +52,7 @@ public class ShutdownService { processLogService.saveStopProcessLog(); threadExecutorList.forEach(ExecutorConfigurationSupport::shutdown); executorServiceList.forEach(ExecutorService::shutdownNow); + sliceSendExecutor.shutdown(); System.exit(SpringApplication.exit(SpringUtil.getApplicationContext())); } diff --git a/datachecker-common/src/main/java/org/opengauss/datachecker/common/util/ThreadUtil.java b/datachecker-common/src/main/java/org/opengauss/datachecker/common/util/ThreadUtil.java index ca7cdc2fe68daec5ca92c4ea5749d0977ddebe38..98d2b4aaf0c3432057f15abff8372bcab1d6fea5 100644 --- a/datachecker-common/src/main/java/org/opengauss/datachecker/common/util/ThreadUtil.java +++ b/datachecker-common/src/main/java/org/opengauss/datachecker/common/util/ThreadUtil.java @@ -16,17 +16,14 @@ package org.opengauss.datachecker.common.util; import org.apache.commons.lang3.RandomUtils; -import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.concurrent.BasicThreadFactory; import org.apache.logging.log4j.Logger; -import java.util.Arrays; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledThreadPoolExecutor; -import java.util.concurrent.atomic.AtomicInteger; /** * ThreadUtil @@ -64,6 +61,19 @@ public class ThreadUtil { } } + /** + * sleep circle,max
sleep time is 5 seconds (sleep 1-5 sec) + * + * @param times current circle times + */ + public static void sleepCircle(int times) { + try { + TimeUnit.SECONDS.sleep(times / 5 + 1); + } catch (InterruptedException ie) { + LogUtils.warn(log, "thread sleep interrupted exception "); + } + } + /** * The current thread sleeps for 10 - 500 milliseconds */ @@ -103,47 +113,12 @@ public class ThreadUtil { sleep(RandomUtils.nextInt(100, 500)); } - /** - * kill thread by thread name - * - * @param name thread name - */ - public static void killThreadByName(String name) { - AtomicInteger threadCount = new AtomicInteger(0); - do { - ThreadGroup currentGroup = Thread.currentThread().getThreadGroup(); - int noThreads = currentGroup.activeCount(); - Thread[] lstThreads = new Thread[noThreads]; - currentGroup.enumerate(lstThreads); - threadCount.set(0); - Arrays.stream(lstThreads) - .filter(thread -> { - if (StringUtils.containsIgnoreCase(thread.getName(), name)) { - threadCount.incrementAndGet(); - return true; - } - return false; - }) - .forEach(thread -> { - if (thread.getState().equals(Thread.State.WAITING)) { - log.warn("thread [{}] :[{} ] has interrupted", thread.getName(), thread.getState()); - thread.interrupt(); - } else { - threadCount.decrementAndGet(); - log.warn("thread [{}] :[{} ] has stop", thread.getName(), thread.getState()); - thread.stop(); - } - }); - } while (threadCount.get() > 0); - - } - /** * Custom thread pool construction * * @return thread pool */ - @SuppressWarnings({"all"}) + @SuppressWarnings( {"all"} ) public static ExecutorService newSingleThreadExecutor() { return Executors.newFixedThreadPool(1, Executors.defaultThreadFactory()); } @@ -154,14 +129,11 @@ public class ThreadUtil { * @return Scheduled task single thread */ public static ScheduledExecutorService newSingleThreadScheduledExecutor() { - return new ScheduledThreadPoolExecutor(1, new BasicThreadFactory.Builder().daemon(true) - .build()); + return new ScheduledThreadPoolExecutor(1, new BasicThreadFactory.Builder().daemon(true).build()); } public static ScheduledExecutorService newSingleThreadScheduledExecutor(String name) { - return new ScheduledThreadPoolExecutor(1, new BasicThreadFactory. 
- Builder().namingPattern(name) - .build()); + return new ScheduledThreadPoolExecutor(1, new BasicThreadFactory.Builder().namingPattern(name).build()); } } diff --git a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/config/AsyncConfig.java b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/config/AsyncConfig.java index 200ce05d5bcc2fbdb67394d782e8064b289c158f..d63f3fa2383073b2f61594f8ebb55a8f63dee7d7 100644 --- a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/config/AsyncConfig.java +++ b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/config/AsyncConfig.java @@ -15,6 +15,7 @@ package org.opengauss.datachecker.extract.config; +import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.scheduling.annotation.AsyncConfigurer; @@ -22,6 +23,8 @@ import org.springframework.scheduling.annotation.EnableAsync; import org.springframework.scheduling.annotation.EnableScheduling; import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; +import javax.annotation.PreDestroy; + /** * AsyncConfig * @@ -29,10 +32,15 @@ import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; * @date 2022/5/8 19:17 * @since 11 **/ -@EnableAsync @EnableScheduling @Configuration +@EnableAsync(proxyTargetClass = true) public class AsyncConfig implements AsyncConfigurer { + private ThreadPoolTaskExecutor executor; + @Value("${spring.check.core-pool-size}") + private int corePoolSize; + @Value("${spring.check.maximum-pool-size}") + private int maxPoolSize; /** * Asynchronous processing scenario for data extraction non-core business @@ -40,14 +48,22 @@ public class AsyncConfig implements AsyncConfigurer { * @return ThreadPoolTaskExecutor */ @Override - @Bean(name = "taskAsyncExecutor") + @Bean(name = "sliceSendExecutor") public ThreadPoolTaskExecutor getAsyncExecutor() { - ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); - executor.setCorePoolSize(1); - executor.setMaxPoolSize(5); - executor.setQueueCapacity(1000); - executor.setThreadNamePrefix("TaskAsyncExecutor-"); + executor = new ThreadPoolTaskExecutor(); + executor.setCorePoolSize(corePoolSize); + executor.setMaxPoolSize(maxPoolSize); + executor.setQueueCapacity(10000); + executor.setThreadNamePrefix("slice-send-executor-"); executor.initialize(); return executor; } + + /** + * destroy executor + */ + @PreDestroy + public void closeExecutor() { + executor.shutdown(); + } } \ No newline at end of file diff --git a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/service/DataExtractServiceImpl.java b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/service/DataExtractServiceImpl.java index 4744fc0e002d7071addb3768c060e61c81585b86..fd607fa52b72f8a7686ade1ca41bdaa09667e5a0 100644 --- a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/service/DataExtractServiceImpl.java +++ b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/service/DataExtractServiceImpl.java @@ -64,6 +64,7 @@ import org.springframework.stereotype.Service; import org.springframework.util.StopWatch; import javax.annotation.Resource; + import java.util.ArrayList; import java.util.HashMap; import java.util.Iterator; @@ -72,9 +73,7 @@ import java.util.List; import java.util.Map; import java.util.Objects; import java.util.Set; -import java.util.concurrent.CountDownLatch; import 
java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Collectors; @@ -152,8 +151,8 @@ public class DataExtractServiceImpl implements DataExtractService { * * @param processNo Execution process number * @throws ProcessMultipleException The previous instance is executing the data extraction service. - * It cannot restart the new verification - * and throws a ProcessMultipleException exception. + * It cannot restart the new verification + * and throws a ProcessMultipleException exception. */ @Override public PageExtract buildExtractTaskAllTables(String processNo) throws ProcessMultipleException { @@ -174,7 +173,7 @@ public class DataExtractServiceImpl implements DataExtractService { return PageExtract.buildInitPage(taskList.size()); } else { LogUtils.error(log, "process={} is running extract task , {} please wait ... ", atomicProcessNo.get(), - processNo); + processNo); throw new ProcessMultipleException("process {" + atomicProcessNo.get() + "} is running extract task"); } } @@ -185,8 +184,7 @@ public class DataExtractServiceImpl implements DataExtractService { int startIdx = pageExtract.getPageStartIdx(); int endIdx = pageExtract.getPageEndIdx(); for (; startIdx < pageExtract.getSize() && startIdx < endIdx; startIdx++) { - pageList.add(taskReference.get() - .get(startIdx)); + pageList.add(taskReference.get().get(startIdx)); } LogUtils.info(log, "fetchExtractTaskPageTables ={}", pageExtract); return pageList; @@ -197,8 +195,8 @@ public class DataExtractServiceImpl implements DataExtractService { * * @param taskList taskList * @throws ProcessMultipleException The previous instance is executing the data extraction service. - * It cannot restart the new verification - * and throws a ProcessMultipleException exception. + * It cannot restart the new verification + * and throws a ProcessMultipleException exception. 
*/ @Override public void dispatchSinkExtractTaskPage(@NonNull List taskList) throws ProcessMultipleException { @@ -212,22 +210,18 @@ public class DataExtractServiceImpl implements DataExtractService { // Verify whether the task list built on the source side exists on the destination side, // and filter the nonexistent task list final Set tableNames = MetaDataCache.getAllKeys(); - if (CollectionUtils.isEmpty(taskList) || CollectionUtils.isEmpty(tableNames)) { LogUtils.info(log, "build extract task process={} taskList={} ,MetaCache tableNames={}", processNo, - taskList.size(), tableNames); + taskList.size(), tableNames); return; } final List extractTasks = taskList.stream() - .filter(task -> tableNames.contains(task.getTableName())) - .collect(Collectors.toList()); + .filter(task -> tableNames.contains(task.getTableName())) + .collect(Collectors.toList()); extractTasks.forEach(this::updateSinkMetadata); - taskReference.get() - .addAll(extractTasks); - LogUtils.info(log, "build extract task process={} count={},", processNo, taskReference.get() - .size()); + taskReference.get().addAll(extractTasks); + LogUtils.info(log, "build extract task process={} count={},", processNo, taskReference.get().size()); atomicProcessNo.set(processNo); - // taskCountMap is used to count the number of tasks in table fragment query Map taskCountMap = new HashMap<>(Constants.InitialCapacity.EMPTY); taskList.forEach(task -> { @@ -250,8 +244,7 @@ public class DataExtractServiceImpl implements DataExtractService { @Override public void cleanBuildTask() { if (Objects.nonNull(taskReference.getAcquire())) { - taskReference.getAcquire() - .clear(); + taskReference.getAcquire().clear(); } TableExtractStatusCache.removeAll(); atomicProcessNo.set(PROCESS_NO_RESET); @@ -308,9 +301,8 @@ public class DataExtractServiceImpl implements DataExtractService { while (CollectionUtils.isEmpty(taskReference.get())) { ThreadUtil.sleep(MAX_SLEEP_MILLIS_TIME); if (sleepCount++ > MAX_SLEEP_COUNT) { - LogUtils.info(log, "endpoint [{}] and process[{}}] task is empty!", extractProperties.getEndpoint() - .getDescription(), - processNo); + LogUtils.info(log, "endpoint [{}] and process[{}}] task is empty!", + extractProperties.getEndpoint().getDescription(), processNo); break; } } @@ -332,12 +324,11 @@ public class DataExtractServiceImpl implements DataExtractService { final String tableName = task.getTableName(); if (!tableCheckStatus.containsKey(tableName) || tableCheckStatus.get(tableName) == -1) { LogUtils.warn(log, "Abnormal table[{}] status, ignoring the current table data extraction task", - tableName); + tableName); return; } Endpoint endpoint = extractProperties.getEndpoint(); - while (!tableCheckPointCache.getAll() - .containsKey(tableName)) { + while (!tableCheckPointCache.getAll().containsKey(tableName)) { ThreadUtil.sleepHalfSecond(); } List summarizedCheckPoint = tableCheckPointCache.get(tableName); @@ -351,7 +342,7 @@ public class DataExtractServiceImpl implements DataExtractService { } private List buildSliceByTask(List summarizedCheckPoint, TableMetadata tableMetadata, - Endpoint endpoint) { + Endpoint endpoint) { List sliceVoList; if (noTableSlice(tableMetadata, summarizedCheckPoint)) { sliceVoList = buildSingleSlice(tableMetadata, endpoint); @@ -366,16 +357,18 @@ public class DataExtractServiceImpl implements DataExtractService { sliceRegister.batchRegister(sliceVoList); if (sliceVoList.size() <= 20) { ExecutorService executorService = dynamicThreadPoolManager.getExecutor(EXTRACT_EXECUTOR); - LogUtils.debug(log, "table [{}] get 
executorService success", sliceVoList.get(0) - .getTable()); - sliceVoList.forEach(sliceVo -> executorService.submit(sliceFactory.createSliceProcessor(sliceVo))); + LogUtils.debug(log, "table [{}] get executorService success", sliceVoList.get(0).getTable()); + sliceVoList.forEach(sliceVo -> { + executorService.submit(sliceFactory.createSliceProcessor(sliceVo)); + }); } else { int topicSize = ConfigCache.getIntValue(ConfigConstants.MAXIMUM_TOPIC_SIZE); int extendMaxPoolSize = ConfigCache.getIntValue(ConfigConstants.EXTEND_MAXIMUM_POOL_SIZE); ExecutorService extendExecutor = dynamicThreadPoolManager.getFreeExecutor(topicSize, extendMaxPoolSize); - LogUtils.debug(log, "table [{}] get extendExecutor success", sliceVoList.get(0) - .getTable()); - sliceVoList.forEach(sliceVo -> extendExecutor.submit(sliceFactory.createSliceProcessor(sliceVo))); + LogUtils.debug(log, "table [{}] get extendExecutor success", sliceVoList.get(0).getTable()); + sliceVoList.forEach(sliceVo -> { + extendExecutor.submit(sliceFactory.createSliceProcessor(sliceVo)); + }); } } @@ -431,7 +424,7 @@ public class DataExtractServiceImpl implements DataExtractService { List checkPointList; try { checkPointList = sliceStatement.getCheckPoint(metadata, - ConfigCache.getIntValue(ConfigConstants.MAXIMUM_TABLE_SLICE_SIZE)); + ConfigCache.getIntValue(ConfigConstants.MAXIMUM_TABLE_SLICE_SIZE)); } catch (Exception ex) { LogUtils.error(log, "getCheckPoint error:", ex); return new ArrayList<>(); @@ -449,25 +442,14 @@ public class DataExtractServiceImpl implements DataExtractService { LogUtils.info(log, "start pollSwapPoint thread to register CheckPoint taskSize=" + taskList.size()); checkPointManager.pollSwapPoint(tableCheckPointCache); Endpoint endpoint = ConfigCache.getEndPoint(); - CountDownLatch countDownLatch = new CountDownLatch(taskList.size()); - ExecutorService executorService = Executors.newFixedThreadPool(5); taskList.forEach(task -> { - executorService.submit(() -> { - registerCheckPoint(task, endpoint); - countDownLatch.countDown(); - }); + registerCheckPoint(task, endpoint); }); - try { - countDownLatch.await(); - } catch (InterruptedException e) { - LogUtils.warn(log, "tableRegisterCheckPoint CountDownLatch InterruptedException"); - } LogUtils.info(log, "tableRegisterCheckPoint finished"); while (tableCheckPointCache.tableCount() != taskList.size()) { ThreadUtil.sleepHalfSecond(); } checkPointManager.close(); - executorService.shutdownNow(); sliceRegister.stopCheckPointMonitor(ConfigCache.getEndPoint()); }).start(); } @@ -483,17 +465,15 @@ public class DataExtractServiceImpl implements DataExtractService { tableCheckPointCache.put(tableName, checkPointList); } checkPointManager.send(new CheckPointData().setTableName(tableName) - .setDigit(checkPoint.checkPkNumber(task.getTableMetadata())) - .setCheckPointList(checkPointList)); + .setDigit(checkPoint.checkPkNumber(task.getTableMetadata())) + .setCheckPointList(checkPointList)); } catch (Exception e) { log.error("register check point failed ", e); } } private String sliceTaskNameBuilder(@NonNull String tableName, int index) { - return TASK_NAME_PREFIX.concat(tableName) - .concat("_slice_") - .concat(String.valueOf(index + 1)); + return TASK_NAME_PREFIX.concat(tableName).concat("_slice_").concat(String.valueOf(index + 1)); } private void registerTopic(ExtractTask task) { @@ -511,7 +491,7 @@ public class DataExtractServiceImpl implements DataExtractService { /** * Query table data * - * @param tableName tableName + * @param tableName tableName * @param compositeKeys Review 
primary key set * @return Primary key corresponds to table data */ diff --git a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/slice/ExtractPointSwapManager.java b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/slice/ExtractPointSwapManager.java index ffdd607a40c863ece3c8ae15c48b0f93874740d7..a35691fac73576d0b0f72eba51f447b81393d4ab 100644 --- a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/slice/ExtractPointSwapManager.java +++ b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/slice/ExtractPointSwapManager.java @@ -16,6 +16,7 @@ package org.opengauss.datachecker.extract.slice; import com.alibaba.fastjson.JSONObject; + import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.common.PartitionInfo; @@ -56,7 +57,7 @@ public class ExtractPointSwapManager { private KafkaConsumerConfig kafkaConsumerConfig; public ExtractPointSwapManager(KafkaTemplate kafkaTemplate, - KafkaConsumerConfig kafkaConsumerConfig) { + KafkaConsumerConfig kafkaConsumerConfig) { this.kafkaTemplate = kafkaTemplate; this.endpoint = ConfigCache.getEndPoint(); this.endpoint = ConfigCache.getEndPoint(); @@ -77,6 +78,7 @@ public class ExtractPointSwapManager { ConsumerRecords records; AtomicInteger deliveredCount = new AtomicInteger(); LogUtils.info(log, "pollSwapPoint thread started"); + int retryTimesWait = 0; while (!isCompletedSwapTablePoint) { try { records = kafkaConsumer.poll(Duration.ofSeconds(1)); @@ -87,11 +89,13 @@ public class ExtractPointSwapManager { tableCheckPointCache.put(pointData.getTableName(), translateDigitPoint(pointData)); deliveredCount.getAndIncrement(); LogUtils.info(log, "swap summarized checkpoint of table [{}]:[{}] ", deliveredCount, - pointData); + pointData.toString()); } }); + ThreadUtil.sleepHalfSecond(); } else { - ThreadUtil.sleepOneSecond(); + LogUtils.info(log, "wait swap summarized checkpoint of table {}...", ++retryTimesWait); + ThreadUtil.sleepCircle(retryTimesWait); } } catch (Exception ex) { if (Objects.equals("java.lang.InterruptedException", ex.getMessage())) { @@ -102,17 +106,16 @@ public class ExtractPointSwapManager { } } LogUtils.warn(log, "close check point swap consumer {} :{}", checkPointSwapTopicName, - kafkaConsumer.groupMetadata() - .groupId()); + kafkaConsumer.groupMetadata().groupId()); kafkaConsumerConfig.closeConsumer(kafkaConsumer); }); } private List translateDigitPoint(CheckPointData pointData) { return pointData.isDigit() ? 
pointData.getCheckPointList() - .stream() - .map(obj -> Long.parseLong((String) obj)) - .collect(Collectors.toList()) : pointData.getCheckPointList(); + .stream() + .map(obj -> Long.parseLong((String) obj)) + .collect(Collectors.toList()) : pointData.getCheckPointList(); } private void trySubscribe() { diff --git a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/slice/SliceProcessorContext.java b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/slice/SliceProcessorContext.java index d8bedb576bf39e84b71b15b1307ff01cc44e5f35..c061f28938d2b122e4d897f561291ea77a928624 100644 --- a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/slice/SliceProcessorContext.java +++ b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/slice/SliceProcessorContext.java @@ -31,12 +31,16 @@ import org.opengauss.datachecker.extract.task.sql.AutoSliceQueryStatement; import org.opengauss.datachecker.extract.task.sql.FullQueryStatement; import org.opengauss.datachecker.extract.task.sql.QueryStatementFactory; import org.opengauss.datachecker.extract.task.sql.SliceQueryStatement; +import org.opengauss.datachecker.extract.task.sql.UnionPrimarySliceQueryStatement; import org.springframework.kafka.core.KafkaTemplate; +import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import org.springframework.stereotype.Component; import javax.annotation.PreDestroy; import javax.annotation.Resource; + import java.util.Objects; +import java.util.concurrent.Future; /** * SliceProcessorContext @@ -60,12 +64,23 @@ public class SliceProcessorContext { private KafkaConsumerConfig kafkaConsumerConfig; @Resource private CheckingFeignClient checkingFeignClient; + @Resource + private ThreadPoolTaskExecutor sliceSendExecutor; private SliceStatusFeedbackService sliceStatusFeedbackService; public void saveProcessing(SliceVo slice) { processLogService.saveProcessHistoryLogging(slice.getTable(), slice.getNo()); } + /** + * async thread add threadPool + * + * @param sliceSendRunnable sliceSendRunnable + * @return future + */ + public Future asyncSendSlice(Runnable sliceSendRunnable) { + return sliceSendExecutor.submit(sliceSendRunnable); + } /** * 销毁kafkaTemplate @@ -79,7 +94,7 @@ public class SliceProcessorContext { * 创建分片kafka代理 * * @param topicName topic 名称 - * @param groupId GroupID + * @param groupId GroupID * @return 分片kafka代理 */ public SliceKafkaAgents createSliceFixedKafkaAgents(String topicName, String groupId) { @@ -123,6 +138,15 @@ public class SliceProcessorContext { return factory.createSliceQueryStatement(); } + /** + * create slice query statement of union primary slice + * + * @return UnionPrimarySliceQueryStatement + */ + public UnionPrimarySliceQueryStatement createSlicePageQueryStatement() { + return factory.createSlicePageQueryStatement(); + } + public AutoSliceQueryStatement createAutoSliceQueryStatement(TableMetadata tableMetadata) { CheckPoint checkPoint = new CheckPoint(baseDataService.getDataAccessService()); return factory.createSliceQueryStatement(checkPoint, tableMetadata); diff --git a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/slice/common/SliceResultSetSender.java b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/slice/common/SliceResultSetSender.java index da36dd6122be913fa1bc14b3043d46aa06262408..88db5249be458c92ba12a273579e55a94d9e3bb3 100644 --- a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/slice/common/SliceResultSetSender.java +++ 
b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/slice/common/SliceResultSetSender.java @@ -55,7 +55,7 @@ public class SliceResultSetSender { * constructor * * @param tableMetadata tableMetadata - * @param kafkaOperate kafkaOperate + * @param kafkaOperate kafkaOperate */ public SliceResultSetSender(@NonNull TableMetadata tableMetadata, SliceKafkaAgents kafkaOperate) { this.resultSetHandler = new ResultSetHandlerFactory().createHandler(tableMetadata.getDataBaseType()); @@ -69,14 +69,13 @@ public class SliceResultSetSender { /** * resultSetTranslateAndSendSync * - * @param rsmd rsmd - * @param rs rs - * @param result result - * @param sNo sNo + * @param rsmd rsmd + * @param rs rs + * @param sNo sNo */ public ListenableFuture> resultSetTranslateAndSendSync(ResultSetMetaData rsmd, - ResultSet rs, Map result, int sNo) { - RowDataHash dataHash = resultSetTranslate(rsmd, rs, result, sNo); + ResultSet rs, int sNo) { + RowDataHash dataHash = resultSetTranslate(rsmd, rs, sNo); return kafkaOperate.sendRowDataSync(dataHash); } @@ -93,19 +92,40 @@ public class SliceResultSetSender { /** * resultSetTranslate * - * @param rsmd rsmd - * @param rs rs - * @param result result - * @param sNo sNo + * @param rsmd rsmd + * @param rs rs + * @param sNo sNo */ - public RowDataHash resultSetTranslate(ResultSetMetaData rsmd, ResultSet rs, Map result, int sNo) { - resultSetHandler.putOneResultSetToMap(tableName, rsmd, rs, result); - RowDataHash dataHash = handler(primary, columns, result); + public RowDataHash resultSetTranslate(ResultSetMetaData rsmd, ResultSet rs, int sNo) { + RowDataHash dataHash = handler(primary, columns, resultSetHandler.putOneResultSetToMap(tableName, rsmd, rs)); dataHash.setSNo(sNo); - result.clear(); return dataHash; } + /** + * translate result set and send row kafka + * + * @param values result set + * @param sNo sn + * @return result + */ + public ListenableFuture> resultSetTranslate(Map values, int sNo) { + RowDataHash dataHash = handler(primary, columns, values); + dataHash.setSNo(sNo); + return kafkaOperate.sendRowDataSync(dataHash); + } + + /** + * resultSet read and parse + * + * @param rsmd rsmd + * @param resultSet rs + * @return parse result + */ + public Map resultSet(ResultSetMetaData rsmd, ResultSet resultSet) { + return resultSetHandler.putOneResultSetToMap(tableName, rsmd, resultSet); + } + /** * checkOffsetEnd * @@ -138,12 +158,9 @@ public class SliceResultSetSender { private RowDataHash handler(List primary, List columns, Map rowData) { long rowHash = HASH_HANDLER.xx3Hash(rowData, columns); String primaryValue = HASH_HANDLER.value(rowData, primary); - long primaryHash = HASH_HANDLER.xx3Hash(rowData, primary); + long primaryHash = HASH_HANDLER.xx3Hash(primaryValue); RowDataHash hashData = new RowDataHash(); - hashData.setKey(primaryValue) - .setKHash(primaryHash) - .setSliceKey(sliceKey) - .setVHash(rowHash); + hashData.setKey(primaryValue).setKHash(primaryHash).setSliceKey(sliceKey).setVHash(rowHash); return hashData; } @@ -158,9 +175,9 @@ public class SliceResultSetSender { * csv mode, translate next line data to map and send it to kafka topic * * @param nextLine next line - * @param result temp map - * @param rowIdx row idx of csv file - * @param sNo sNo + * @param result temp map + * @param rowIdx row idx of csv file + * @param sNo sNo */ public void csvTranslateAndSend(String[] nextLine, Map result, int rowIdx, int sNo) { RowDataHash dataHash = csvTranslate(nextLine, result, rowIdx, sNo); @@ -171,9 +188,9 @@ public class SliceResultSetSender { * csv 
mode, translate next line data to map and send it to kafka topic * * @param nextLine next line - * @param result temp map - * @param rowIdx row idx of csv file - * @param sNo sNo + * @param result temp map + * @param rowIdx row idx of csv file + * @param sNo sNo */ public ListenableFuture> csvTranslateAndSendSync(String[] nextLine, Map result, int rowIdx, int sNo) { diff --git a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/slice/process/AbstractProcessor.java b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/slice/process/AbstractProcessor.java index 733fcc62ef93c24944ef8bc183bbc45c77957c1c..843a8b940c3017815085f7a9a5157f1197ce6c2a 100644 --- a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/slice/process/AbstractProcessor.java +++ b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/slice/process/AbstractProcessor.java @@ -40,7 +40,7 @@ public abstract class AbstractProcessor implements SliceProcessor { /** * JDBC fetch size */ - protected static final int FETCH_SIZE = 10000; + protected static final int FETCH_SIZE = 200; /** * log diff --git a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/slice/process/JdbcSliceProcessor.java b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/slice/process/JdbcSliceProcessor.java index b90ad5d4d51f7eec8d3cc42db65ee4862893f221..450f8683d80b49a842d8e5653f05c4f44bc9f390 100644 --- a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/slice/process/JdbcSliceProcessor.java +++ b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/slice/process/JdbcSliceProcessor.java @@ -16,7 +16,11 @@ package org.opengauss.datachecker.extract.slice.process; import com.alibaba.druid.pool.DruidDataSource; + import org.apache.logging.log4j.Logger; +import org.opengauss.datachecker.common.config.ConfigCache; +import org.opengauss.datachecker.common.constant.ConfigConstants; +import org.opengauss.datachecker.common.entry.enums.DataBaseType; import org.opengauss.datachecker.common.entry.extract.SliceExtend; import org.opengauss.datachecker.common.entry.extract.SliceVo; import org.opengauss.datachecker.common.entry.extract.TableMetadata; @@ -29,8 +33,9 @@ import org.opengauss.datachecker.extract.slice.common.SliceResultSetSender; import org.opengauss.datachecker.extract.task.sql.FullQueryStatement; import org.opengauss.datachecker.extract.task.sql.QuerySqlEntry; import org.opengauss.datachecker.extract.task.sql.SliceQueryStatement; +import org.opengauss.datachecker.extract.task.sql.UnionPrimarySliceQueryStatement; import org.springframework.kafka.support.SendResult; -import org.springframework.util.StopWatch; +import org.springframework.util.Assert; import org.springframework.util.concurrent.ListenableFuture; import java.sql.Connection; @@ -38,10 +43,12 @@ import java.sql.PreparedStatement; import java.sql.ResultSet; import java.sql.ResultSetMetaData; import java.sql.SQLException; -import java.util.LinkedList; import java.util.List; import java.util.Map; -import java.util.TreeMap; +import java.util.Objects; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.atomic.AtomicInteger; /** @@ -60,7 +67,7 @@ public class JdbcSliceProcessor extends AbstractSliceProcessor { /** * JdbcSliceProcessor * - * @param slice slice + * @param slice slice * @param context context */ public JdbcSliceProcessor(SliceVo slice, 
SliceProcessorContext context, DruidDataSource dataSource) { @@ -75,19 +82,30 @@ public class JdbcSliceProcessor extends AbstractSliceProcessor { TableMetadata tableMetadata = context.getTableMetaData(table); SliceExtend sliceExtend = createSliceExtend(tableMetadata.getTableHash()); try { - QuerySqlEntry queryStatement = createQueryStatement(tableMetadata); - LogUtils.debug(log, "table [{}] query statement : {}", table, queryStatement.getSql()); - executeQueryStatement(queryStatement, tableMetadata, sliceExtend); - } catch (Exception ex) { + if (tableMetadata.isUnionPrimary()) { + DataBaseType dataBaseType = ConfigCache.getValue(ConfigConstants.DATA_BASE_TYPE, DataBaseType.class); + Assert.isTrue(isSuiteUnionPrimary(dataBaseType), + "Union primary is not supported by current database type " + dataBaseType.getDescription()); + executeSliceQueryStatementPage(tableMetadata, sliceExtend); + } else { + QuerySqlEntry queryStatement = createQueryStatement(tableMetadata); + LogUtils.debug(log, "table [{}] query statement : {}", table, queryStatement.getSql()); + executeQueryStatement(queryStatement, tableMetadata, sliceExtend); + } + } catch (Exception | Error ex) { sliceExtend.setStatus(-1); LogUtils.error(log, "table slice [{}] is error", slice.toSimpleString(), ex); } finally { - LogUtils.info(log, "table slice [{}] is finally ", slice.toSimpleString()); + LogUtils.info(log, "table slice [{}] is finally ", slice.toSimpleString()); feedbackStatus(sliceExtend); context.saveProcessing(slice); } } + private boolean isSuiteUnionPrimary(DataBaseType dataBaseType) { + return Objects.equals(dataBaseType, DataBaseType.OG) || Objects.equals(dataBaseType, DataBaseType.MS); + } + private QuerySqlEntry createQueryStatement(TableMetadata tableMetadata) { if (slice.isSlice()) { SliceQueryStatement sliceStatement = context.createSliceQueryStatement(); @@ -98,27 +116,144 @@ public class JdbcSliceProcessor extends AbstractSliceProcessor { } } + private void executeSliceQueryStatementPage(TableMetadata tableMetadata, SliceExtend sliceExtend) { + // slice row count statistics + UnionPrimarySliceQueryStatement sliceStatement = context.createSlicePageQueryStatement(); + QuerySqlEntry sliceCountSql = sliceStatement.buildSliceCount(tableMetadata, slice); + int sliceCount = querySliceRowTotalCount(sliceExtend, sliceCountSql); + QuerySqlEntry baseSliceSql = sliceStatement.buildSlice(tableMetadata, slice); + List pageStatementList = sliceStatement.buildPageStatement(baseSliceSql, sliceCount, + slice.getFetchSize()); + SliceResultSetSender sliceSender = null; + Connection connection = null; + try { + // acquire a database connection + long estimatedRowCount = slice.isSlice() ?
slice.getFetchSize() : tableMetadata.getTableRows(); + long estimatedMemorySize = estimatedMemorySize(tableMetadata.getAvgRowLength(), estimatedRowCount); + connection = jdbcOperation.tryConnectionAndClosedAutoCommit(estimatedMemorySize, dataSource); + // connection acquired: prepare the slice query and start the async data handler thread + sliceSender = createSliceResultSetSender(tableMetadata); + sliceSender.setRecordSendKey(slice.getName()); + List offsetList = new CopyOnWriteArrayList<>(); + List>> batchFutures = new CopyOnWriteArrayList<>(); + AsyncDataHandler asyncHandler = new AsyncDataHandler(batchFutures, sliceSender, offsetList); + asyncHandler.start(); + context.asyncSendSlice(asyncHandler); + // start querying data and push the results to the async handler thread + boolean isFirstStatement = true; + long startOffset = 0L; + for (String pageStatement : pageStatementList) { + if (isFirstStatement) { + // only use first page statement's start offset + startOffset = pageQueryUnionPrimarySlice(pageStatement, connection, sliceSender, asyncHandler); + } else { + // other page statement's start offset is ignored + pageQueryUnionPrimarySlice(pageStatement, connection, sliceSender, asyncHandler); + } + isFirstStatement = false; + } + sliceExtend.setStartOffset(startOffset); + waitToStopAsyncHandlerAndResources(asyncHandler); + updateExtendSliceOffsetAndCount(sliceExtend, rowCount.get(), offsetList); + } catch (Exception ex) { + LogUtils.error(log, "slice [{}] has exception :", slice.getName(), ex); + throw new ExtractDataAccessException(ex.getMessage()); + } finally { + ConnectionMgr.close(connection, null, null); + if (sliceSender != null) { + sliceSender.agentsClosed(); + } + jdbcOperation.releaseConnection(connection); + LogUtils.info(log, "query union primary slice and send data {} Count:{}", sliceExtend.getName(), + rowCount.get()); + } + } + + private long pageQueryUnionPrimarySlice(String pageStatement, Connection connection, + SliceResultSetSender sliceSender, AsyncDataHandler asyncHandler) throws SQLException, InterruptedException { + long startOffset; + PreparedStatement ps = connection.prepareStatement(pageStatement); + ps.setFetchSize(FETCH_SIZE); + ResultSet resultSet = ps.executeQuery(); + startOffset = sliceSender.checkOffsetEnd(); + ResultSetMetaData rsmd = resultSet.getMetaData(); + while (resultSet.next()) { + this.rowCount.incrementAndGet(); + if (asyncHandler.isSenderBusy()) { + Thread.sleep(100); + } + asyncHandler.addRow(sliceSender.resultSet(rsmd, resultSet)); + } + // rows have been handed to the async handler thread; close ps and rs + ConnectionMgr.close(null, ps, resultSet); + return startOffset; + } + + private static void waitToStopAsyncHandlerAndResources(AsyncDataHandler asyncHandler) { + // all page queries are finished; stop the async data handler thread and release its resources + try { + asyncHandler.waitToStop(); + } catch (InterruptedException e) { + throw new ExtractDataAccessException("slice data async handler is interrupted"); + } + } + + private int querySliceRowTotalCount(SliceExtend sliceExtend, QuerySqlEntry sliceCountSql) { + int sliceCount = 0; + try (Connection connection = jdbcOperation.tryConnectionAndClosedAutoCommit(1L, dataSource); + PreparedStatement ps = connection.prepareStatement(sliceCountSql.getSql()); + ResultSet resultSet = ps.executeQuery();) { + if (resultSet.next()) { + sliceCount = resultSet.getInt(1); + } + } catch (SQLException ex) { + log.error("execute slice count query error ", ex); + throw new ExtractDataAccessException("execute slice count query error"); + } + log.info("query union primary table slice {} Count:{}", sliceExtend.getName(), sliceCount); + return sliceCount; + } + private void executeQueryStatement(QuerySqlEntry
sqlEntry, TableMetadata tableMetadata, SliceExtend sliceExtend) { - StopWatch stopWatch = new StopWatch(slice.getName()); - stopWatch.start("start " + slice.getName()); SliceResultSetSender sliceSender = null; Connection connection = null; PreparedStatement ps = null; ResultSet resultSet = null; try { + // acquire a database connection long estimatedRowCount = slice.isSlice() ? slice.getFetchSize() : tableMetadata.getTableRows(); long estimatedMemorySize = estimatedMemorySize(tableMetadata.getAvgRowLength(), estimatedRowCount); connection = jdbcOperation.tryConnectionAndClosedAutoCommit(estimatedMemorySize, dataSource); - LogUtils.debug(log, "query slice and send data sql : {}", sqlEntry.getSql()); - ps = connection.prepareStatement(sqlEntry.getSql()); - resultSet = ps.executeQuery(); - resultSet.setFetchSize(FETCH_SIZE); + // connection acquired: prepare the slice query and start the async data handler thread + List offsetList = new CopyOnWriteArrayList<>(); + List>> batchFutures = new CopyOnWriteArrayList<>(); sliceSender = createSliceResultSetSender(tableMetadata); sliceSender.setRecordSendKey(slice.getName()); + AsyncDataHandler asyncHandler = new AsyncDataHandler(batchFutures, sliceSender, offsetList); + asyncHandler.start(); + context.asyncSendSlice(asyncHandler); + // start querying data and push the results to the async handler thread + ps = connection.prepareStatement(sqlEntry.getSql()); + ps.setFetchSize(FETCH_SIZE); + resultSet = ps.executeQuery(); sliceExtend.setStartOffset(sliceSender.checkOffsetEnd()); - List offsetList = sliceQueryResultAndSendSync(sliceSender, resultSet); + ResultSetMetaData rsmd = resultSet.getMetaData(); + while (resultSet.next()) { + this.rowCount.incrementAndGet(); + if (asyncHandler.isSenderBusy()) { + Thread.sleep(100); + } + // hand the row to the async handler thread + asyncHandler.addRow(sliceSender.resultSet(rsmd, resultSet)); + } + // the whole slice query is finished: close ps and rs, then stop the async data handler thread + try { + ConnectionMgr.close(null, ps, resultSet); + asyncHandler.waitToStop(); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } updateExtendSliceOffsetAndCount(sliceExtend, rowCount.get(), offsetList); - stopWatch.stop(); } catch (Exception ex) { LogUtils.error(log, "slice [{}] has exception :", slice.getName(), ex); throw new ExtractDataAccessException(ex.getMessage()); @@ -128,29 +263,90 @@ public class JdbcSliceProcessor extends AbstractSliceProcessor { sliceSender.agentsClosed(); } jdbcOperation.releaseConnection(connection); - LogUtils.info(log, "query slice and send data cost: {}", stopWatch.shortSummary()); + LogUtils.info(log, "query slice and send data count {}", rowCount.get()); } } - private List sliceQueryResultAndSendSync(SliceResultSetSender sliceSender, ResultSet resultSet) - throws SQLException { - ResultSetMetaData rsmd = resultSet.getMetaData(); - Map result = new TreeMap<>(); - List offsetList = new LinkedList<>(); - List>> batchFutures = new LinkedList<>(); - while (resultSet.next()) { - this.rowCount.incrementAndGet(); - batchFutures.add(sliceSender.resultSetTranslateAndSendSync(rsmd, resultSet, result, slice.getNo())); - if (batchFutures.size() == FETCH_SIZE) { + /** + * async data handler thread + */ + class AsyncDataHandler implements Runnable { + private final List>> batchFutures; + private final SliceResultSetSender sliceSender; + private final int maxQueueSize = 10000; + private final BlockingQueue> batchData = new LinkedBlockingQueue<>(); + private final List offsetList; + + private boolean canStartFetchRow = false; + + AsyncDataHandler(List>> batchFutures, + SliceResultSetSender sliceSender, List offsetList) { + this.batchFutures = batchFutures; + this.sliceSender =
sliceSender; + this.offsetList = offsetList; + } + + /** + * start async data handler thread + */ + public void start() { + this.canStartFetchRow = true; + } + + /** + * add row to batch handler queue + * + * @param row row + */ + public void addRow(Map row) { + this.batchData.add(row); + } + + /** + * wait queue empty to stop + * + * @throws InterruptedException InterruptedException + */ + public void waitToStop() throws InterruptedException { + while (!batchData.isEmpty()) { + Thread.sleep(100); + } + this.canStartFetchRow = false; + } + + @Override + public void run() { + log.info("start send slice row {}", slice.getName()); + while (canStartFetchRow) { + if (Objects.isNull(batchData.peek())) { + try { + Thread.sleep(100); + } catch (InterruptedException ignore) { + } + } else { + Map value = batchData.poll(); + batchFutures.add(sliceSender.resultSetTranslate(value, slice.getNo())); + if (batchFutures.size() == FETCH_SIZE) { + offsetList.add(getBatchFutureRecordOffsetScope(batchFutures)); + batchFutures.clear(); + } + } + } + if (batchFutures.size() > 0) { offsetList.add(getBatchFutureRecordOffsetScope(batchFutures)); batchFutures.clear(); } } - if (batchFutures.size() > 0) { - offsetList.add(getBatchFutureRecordOffsetScope(batchFutures)); - batchFutures.clear(); + + /** + * check sender is busy , if busy return true , else return false + * batch queue size >= maxQueueSize return true , else return false + * + * @return boolean + */ + public boolean isSenderBusy() { + return batchData.size() >= maxQueueSize; } - return offsetList; } private SliceResultSetSender createSliceResultSetSender(TableMetadata tableMetadata) { diff --git a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/slice/process/JdbcTableProcessor.java b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/slice/process/JdbcTableProcessor.java index 4a034d841bcfc5390220677bc9b21d28f189cf63..64cfa8b35df73812c7b2c762e6b48b701f0ae076 100644 --- a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/slice/process/JdbcTableProcessor.java +++ b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/slice/process/JdbcTableProcessor.java @@ -38,8 +38,6 @@ import java.time.Duration; import java.time.LocalDateTime; import java.util.LinkedList; import java.util.List; -import java.util.Map; -import java.util.TreeMap; /** * JdbcTableProcessor @@ -110,17 +108,16 @@ public class JdbcTableProcessor extends AbstractTableProcessor { ResultSet resultSet = ps.executeQuery()) { resultSet.setFetchSize(fetchSize); ResultSetMetaData rsmd = resultSet.getMetaData(); - Map result = new TreeMap<>(); int rowCount = 0; while (resultSet.next()) { rowCount++; - batchFutures.add(sliceSender.resultSetTranslateAndSendSync(rsmd, resultSet, result, i)); + batchFutures.add(sliceSender.resultSetTranslateAndSendSync(rsmd, resultSet, i)); if (batchFutures.size() == FETCH_SIZE) { offsetList.add(getBatchFutureRecordOffsetScope(batchFutures)); batchFutures.clear(); } } - if (batchFutures.size() > 0) { + if (!batchFutures.isEmpty()) { offsetList.add(getBatchFutureRecordOffsetScope(batchFutures)); batchFutures.clear(); } @@ -162,16 +159,15 @@ public class JdbcTableProcessor extends AbstractTableProcessor { ResultSet resultSet = ps.executeQuery()) { resultSet.setFetchSize(fetchSize); ResultSetMetaData rsmd = resultSet.getMetaData(); - Map result = new TreeMap<>(); while (resultSet.next()) { tableRowCount++; - batchFutures.add(sliceSender.resultSetTranslateAndSendSync(rsmd, resultSet, result, 0)); + 
batchFutures.add(sliceSender.resultSetTranslateAndSendSync(rsmd, resultSet, 0)); if (batchFutures.size() == FETCH_SIZE) { offsetList.add(getBatchFutureRecordOffsetScope(batchFutures)); batchFutures.clear(); } } - if (batchFutures.size() > 0) { + if (!batchFutures.isEmpty()) { offsetList.add(getBatchFutureRecordOffsetScope(batchFutures)); batchFutures.clear(); } diff --git a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/task/CheckPoint.java b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/task/CheckPoint.java index 49d3a3b0bd061e959117ea36b6ce116d3d2db1d2..674e50d80a8675b4dc5da029cbe2050e020de109 100644 --- a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/task/CheckPoint.java +++ b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/task/CheckPoint.java @@ -17,6 +17,7 @@ package org.opengauss.datachecker.extract.task; import com.alibaba.druid.pool.DruidDataSource; import com.alibaba.druid.pool.DruidPooledConnection; + import org.apache.logging.log4j.Logger; import org.opengauss.datachecker.common.config.ConfigCache; import org.opengauss.datachecker.common.constant.ConfigConstants; @@ -66,7 +67,7 @@ public class CheckPoint { * init table CheckPoint List * * @param tableMetadata tableMetadata - * @param slice slice + * @param slice slice * @return check point */ public List initCheckPointList(TableMetadata tableMetadata, int slice) { @@ -80,16 +81,14 @@ public class CheckPoint { stopWatch.start(); DataBaseType dataBaseType = ConfigCache.getValue(ConfigConstants.DATA_BASE_TYPE, DataBaseType.class); DataAccessParam param = new DataAccessParam().setSchema(SqlUtil.escape(schema, dataBaseType)) - .setName(SqlUtil.escape(tableName, dataBaseType)) - .setColName(SqlUtil.escape(pkName, dataBaseType)); + .setName(SqlUtil.escape(tableName, dataBaseType)) + .setColName(SqlUtil.escape(pkName, dataBaseType)); Connection connection = getConnection(); param.setOffset(slice); Object maxPoint = dataAccessService.max(connection, param); List checkPointList = dataAccessService.queryPointList(connection, param); checkPointList.add(maxPoint); - checkPointList = checkPointList.stream() - .distinct() - .collect(Collectors.toList()); + checkPointList = checkPointList.stream().distinct().collect(Collectors.toList()); stopWatch.stop(); LogUtils.info(log, "init check-point-list table [{}]:[{}] ", tableName, stopWatch.shortSummary()); ConnectionMgr.close(connection); @@ -112,15 +111,12 @@ public class CheckPoint { } public boolean checkPkNumber(TableMetadata tableMetadata) { - ColumnsMetaData pkColumn = tableMetadata.getPrimaryMetas() - .get(0); + ColumnsMetaData pkColumn = tableMetadata.getPrimaryMetas().get(0); return MetaDataUtil.isDigitPrimaryKey(pkColumn); } private String getPkName(TableMetadata tableMetadata) { - return tableMetadata.getPrimaryMetas() - .get(0) - .getColumnName(); + return tableMetadata.getPrimaryMetas().get(0).getColumnName(); } public Long[][] translateBetween(List checkPointList) { @@ -128,8 +124,8 @@ public class CheckPoint { for (int i = 0; i < between.length; i++) { String value = (String) checkPointList.get(i); String value2 = (String) checkPointList.get(i + 1); - between[i][0] = Long.parseLong(value); - between[i][1] = Long.parseLong(value2); + between[i][0] = Objects.isNull(value) ? null : Long.parseLong(value); + between[i][1] = Objects.isNull(value2) ? 
null : Long.parseLong(value2); } return between; } @@ -150,8 +146,8 @@ public class CheckPoint { public long queryMaxIdOfAutoIncrementTable(TableMetadata tableMetadata) { DataAccessParam param = new DataAccessParam(); param.setSchema(tableMetadata.getSchema()) - .setName(tableMetadata.getTableName()) - .setColName(getPkName(tableMetadata)); + .setName(tableMetadata.getTableName()) + .setColName(getPkName(tableMetadata)); Connection connection = ConnectionMgr.getConnection(); String maxId = dataAccessService.max(connection, param); ConnectionMgr.close(connection, null, null); diff --git a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/task/ExtractTaskRunnable.java b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/task/ExtractTaskRunnable.java index d5ffb07656d4ae3fc98d49930badf249c27875b4..b2a4eae0338acabbd39ad029368f54fa4530bc4b 100644 --- a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/task/ExtractTaskRunnable.java +++ b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/task/ExtractTaskRunnable.java @@ -17,6 +17,7 @@ package org.opengauss.datachecker.extract.task; import com.alibaba.druid.pool.DruidDataSource; import com.alibaba.fastjson.JSON; + import org.apache.commons.collections4.CollectionUtils; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.logging.log4j.Logger; @@ -47,6 +48,7 @@ import org.springframework.kafka.core.KafkaTemplate; import org.springframework.lang.NonNull; import javax.sql.DataSource; + import java.sql.Connection; import java.sql.PreparedStatement; import java.sql.ResultSet; @@ -92,13 +94,13 @@ public class ExtractTaskRunnable implements Runnable { * Thread Constructor * * @param processNo processNo - * @param task task information - * @param support Thread helper class + * @param task task information + * @param support Thread helper class */ public ExtractTaskRunnable(String processNo, ExtractTask task, ExtractThreadSupport support) { this.task = task; - JdbcDataExtractionOperations jdbcOperate = - new JdbcDataExtractionOperations(support.getDataSource(), support.getResourceManager()); + JdbcDataExtractionOperations jdbcOperate = new JdbcDataExtractionOperations(support.getDataSource(), + support.getResourceManager()); this.jdbcOperation = new AtomicReference<>(jdbcOperate); this.checkingFeignClient = support.getCheckingFeignClient(); this.dynamicThreadPoolManager = support.getDynamicThreadPoolManager(); @@ -122,15 +124,13 @@ public class ExtractTaskRunnable implements Runnable { executeMultiTaskOffset(context); } checkingFeignClient.refreshTableExtractStatus(task.getTableName(), extractContext.getEndpoint(), - extractContext.getEndpoint() - .getCode()); + extractContext.getEndpoint().getCode()); log.info("refresh table {} extract status success", task.getTableName()); } catch (Exception ex) { checkingFeignClient.refreshTableExtractStatus(task.getTableName(), extractContext.getEndpoint(), -1); log.error("extract", ex); } finally { - Runtime.getRuntime() - .gc(); + Runtime.getRuntime().gc(); } } @@ -141,19 +141,15 @@ public class ExtractTaskRunnable implements Runnable { try { FullQueryStatement fullQueryStatement = factory.createFullQueryStatement(); QuerySqlEntry querySqlEntry = fullQueryStatement.builderByTaskOffset(context.getTableMetadata()); - connection = jdbcOperation.get() - .tryConnectionAndClosedAutoCommit(context.evaluateMemorySize()); - rowCount = jdbcOperation.get() - .resultSetHandler(connection, querySqlEntry, context, FETCH_SIZE); + 
connection = jdbcOperation.get().tryConnectionAndClosedAutoCommit(context.evaluateMemorySize()); + rowCount = jdbcOperation.get().resultSetHandler(connection, querySqlEntry, context, FETCH_SIZE); } catch (SQLException ex) { log.error("jdbc query {} error : {}", context.getTableName(), ex.getMessage()); throw new ExtractDataAccessException(); } finally { - jdbcOperation.get() - .releaseConnection(connection); + jdbcOperation.get().releaseConnection(connection); log.info("query table [{}] row-count [{}] cost [{}] milliseconds", context.getTableName(), rowCount, - Duration.between(start, LocalDateTime.now()) - .toMillis()); + Duration.between(start, LocalDateTime.now()).toMillis()); logNumberOfGlobalTasks(context.getTableName(), null, dynamicThreadPoolManager.getExecutor(DynamicTpConstant.EXTRACT_EXECUTOR)); } @@ -164,8 +160,8 @@ public class ExtractTaskRunnable implements Runnable { int slice = extractContext.getMaximumTableSliceSize(); long totalRows = 0; try { - AutoSliceQueryStatement sliceStatement = - factory.createSliceQueryStatement(checkPoint, context.getTableMetadata()); + AutoSliceQueryStatement sliceStatement = factory.createSliceQueryStatement(checkPoint, + context.getTableMetadata()); List querySqlList = sliceStatement.builderByTaskOffset(context.getTableMetadata(), slice); if (CollectionUtils.isNotEmpty(querySqlList)) { totalRows = executeParallelTask(querySqlList, context); @@ -176,8 +172,7 @@ public class ExtractTaskRunnable implements Runnable { throw new ExtractDataAccessException(); } finally { log.info("table [{}] execution [{}] rows completed, taking a total of {} milliseconds", - context.getTableName(), totalRows, Duration.between(start, LocalDateTime.now()) - .toMillis()); + context.getTableName(), totalRows, Duration.between(start, LocalDateTime.now()).toMillis()); } } @@ -194,25 +189,23 @@ public class ExtractTaskRunnable implements Runnable { Connection connection = null; try { connection = jdbcOperation.get() - .tryConnectionAndClosedAutoCommit(context.evaluateMemorySize(sliceSize)); + .tryConnectionAndClosedAutoCommit(context.evaluateMemorySize(sliceSize)); totalRowCount.addAndGet(jdbcOperation.get() - .resultSetHandlerParallelContext(connection, queryEntry, - context, FETCH_SIZE)); + .resultSetHandlerParallelContext(connection, queryEntry, context, FETCH_SIZE)); } catch (SQLException ex) { exceptionCount.incrementAndGet(); log.error("jdbc parallel query [{}] error : {}", queryEntry.getSql(), ex.getMessage()); throw new ExtractDataAccessException(); } finally { countDown(context.getTableName(), countDownLatch, executor); - jdbcOperation.get() - .releaseConnection(connection); + jdbcOperation.get().releaseConnection(connection); } }); }); countDownLatch.await(); if (exceptionCount.get() > 0) { - String msg = - "Table " + context.getTableName() + " parallel query has " + exceptionCount.get() + " task exception"; + String msg = "Table " + context.getTableName() + " parallel query has " + exceptionCount.get() + + " task exception"; log.error(msg); throw new ExtractDataAccessException(msg); } @@ -221,30 +214,26 @@ public class ExtractTaskRunnable implements Runnable { private void seekExtractTableInfo(TableMetadata tableMetadata) { log.info("table [{}] isAutoIncrement=[{}] , column=[{}] , avgRowLength=[{}] , tableRows=[{}] ", - tableMetadata.getTableName(), tableMetadata.isAutoIncrement(), tableMetadata.getColumnsMetas() - .size(), + tableMetadata.getTableName(), tableMetadata.isAutoIncrement(), tableMetadata.getColumnsMetas().size(), tableMetadata.getAvgRowLength(), 
tableMetadata.getTableRows()); log.info("table [{}] table column metadata -> {}", tableMetadata.getTableName(), getTableColumnInformation(tableMetadata)); } private void enableParallelQueryDop(int taskOffset) throws SQLException { - int dop = Math.min(taskOffset, jdbcOperation.get() - .getParallelQueryDop()); - jdbcOperation.get() - .enableDatabaseParallelQuery(dop); + int dop = Math.min(taskOffset, jdbcOperation.get().getParallelQueryDop()); + jdbcOperation.get().enableDatabaseParallelQuery(dop); } private String getTableColumnInformation(TableMetadata tableMetadata) { return tableMetadata.getColumnsMetas() - .stream() - .map(ColumnsMetaData::getColumnMsg) - .collect(Collectors.joining(" , ")); + .stream() + .map(ColumnsMetaData::getColumnMsg) + .collect(Collectors.joining(" , ")); } private boolean isNotSlice() { - return taskUtilHelper.noTableSlice() || jdbcOperation.get() - .getParallelQueryDop() == 1; + return taskUtilHelper.noTableSlice() || jdbcOperation.get().getParallelQueryDop() == 1; } private void countDown(String tableName, CountDownLatch countDownLatch, ThreadPoolExecutor executor) { @@ -277,7 +266,7 @@ public class ExtractTaskRunnable implements Runnable { /** * constructor * - * @param jdbcDataSource datasource + * @param jdbcDataSource datasource * @param resourceManager resourceManager */ public JdbcDataExtractionOperations(DataSource jdbcDataSource, ResourceManager resourceManager) { @@ -334,9 +323,9 @@ public class ExtractTaskRunnable implements Runnable { * use a jdbc connection to query sql ,and parse and hash query result.then hash result send kafka topic * * @param connection connection - * @param sqlEntry sqlEntry - * @param context context - * @param fetchSize fetchSize + * @param sqlEntry sqlEntry + * @param context context + * @param fetchSize fetchSize * @return resultSize * @throws SQLException SQLException */ @@ -352,13 +341,13 @@ public class ExtractTaskRunnable implements Runnable { Map result = new TreeMap<>(); if (kafkaOperate.isMultiplePartitions()) { while (resultSet.next()) { - context.resultSetMultiplePartitionsHandler(rsmd, resultSet, result); + context.resultSetMultiplePartitionsHandler(rsmd, resultSet); rowCount++; logProcessTableRowNum(context, sqlEntry, rowCount); } } else { while (resultSet.next()) { - context.resultSetSinglePartitionHandler(rsmd, resultSet, result); + context.resultSetSinglePartitionHandler(rsmd, resultSet); rowCount++; logProcessTableRowNum(context, sqlEntry, rowCount); } @@ -372,10 +361,8 @@ public class ExtractTaskRunnable implements Runnable { private void logSliceQueryInfo(QuerySqlEntry sqlEntry, LocalDateTime start, int rowCount, LocalDateTime endQuery, LocalDateTime end) { log.info(" extract table {} , row-count=[{}] completed , cost=[ query={} send={} all={} ]", - sqlEntry.toString(), rowCount, Duration.between(start, endQuery) - .toMillis(), Duration.between(endQuery, end) - .toMillis(), Duration.between(start, end) - .toMillis()); + sqlEntry.toString(), rowCount, Duration.between(start, endQuery).toMillis(), + Duration.between(endQuery, end).toMillis(), Duration.between(start, end).toMillis()); } private void logProcessTableRowNum(QueryTableRowContext context, QuerySqlEntry sqlEntry, int rowCount) { @@ -389,9 +376,9 @@ public class ExtractTaskRunnable implements Runnable { * use a jdbc connection to query sql ,and parse and hash query result.then hash result send kafka topic * * @param connection connection - * @param sqlEntry sqlEntry - * @param context context - * @param fetchSize fetchSize + * @param sqlEntry 
sqlEntry + * @param context context + * @param fetchSize fetchSize * @return resultSize * @throws SQLException SQLException */ @@ -551,7 +538,7 @@ public class ExtractTaskRunnable implements Runnable { * constructor * * @param tableMetadata tableMetadata - * @param kafkaOperate kafkaOperate + * @param kafkaOperate kafkaOperate */ QueryTableRowContext(@NonNull TableMetadata tableMetadata, KafkaOperations kafkaOperate) { this.resultSetHandler = new ResultSetHandlerFactory().createHandler(tableMetadata.getDataBaseType()); @@ -566,11 +553,9 @@ public class ExtractTaskRunnable implements Runnable { * * @param rs rs */ - public void resultSetMultiplePartitionsHandler(ResultSetMetaData rsmd, ResultSet rs, - Map result) { - resultSetHandler.putOneResultSetToMap(getTableName(), rsmd, rs, result); - RowDataHash dataHash = handler(primary, columns, result); - result.clear(); + public void resultSetMultiplePartitionsHandler(ResultSetMetaData rsmd, ResultSet rs) { + RowDataHash dataHash = handler(primary, columns, + resultSetHandler.putOneResultSetToMap(getTableName(), rsmd, rs)); kafkaOperate.sendMultiplePartitionsRowData(dataHash); } @@ -579,10 +564,9 @@ public class ExtractTaskRunnable implements Runnable { * * @param rs rs */ - public void resultSetSinglePartitionHandler(ResultSetMetaData rsmd, ResultSet rs, Map result) { - resultSetHandler.putOneResultSetToMap(getTableName(), rsmd, rs, result); - RowDataHash dataHash = handler(primary, columns, result); - result.clear(); + public void resultSetSinglePartitionHandler(ResultSetMetaData rsmd, ResultSet rs) { + RowDataHash dataHash = handler(primary, columns, + resultSetHandler.putOneResultSetToMap(getTableName(), rsmd, rs)); kafkaOperate.sendSinglePartitionRowData(dataHash); } @@ -604,9 +588,7 @@ public class ExtractTaskRunnable implements Runnable { String primaryValue = hashHandler.value(rowData, primary); long primaryHash = hashHandler.xx3Hash(rowData, primary); RowDataHash hashData = new RowDataHash(); - hashData.setKey(primaryValue) - .setKHash(primaryHash) - .setVHash(rowHash); + hashData.setKey(primaryValue).setKHash(primaryHash).setVHash(rowHash); return hashData; } diff --git a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/task/ResultSetHandler.java b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/task/ResultSetHandler.java index 60d2c21b5223b6d3010f64c8b1aa87c4f03d75cc..1f1bb73779871be09610fa891ed635d879c255c5 100644 --- a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/task/ResultSetHandler.java +++ b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/task/ResultSetHandler.java @@ -75,28 +75,27 @@ public abstract class ResultSetHandler { * Convert the current query result set into map according to the metadata information of the result set * * @param tableName JDBC Data query table - * @param rsmd JDBC Data query result set + * @param rsmd JDBC Data query result set * @param resultSet JDBC Data query result set - * @param values values * @return JDBC Data encapsulation results */ - public Map putOneResultSetToMap(final String tableName, ResultSetMetaData rsmd, ResultSet resultSet, - Map values) { + public Map putOneResultSetToMap(final String tableName, ResultSetMetaData rsmd, + ResultSet resultSet) { + Map result = new TreeMap<>(); try { - IntStream.rangeClosed(1, rsmd.getColumnCount()) - .forEach(columnIdx -> { - String columnLabel = null; - try { - columnLabel = rsmd.getColumnLabel(columnIdx); - values.put(columnLabel, convert(resultSet, columnIdx, rsmd)); - 
} catch (SQLException ex) { - LOG.error(" Convert data [{}:{}] {} error ", tableName, columnLabel, ex.getMessage(), ex); - } - }); + IntStream.rangeClosed(1, rsmd.getColumnCount()).forEach(columnIdx -> { + String columnLabel = null; + try { + columnLabel = rsmd.getColumnLabel(columnIdx); + result.put(columnLabel, convert(resultSet, columnIdx, rsmd)); + } catch (SQLException ex) { + LOG.error(" Convert data [{}:{}] {} error ", tableName, columnLabel, ex.getMessage(), ex); + } + }); } catch (SQLException ex) { LOG.error(" parse data metadata information exception", ex); } - return values; + return result; } /** @@ -108,7 +107,7 @@ public abstract class ResultSetHandler { public Map putOneResultSetToMap(ResultSet resultSet) throws SQLException { final ResultSetMetaData rsmd = resultSet.getMetaData(); String tableName = rsmd.getTableName(1); - return putOneResultSetToMap(tableName, rsmd, resultSet, new TreeMap<>()); + return putOneResultSetToMap(tableName, rsmd, resultSet); } /** @@ -116,7 +115,7 @@ public abstract class ResultSetHandler { * * @param resultSet resultSet * @param columnIdx columnIdx - * @param rsmd rsmd + * @param rsmd rsmd * @return result * @throws SQLException SQLException */ diff --git a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/task/sql/QueryStatementFactory.java b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/task/sql/QueryStatementFactory.java index c15698ec7c8d4d3fea34a0bd91274a00570b38a5..c960b454b04b8bf136745b8a5e924cc8ae8033db 100644 --- a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/task/sql/QueryStatementFactory.java +++ b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/task/sql/QueryStatementFactory.java @@ -31,9 +31,9 @@ public class QueryStatementFactory { /** * create SliceQueryStatement * - * @param checkPoint checkPoint + * @param checkPoint checkPoint * @param tableMetadata tableMetadata - * @return SliceQueryStatement + * @return A new AutoSliceQueryStatement instance. */ public AutoSliceQueryStatement createSliceQueryStatement(CheckPoint checkPoint, TableMetadata tableMetadata) { return new SinglePrimaryAutoSliceQueryStatement(checkPoint); @@ -42,12 +42,21 @@ public class QueryStatementFactory { /** * create slice query statement of single primary slice * - * @return SliceQueryStatement + * @return A new SinglePrimarySliceQueryStatement instance. */ public SliceQueryStatement createSliceQueryStatement() { return new SinglePrimarySliceQueryStatement(); } + /** + * create slice query statement of union primary slice + * + * @return A new UnionPrimarySliceQueryStatement instance. 
+ */ + public UnionPrimarySliceQueryStatement createSlicePageQueryStatement() { + return new UnionPrimarySliceQueryStatement(); + } + /** * create FullQueryStatement * @@ -56,8 +65,7 @@ public class QueryStatementFactory { public FullQueryStatement createFullQueryStatement() { return tableMetadata -> { final SelectSqlBuilder sqlBuilder = new SelectSqlBuilder(tableMetadata); - String fullSql = sqlBuilder.isDivisions(false).isCsvMode(ConfigCache.isCsvMode()) - .builder(); + String fullSql = sqlBuilder.isDivisions(false).isCsvMode(ConfigCache.isCsvMode()).builder(); return new QuerySqlEntry(tableMetadata.getTableName(), fullSql, 0, tableMetadata.getTableRows()); }; } diff --git a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/task/sql/SelectSqlBuilder.java b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/task/sql/SelectSqlBuilder.java index 9ad37a8119c3ba7b680e21a68f55c77a267bde6d..f3e16b849b59f78e1438edaf730b2f9cb9716500 100644 --- a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/task/sql/SelectSqlBuilder.java +++ b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/task/sql/SelectSqlBuilder.java @@ -16,6 +16,7 @@ package org.opengauss.datachecker.extract.task.sql; import lombok.Getter; + import org.apache.commons.lang3.StringUtils; import org.opengauss.datachecker.common.entry.enums.DataBaseType; import org.opengauss.datachecker.common.entry.extract.ColumnsMetaData; @@ -50,35 +51,35 @@ import static org.opengauss.datachecker.extract.task.sql.QuerySqlTemplate.TABLE_ * @since 11 **/ public class SelectSqlBuilder { + private static final String QUERY_WHERE_BETWEEN + = "SELECT :columnsList FROM :schema.:tableName where :pkCondition :orderBy "; private static final Map SQL_GENERATE = new HashMap<>(); - private static final SqlGenerateTemplate GENERATE_TEMPLATE = - (template, sqlGenerateMeta) -> template.replace(COLUMN, sqlGenerateMeta.getColumns()) - .replace(SCHEMA, sqlGenerateMeta.getSchema()) - .replace(TABLE_NAME, sqlGenerateMeta.getTableName()) - .replace(ORDER_BY, sqlGenerateMeta.getOrder()) - .replace(START, String.valueOf(sqlGenerateMeta.getStart())) - .replace(OFFSET, String.valueOf(sqlGenerateMeta.getOffset())); - private static final SqlGenerateTemplate NO_OFFSET_SQL_GENERATE_TEMPLATE = - (template, sqlGenerateMeta) -> template.replace(COLUMN, sqlGenerateMeta.getColumns()) - .replace(SCHEMA, sqlGenerateMeta.getSchema()) - .replace(TABLE_NAME, sqlGenerateMeta.getTableName()); - private static final SqlGenerate OFFSET_GENERATE = - (sqlGenerateMeta) -> GENERATE_TEMPLATE.replace(QuerySqlTemplate.QUERY_OFF_SET, sqlGenerateMeta); - private static final SqlGenerate NO_OFFSET_GENERATE = - (sqlGenerateMeta) -> NO_OFFSET_SQL_GENERATE_TEMPLATE.replace(QuerySqlTemplate.QUERY_NO_OFF_SET, - sqlGenerateMeta); - - private static final SqlGenerateTemplate QUERY_BETWEEN_TEMPLATE = - (template, sqlGenerateMeta) -> template.replace(COLUMN, sqlGenerateMeta.getColumns()) - .replace(SCHEMA, sqlGenerateMeta.getSchema()) - .replace(TABLE_NAME, sqlGenerateMeta.getTableName()) - .replace(ORDER_BY, sqlGenerateMeta.getOrder()) - .replace(PRIMARY_KEY, sqlGenerateMeta.getPrimaryKey()) - .replace(START, String.valueOf(sqlGenerateMeta.getStart())) - .replace(OFFSET, String.valueOf( - sqlGenerateMeta.getStart() + sqlGenerateMeta.getOffset() - 1)); - private static final SqlGenerate QUERY_BETWEEN_GENERATE = - (sqlGenerateMeta -> QUERY_BETWEEN_TEMPLATE.replace(QUERY_BETWEEN_SET, sqlGenerateMeta)); + private static final SqlGenerateTemplate 
GENERATE_TEMPLATE = (template, sqlGenerateMeta) -> template.replace(COLUMN, + sqlGenerateMeta.getColumns()) + .replace(SCHEMA, sqlGenerateMeta.getSchema()) + .replace(TABLE_NAME, sqlGenerateMeta.getTableName()) + .replace(ORDER_BY, sqlGenerateMeta.getOrder()) + .replace(START, String.valueOf(sqlGenerateMeta.getStart())) + .replace(OFFSET, String.valueOf(sqlGenerateMeta.getOffset())); + private static final SqlGenerateTemplate NO_OFFSET_SQL_GENERATE_TEMPLATE + = (template, sqlGenerateMeta) -> template.replace(COLUMN, sqlGenerateMeta.getColumns()) + .replace(SCHEMA, sqlGenerateMeta.getSchema()) + .replace(TABLE_NAME, sqlGenerateMeta.getTableName()); + private static final SqlGenerate OFFSET_GENERATE = (sqlGenerateMeta) -> GENERATE_TEMPLATE.replace( + QuerySqlTemplate.QUERY_OFF_SET, sqlGenerateMeta); + private static final SqlGenerate NO_OFFSET_GENERATE = (sqlGenerateMeta) -> NO_OFFSET_SQL_GENERATE_TEMPLATE.replace( + QuerySqlTemplate.QUERY_NO_OFF_SET, sqlGenerateMeta); + + private static final SqlGenerateTemplate QUERY_BETWEEN_TEMPLATE = (template, sqlGenerateMeta) -> template.replace( + COLUMN, sqlGenerateMeta.getColumns()) + .replace(SCHEMA, sqlGenerateMeta.getSchema()) + .replace(TABLE_NAME, sqlGenerateMeta.getTableName()) + .replace(ORDER_BY, sqlGenerateMeta.getOrder()) + .replace(PRIMARY_KEY, sqlGenerateMeta.getPrimaryKey()) + .replace(START, String.valueOf(sqlGenerateMeta.getStart())) + .replace(OFFSET, String.valueOf(sqlGenerateMeta.getStart() + sqlGenerateMeta.getOffset() - 1)); + private static final SqlGenerate QUERY_BETWEEN_GENERATE = (sqlGenerateMeta -> QUERY_BETWEEN_TEMPLATE.replace( + QUERY_BETWEEN_SET, sqlGenerateMeta)); static { SQL_GENERATE.put(DataBaseType.MS, OFFSET_GENERATE); @@ -99,6 +100,7 @@ public class SelectSqlBuilder { private boolean isFirst = false; private boolean isEnd = false; private boolean isCsvMode = false; + private String countSnippet = "count(1)"; /** * Table fragment query SQL Statement Builder @@ -114,7 +116,7 @@ public class SelectSqlBuilder { /** * Table fragment query SQL Statement Builder * - * @param start start + * @param start start * @param offset offset * @return builder */ @@ -172,6 +174,7 @@ public class SelectSqlBuilder { this.isCsvMode = isCsvMode; return this; } + /** * Table fragment query SQL Statement Builder * @@ -183,25 +186,41 @@ public class SelectSqlBuilder { Assert.notEmpty(columnsMetas, Message.COLUMN_METADATA_EMPTY_NOT_TO_BUILD_SQL); final ConditionLimit conditionLimit = tableMetadata.getConditionLimit(); if (Objects.nonNull(conditionLimit)) { - return buildSelectSqlConditionLimit(tableMetadata, conditionLimit); + return buildSelectSqlConditionLimit(tableMetadata, conditionLimit, null); } else if (isDivisions) { - return buildSelectSqlWherePrimary(tableMetadata); + return buildSelectSqlWherePrimary(tableMetadata, null); } else { - return buildSelectSqlOffsetZero(columnsMetas, tableMetadata.getTableName()); + return buildSelectSqlOffsetZero(columnsMetas, tableMetadata.getTableName(), null); } } - String QUERY_WHERE_BETWEEN = "SELECT :columnsList FROM :schema.:tableName where :pkCondition :orderBy "; + /** + * select row count sql builder + * + * @return sql + */ + public String countBuilder() { + Assert.isTrue(Objects.nonNull(tableMetadata), Message.TABLE_METADATA_NULL_NOT_TO_BUILD_SQL); + List columnsMetas = tableMetadata.getColumnsMetas(); + Assert.notEmpty(columnsMetas, Message.COLUMN_METADATA_EMPTY_NOT_TO_BUILD_SQL); + final ConditionLimit conditionLimit = tableMetadata.getConditionLimit(); + if 
(Objects.nonNull(conditionLimit)) { + return buildSelectSqlConditionLimit(tableMetadata, conditionLimit, countSnippet); + } else if (isDivisions) { + return buildSelectSqlWherePrimary(tableMetadata, countSnippet); + } else { + return buildSelectSqlOffsetZero(columnsMetas, tableMetadata.getTableName(), countSnippet); + } + } - private String buildSelectSqlWherePrimary(TableMetadata tableMetadata) { + private String buildSelectSqlWherePrimary(TableMetadata tableMetadata, String countSnippet) { List columnsMetas = tableMetadata.getColumnsMetas(); String schemaEscape = escape(schema, dataBaseType); String tableName = escape(tableMetadata.getTableName(), dataBaseType); - String columnNames = getColumnNameList(columnsMetas, dataBaseType); - String primaryKey = escape(tableMetadata.getPrimaryMetas() - .get(0) - .getColumnName(), dataBaseType); - final String orderBy = getOrderBy(tableMetadata.getPrimaryMetas(), dataBaseType); + boolean isCountSnippet = StringUtils.isNotEmpty(countSnippet); + String columnNames = isCountSnippet ? countSnippet : getColumnNameList(columnsMetas, dataBaseType); + String primaryKey = escape(tableMetadata.getPrimaryMetas().get(0).getColumnName(), dataBaseType); + final String orderBy = isCountSnippet ? "" : getOrderBy(tableMetadata.getPrimaryMetas(), dataBaseType); String pkCondition; if (StringUtils.isNotEmpty(seqStart) && StringUtils.isNotEmpty(seqEnd)) { pkCondition = getPkCondition(primaryKey); @@ -209,10 +228,10 @@ public class SelectSqlBuilder { pkCondition = getNumberPkCondition(primaryKey); } return QUERY_WHERE_BETWEEN.replace(COLUMN, columnNames) - .replace(SCHEMA, schemaEscape) - .replace(TABLE_NAME, tableName) - .replace(PK_CONDITION, pkCondition) - .replace(ORDER_BY, orderBy); + .replace(SCHEMA, schemaEscape) + .replace(TABLE_NAME, tableName) + .replace(PK_CONDITION, pkCondition) + .replace(ORDER_BY, orderBy); } private String getNumberPkCondition(String primaryKey) { @@ -265,23 +284,24 @@ public class SelectSqlBuilder { } } - private String buildSelectSqlConditionLimit(TableMetadata tableMetadata, ConditionLimit conditionLimit) { + private String buildSelectSqlConditionLimit(TableMetadata tableMetadata, ConditionLimit conditionLimit, + String countSnippet) { List columnsMetas = tableMetadata.getColumnsMetas(); - String columnNames = getColumnNameList(columnsMetas, dataBaseType); + boolean isCountSnippet = StringUtils.isNotEmpty(countSnippet); + String columnNames = isCountSnippet ? countSnippet : getColumnNameList(columnsMetas, dataBaseType); final String schemaEscape = escape(schema, dataBaseType); final String tableName = escape(tableMetadata.getTableName(), dataBaseType); - final String orderBy = getOrderBy(tableMetadata.getPrimaryMetas(), dataBaseType); - SqlGenerateMeta sqlGenerateMeta = - new SqlGenerateMeta(schemaEscape, tableName, columnNames, orderBy, conditionLimit.getStart(), - conditionLimit.getOffset()); + final String orderBy = isCountSnippet ? 
"" : getOrderBy(tableMetadata.getPrimaryMetas(), dataBaseType); + SqlGenerateMeta sqlGenerateMeta = new SqlGenerateMeta(schemaEscape, tableName, columnNames, orderBy, + conditionLimit.getStart(), conditionLimit.getOffset()); return getSqlGenerate(dataBaseType).replace(sqlGenerateMeta); } private String getOrderBy(List primaryMetas, DataBaseType dataBaseType) { return "order by " + primaryMetas.stream() - .map(ColumnsMetaData::getColumnName) - .map(key -> escape(key, dataBaseType) + " asc") - .collect(Collectors.joining(DELIMITER)); + .map(ColumnsMetaData::getColumnName) + .map(key -> escape(key, dataBaseType) + " asc") + .collect(Collectors.joining(DELIMITER)); } public String buildSelectSqlOffset(TableMetadata tableMetadata, long start, long offset) { @@ -295,11 +315,9 @@ public class SelectSqlBuilder { sqlGenerateMeta = new SqlGenerateMeta(schemaEscape, tableName, columnNames, orderBy, start, offset); return getSqlGenerate(dataBaseType).replace(sqlGenerateMeta); } else { - String primaryKey = escape(tableMetadata.getPrimaryMetas() - .get(0) - .getColumnName(), dataBaseType); - sqlGenerateMeta = - new SqlGenerateMeta(schemaEscape, tableName, columnNames, orderBy, start, offset, primaryKey); + String primaryKey = escape(tableMetadata.getPrimaryMetas().get(0).getColumnName(), dataBaseType); + sqlGenerateMeta = new SqlGenerateMeta(schemaEscape, tableName, columnNames, orderBy, start, offset, + primaryKey); return QUERY_BETWEEN_GENERATE.replace(sqlGenerateMeta); } } @@ -311,19 +329,20 @@ public class SelectSqlBuilder { return SqlUtil.escape(content, dataBaseType); } - private String buildSelectSqlOffsetZero(List columnsMetas, String tableName) { - String columnNames = getColumnNameList(columnsMetas, dataBaseType); + private String buildSelectSqlOffsetZero(List columnsMetas, String tableName, String countSnippet) { + boolean isCountSnippet = StringUtils.isNotEmpty(countSnippet); + String columnNames = isCountSnippet ? 
countSnippet : getColumnNameList(columnsMetas, dataBaseType); String schemaEscape = escape(schema, dataBaseType); - SqlGenerateMeta sqlGenerateMeta = - new SqlGenerateMeta(schemaEscape, escape(tableName, dataBaseType), columnNames); + SqlGenerateMeta sqlGenerateMeta = new SqlGenerateMeta(schemaEscape, escape(tableName, dataBaseType), + columnNames); return NO_OFFSET_GENERATE.replace(sqlGenerateMeta); } private String getColumnNameList(@NonNull List columnsMetas, DataBaseType dataBaseType) { return columnsMetas.stream() - .map(ColumnsMetaData::getColumnName) - .map(column -> escape(column, dataBaseType)) - .collect(Collectors.joining(DELIMITER)); + .map(ColumnsMetaData::getColumnName) + .map(column -> escape(column, dataBaseType)) + .collect(Collectors.joining(DELIMITER)); } private SqlGenerate getSqlGenerate(DataBaseType dataBaseType) { @@ -397,7 +416,7 @@ public class SelectSqlBuilder { /** * Generate SQL statement according to SQL generator metadata object * - * @param template SQL template + * @param template SQL template * @param sqlGenerateMeta SQL generator metadata * @return sql */ diff --git a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/task/sql/SinglePrimarySliceQueryStatement.java b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/task/sql/SinglePrimarySliceQueryStatement.java index 48623feb54dfa5efd4223ff62921b5557dde0a6d..607e0e23a5fcf11a3cdd288ecf15cf6fd5c9d2f4 100644 --- a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/task/sql/SinglePrimarySliceQueryStatement.java +++ b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/task/sql/SinglePrimarySliceQueryStatement.java @@ -59,6 +59,6 @@ public class SinglePrimarySliceQueryStatement implements SliceQueryStatement { } private Object translateOffset(boolean isDigit, String beginIdx) { - return isDigit ? Long.valueOf(beginIdx) : beginIdx; + return Objects.isNull(beginIdx) ? null : isDigit ? Long.valueOf(beginIdx) : beginIdx; } } diff --git a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/task/sql/UnionPrimarySliceQueryStatement.java b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/task/sql/UnionPrimarySliceQueryStatement.java new file mode 100644 index 0000000000000000000000000000000000000000..ca917e850a08d6f4b37548029057f7338b910812 --- /dev/null +++ b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/task/sql/UnionPrimarySliceQueryStatement.java @@ -0,0 +1,108 @@ +/* + * Copyright (c) 2022-2022 Huawei Technologies Co.,Ltd. + * + * openGauss is licensed under Mulan PSL v2. + * You can use this software according to the terms and conditions of the Mulan PSL v2. + * You may obtain a copy of Mulan PSL v2 at: + * + * http://license.coscl.org.cn/MulanPSL2 + * + * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, + * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, + * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. + * See the Mulan PSL v2 for more details. 
+ */ + +package org.opengauss.datachecker.extract.task.sql; + +import org.opengauss.datachecker.common.config.ConfigCache; +import org.opengauss.datachecker.common.entry.enums.CheckMode; +import org.opengauss.datachecker.common.entry.extract.ColumnsMetaData; +import org.opengauss.datachecker.common.entry.extract.SliceVo; +import org.opengauss.datachecker.common.entry.extract.TableMetadata; +import org.opengauss.datachecker.extract.util.MetaDataUtil; + +import java.util.ArrayList; +import java.util.List; +import java.util.Objects; + +/** + * union primary slice query statement + * + * @author :wangchao + * @date :Created in 2023/8/9 + * @since :11 + */ +public class UnionPrimarySliceQueryStatement implements SliceQueryStatement { + private final boolean isHalfOpenHalfClosed; + + /** + * create UnionPrimarySliceQueryStatement + */ + public UnionPrimarySliceQueryStatement() { + // in csv mode the slice data scope is fully closed; in jdbc mode it is half open and half closed + this.isHalfOpenHalfClosed = !Objects.equals(ConfigCache.getCheckMode(), CheckMode.CSV); + } + + /** + * build slice count sql entry + * + * @param tableMetadata tableMetadata + * @param slice slice + * @return sql entry + */ + public QuerySqlEntry buildSliceCount(TableMetadata tableMetadata, SliceVo slice) { + final SelectSqlBuilder sqlBuilder = new SelectSqlBuilder(tableMetadata); + sqlBuilder.isDivisions(slice.getTotal() > 1); + sqlBuilder.isFirstCondition(slice.getNo() == 1); + sqlBuilder.isEndCondition(slice.getNo() == slice.getTotal()); + sqlBuilder.isHalfOpenHalfClosed(isHalfOpenHalfClosed); + sqlBuilder.isCsvMode(ConfigCache.isCsvMode()); + ColumnsMetaData primaryKey = tableMetadata.getSinglePrimary(); + boolean isDigit = MetaDataUtil.isDigitPrimaryKey(primaryKey); + Object offset = translateOffset(isDigit, slice.getBeginIdx()); + Object endOffset = translateOffset(isDigit, slice.getEndIdx()); + sqlBuilder.offset(offset, endOffset); + return new QuerySqlEntry(slice.getTable(), sqlBuilder.countBuilder(), offset, endOffset); + } + + @Override + public QuerySqlEntry buildSlice(TableMetadata tableMetadata, SliceVo slice) { + final SelectSqlBuilder sqlBuilder = new SelectSqlBuilder(tableMetadata); + sqlBuilder.isDivisions(slice.getTotal() > 1); + sqlBuilder.isFirstCondition(slice.getNo() == 1); + sqlBuilder.isEndCondition(slice.getNo() == slice.getTotal()); + sqlBuilder.isHalfOpenHalfClosed(isHalfOpenHalfClosed); + sqlBuilder.isCsvMode(ConfigCache.isCsvMode()); + ColumnsMetaData primaryKey = tableMetadata.getSinglePrimary(); + boolean isDigit = MetaDataUtil.isDigitPrimaryKey(primaryKey); + Object offset = translateOffset(isDigit, slice.getBeginIdx()); + Object endOffset = translateOffset(isDigit, slice.getEndIdx()); + sqlBuilder.offset(offset, endOffset); + return new QuerySqlEntry(slice.getTable(), sqlBuilder.builder(), offset, endOffset); + } + + private Object translateOffset(boolean isDigit, String beginIdx) { + return Objects.isNull(beginIdx) ? null : isDigit ? Long.valueOf(beginIdx) : beginIdx; + } + + /** + * build slice select sql; if the slice row count exceeds a large threshold, the slice is fetched by paged selects. + * page select for example select * from where ... 
limit xxx offset xxx + * + * @param baseSliceSql slice sql entry + * @param sliceCount slice total count + * @param fetchSize page select fetch size + * @return page select sql + */ + public List buildPageStatement(QuerySqlEntry baseSliceSql, int sliceCount, int fetchSize) { + int totalPage = sliceCount / fetchSize + (sliceCount % fetchSize == 0 ? 0 : 1); + List statements = new ArrayList<>(totalPage); + for (int i = 0; i < totalPage; i++) { + StringBuilder sqlBuilder = new StringBuilder(baseSliceSql.getSql()); + sqlBuilder.append(" limit ").append(fetchSize).append(" offset ").append(i * fetchSize); + statements.add(sqlBuilder.toString()); + } + return statements; + } +} diff --git a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/util/HashHandler.java b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/util/HashHandler.java index c2a6b8382ddb03780eac0f5617c0b9bb8bd45250..3810a500ee27b934766622e0d8482433bfbf5d6e 100644 --- a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/util/HashHandler.java +++ b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/util/HashHandler.java @@ -16,13 +16,13 @@ package org.opengauss.datachecker.extract.util; import net.openhft.hashing.LongHashFunction; + +import org.apache.commons.lang3.StringUtils; import org.springframework.util.CollectionUtils; import java.util.ArrayList; import java.util.List; import java.util.Map; -import java.util.Map.Entry; -import java.util.stream.Collectors; import static org.opengauss.datachecker.extract.constants.ExtConstants.PRIMARY_DELIMITER; @@ -46,24 +46,35 @@ public class HashHandler { * find the corresponding value of the field in the map, and splice the found value. * * @param columnsValueMap Field corresponding query data - * @param columns List of field names + * @param columns List of field names * @return Hash calculation result corresponding to the current row */ public long xx3Hash(Map columnsValueMap, List columns) { if (CollectionUtils.isEmpty(columns)) { return 0L; } - String colValue = - columnsValueMap.entrySet().stream().filter(entry -> columns.contains(entry.getKey())).map(Entry::getValue) - .collect(Collectors.joining()); - return XX_3_HASH.hashChars(colValue); + StringBuilder valueBuffer = new StringBuilder(); + for (String column : columns) { + valueBuffer.append(columnsValueMap.getOrDefault(column, "")); + } + return XX_3_HASH.hashChars(valueBuffer); + } + + /** + * Hash calculation of a single field + * + * @param value field value + * @return hash result + */ + public long xx3Hash(String value) { + return XX_3_HASH.hashChars(value); } /** * column hash result * * @param columnsValueMap columns value - * @param columns column names + * @param columns column names * @return column hash result */ public String value(Map columnsValueMap, List columns) { @@ -71,11 +82,17 @@ public class HashHandler { return ""; } List values = new ArrayList<>(); - columns.forEach(column -> { - if (columnsValueMap.containsKey(column)) { - values.add(columnsValueMap.get(column)); - } - }); - return values.stream().map(String::valueOf).collect(Collectors.joining(PRIMARY_DELIMITER)); + if (columns.size() == 1) { + return columnsValueMap.getOrDefault(columns.get(0), ""); + } else if (columns.size() == 2) { + return columnsValueMap.get(columns.get(0)) + PRIMARY_DELIMITER + columnsValueMap.get(columns.get(1)); + } else { + columns.forEach(column -> { + if (columnsValueMap.containsKey(column)) { + values.add(columnsValueMap.get(column)); + } + }); + return 
StringUtils.join(values, PRIMARY_DELIMITER); + } } } diff --git a/datachecker-extract/src/main/resources/application-sink.yml b/datachecker-extract/src/main/resources/application-sink.yml index 2cd02755bdc47706d4b201178151c44159a10c4e..d9348e1ffcf5002668920fe8645d6eba87497021 100644 --- a/datachecker-extract/src/main/resources/application-sink.yml +++ b/datachecker-extract/src/main/resources/application-sink.yml @@ -43,4 +43,5 @@ spring: maxPoolPreparedStatementPerConnectionSize: 20 useGlobalDataSourceStat: true connectionProperties: druid.stat.mergeSql=true;druid.stat.slowSqlMillis=500 + maxAllowedPacketSize: 1073741824 diff --git a/datachecker-extract/src/main/resources/application-source.yml b/datachecker-extract/src/main/resources/application-source.yml index a74745c59b4799ab82795450e7cddd7c988ad4ff..37cf04b741446332b773e548fc7933d3c5c38f19 100644 --- a/datachecker-extract/src/main/resources/application-source.yml +++ b/datachecker-extract/src/main/resources/application-source.yml @@ -49,3 +49,4 @@ spring: maxPoolPreparedStatementPerConnectionSize: 20 useGlobalDataSourceStat: true connectionProperties: druid.stat.mergeSql=true;druid.stat.slowSqlMillis=500 + maxAllowedPacketSize: 1073741824 diff --git a/datachecker-extract/src/test/java/org/opengauss/datachecker/extract/dao/BaseDataResultSetHandlerTest.java b/datachecker-extract/src/test/java/org/opengauss/datachecker/extract/dao/BaseDataResultSetHandlerTest.java index 1b7dd716e46e0e68c1e62dc11b7f91a5a095b2d5..f18c42eeb1a96ba6ef221462486499bb9524947c 100644 --- a/datachecker-extract/src/test/java/org/opengauss/datachecker/extract/dao/BaseDataResultSetHandlerTest.java +++ b/datachecker-extract/src/test/java/org/opengauss/datachecker/extract/dao/BaseDataResultSetHandlerTest.java @@ -39,7 +39,6 @@ import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.TreeMap; import java.util.function.BiFunction; import java.util.stream.Collectors; @@ -129,10 +128,8 @@ public class BaseDataResultSetHandlerTest { preparedStatement = connection.prepareStatement(executeQueryStatement); resultSet = preparedStatement.executeQuery(); ResultSetMetaData rsmd = resultSet.getMetaData(); - Map values = new TreeMap<>(); while (resultSet.next()) { - resultSetHandler.putOneResultSetToMap(tableName, rsmd, resultSet, values); - result.add(new HashMap<>(values)); + result.add(resultSetHandler.putOneResultSetToMap(tableName, rsmd, resultSet)); } } catch (SQLException sqlErr) { log.error("test table [{}] error", tableName, sqlErr); diff --git a/run.sh b/run.sh new file mode 100644 index 0000000000000000000000000000000000000000..5dc511703b49c99bf0f89d85256d427d24308712 --- /dev/null +++ b/run.sh @@ -0,0 +1,100 @@ +#!/bin/bash + +run_path=$(cd `dirname $0`; pwd) + +function parse_app_info() { + local file_name="$1" + if [[ $file_name =~ ^(datachecker-check|datachecker-extract)-([0-9]+\.[0-9]+\.[0-9]+(\.[a-zA-Z0-9]+)?)\.jar$ ]]; then + local app_name="${BASH_REMATCH[1]}" + local app_version="${BASH_REMATCH[2]}" + echo "$app_name" + echo "$app_version" + else + echo "Invalid file name format: $file_name" + exit 1 + fi +} + +check_file=$(find $run_path -maxdepth 1 -name "datachecker-check-*.jar" | head -n 1) +if [ -n "$check_file" ]; then + IFS='-' read -ra parts <<< "$(basename $check_file)" + app_check_name=$(basename $check_file) + app_check_version="${parts[-1]%.jar}" +else + echo "No datachecker-check application file found." 
+ exit 1 +fi + +extract_files=$(find $run_path -maxdepth 1 -name "datachecker-extract-*.jar") +if [ ${#extract_files} -gt 0 ]; then + app_extract_names=() + app_extract_versions=() + for extract_file in $extract_files; do + IFS='-' read -ra parts <<< "$(basename $extract_file)" + app_extract_names+=("$(basename $extract_file)") + app_extract_versions+=("${parts[-1]%.jar}") + done +else + echo "No datachecker-extract application file found." + exit 1 +fi + +extract_source="--source" +extract_sink="--sink" + +function start_apps() { + echo "Starting datachecker-check application..." + sleep 1s + nohup java -jar $run_path/$app_check_name > /dev/null 2>&1 & + echo "datachecker-check started with PID: $!" + sleep 2s + echo "Starting datachecker-extract applications..." + nohup java -jar $run_path/${app_extract_names[0]} $extract_source > /dev/null 2>&1 & + echo "datachecker-extract instance $extract_source started with PID: $!" + sleep 2s + nohup java -jar $run_path/${app_extract_names[0]} $extract_sink > /dev/null 2>&1 & + echo "datachecker-extract instance $extract_sink started with PID: $!" + sleep 2s +} + +function stop_apps() { + echo "Stopping datachecker-check application..." + pids=$(ps -ef | grep "$run_path/$app_check_name" | grep -v grep | awk '{print $2}') + if [ -n "$pids" ]; then + for pid in $pids; do + kill $pid + sleep 2s + echo "Killed datachecker-check process with PID: $pid" + done + else + echo "datachecker-check application is not running." + fi + + echo "Stopping datachecker-extract applications..." + for ((i = 0; i < 2; i++)); do + extract_name=${app_extract_names[$i]} + pids=$(ps -ef | grep "$run_path/$extract_name" | grep -v grep | awk '{print $2}') + if [ -n "$pids" ]; then + for pid in $pids; do + kill $pid + sleep 2s + echo "Killed datachecker-extract instance $((i + 1)) process with PID: $pid" + done + else + echo "datachecker-extract instance $((i + 1)) is not running." + fi + done +} + +case "$1" in + "start") + start_apps + ;; + "stop") + stop_apps + ;; + *) + echo "Usage: $0 {start|stop}" + exit 1 + ;; +esac \ No newline at end of file
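# Usage sketch (not part of the patch): the run.sh added above expects the datachecker-check and
# datachecker-extract jars to sit in the same directory as the script and takes a single start|stop
# argument; the /opt/datachecker directory below is an illustrative assumption.
#
#   cd /opt/datachecker          # contains run.sh, datachecker-check-*.jar, datachecker-extract-*.jar
#   sh run.sh start              # starts datachecker-check, then the --source and --sink extract instances
#   sh run.sh stop               # stops datachecker-check and both extract instances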