diff --git a/datachecker-check/src/main/java/org/opengauss/datachecker/check/DatacheckerCheckApplication.java b/datachecker-check/src/main/java/org/opengauss/datachecker/check/DatacheckerCheckApplication.java index 16b88395604b540ee86d522e6f2e43f7c8b5cdcf..3a872aa6b9a6b4c861090237e248900f088c6686 100644 --- a/datachecker-check/src/main/java/org/opengauss/datachecker/check/DatacheckerCheckApplication.java +++ b/datachecker-check/src/main/java/org/opengauss/datachecker/check/DatacheckerCheckApplication.java @@ -35,12 +35,10 @@ import org.springframework.scheduling.annotation.EnableAsync; @EnableFeignClients(basePackages = {"org.opengauss.datachecker.check.client"}) @SpringBootApplication public class DatacheckerCheckApplication { - private static ConfigurableApplicationContext context; public static void main(String[] args) { context = SpringApplication.run(DatacheckerCheckApplication.class, args); - final EndpointManagerService managerService = context.getBean(EndpointManagerService.class); managerService.start(); if (!managerService.isEndpointHealth()) { diff --git a/datachecker-check/src/main/java/org/opengauss/datachecker/check/cache/TableStatusRegister.java b/datachecker-check/src/main/java/org/opengauss/datachecker/check/cache/TableStatusRegister.java index d40cbf2d5e7b8e14cad6d1806690884584bd926a..e21f37143c72d914a818a7bfda852d47aaa3a13b 100644 --- a/datachecker-check/src/main/java/org/opengauss/datachecker/check/cache/TableStatusRegister.java +++ b/datachecker-check/src/main/java/org/opengauss/datachecker/check/cache/TableStatusRegister.java @@ -16,8 +16,10 @@ package org.opengauss.datachecker.check.cache; import lombok.extern.slf4j.Slf4j; +import org.opengauss.datachecker.common.constant.Constants.InitialCapacity; import org.opengauss.datachecker.common.entry.check.Pair; import org.opengauss.datachecker.common.exception.ExtractException; +import org.opengauss.datachecker.common.util.ThreadUtil; import org.springframework.stereotype.Service; import javax.validation.constraints.NotEmpty; @@ -27,7 +29,6 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.BlockingDeque; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.Executors; import java.util.concurrent.LinkedBlockingDeque; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; @@ -97,7 +98,7 @@ public class TableStatusRegister implements Cache { * Start and execute self-test thread */ public void selfCheck() { - ScheduledExecutorService scheduledExecutor = Executors.newSingleThreadScheduledExecutor(); + ScheduledExecutorService scheduledExecutor = ThreadUtil.newSingleThreadScheduledExecutor(); scheduledExecutor.scheduleWithFixedDelay(() -> { Thread.currentThread().setName(SELF_CHECK_THREAD_NAME); doCheckingStatus(); @@ -190,8 +191,7 @@ public class TableStatusRegister implements Cache { /** * Initialize cache and set default values for key values * - * @param keys - * @return + * @param keys keys */ @Override public void init(@NotEmpty Set keys) { @@ -226,7 +226,7 @@ public class TableStatusRegister implements Cache { // The current key already exists and cannot be added repeatedly throw new ExtractException("The current key= " + key + " already exists and cannot be added repeatedly"); } - Map partitionMap = new ConcurrentHashMap<>(); + Map partitionMap = new ConcurrentHashMap<>(InitialCapacity.CAPACITY_16); IntStream.range(0, partitions).forEach(partition -> { partitionMap.put(partition, TASK_STATUS_COMPLETED_VALUE); }); diff --git a/datachecker-check/src/main/java/org/opengauss/datachecker/check/modules/check/DataCheckRunnable.java b/datachecker-check/src/main/java/org/opengauss/datachecker/check/modules/check/DataCheckRunnable.java index a5bbb2108470dbc7b9471a3a2c902094430883eb..d3422b1738d760a369228dd7ed347bcde6498da9 100644 --- a/datachecker-check/src/main/java/org/opengauss/datachecker/check/modules/check/DataCheckRunnable.java +++ b/datachecker-check/src/main/java/org/opengauss/datachecker/check/modules/check/DataCheckRunnable.java @@ -216,7 +216,7 @@ public class DataCheckRunnable extends DataCheckKafkaConsumer implements Runnabl * @param bucketList Bucket list */ private void initBucketList(Endpoint endpoint, int partitions, List bucketList) { - Map bucketMap = new ConcurrentHashMap<>(Constants.InitialCapacity.MAP); + Map bucketMap = new ConcurrentHashMap<>(Constants.InitialCapacity.EMPTY); // Use feign client to pull Kafka data List dataList = getTopicPartitionsData(endpoint, partitions); if (CollectionUtils.isEmpty(dataList)) { @@ -273,7 +273,7 @@ public class DataCheckRunnable extends DataCheckKafkaConsumer implements Runnabl Map entriesOnlyOnLeft = bucketDifference.entriesOnlyOnLeft(); Map entriesOnlyOnRight = bucketDifference.entriesOnlyOnRight(); Map> entriesDiffering = bucketDifference.entriesDiffering(); - Map> differing = new HashMap<>(Constants.InitialCapacity.MAP); + Map> differing = new HashMap<>(Constants.InitialCapacity.EMPTY); entriesDiffering.forEach((key, diff) -> { differing.put(key, Pair.of(diff.leftValue(), diff.rightValue())); }); diff --git a/datachecker-check/src/main/java/org/opengauss/datachecker/check/modules/check/IncrementCheckThread.java b/datachecker-check/src/main/java/org/opengauss/datachecker/check/modules/check/IncrementCheckThread.java index 81f83497d2142da839a4c3adac75757fc95bfa0f..466033dfc7700c4d22c75c3988bf25a624a98c6e 100644 --- a/datachecker-check/src/main/java/org/opengauss/datachecker/check/modules/check/IncrementCheckThread.java +++ b/datachecker-check/src/main/java/org/opengauss/datachecker/check/modules/check/IncrementCheckThread.java @@ -23,6 +23,7 @@ import org.opengauss.datachecker.check.modules.bucket.Bucket; import org.opengauss.datachecker.check.modules.bucket.BuilderBucketHandler; import org.opengauss.datachecker.check.modules.merkle.MerkleTree; import org.opengauss.datachecker.check.modules.merkle.MerkleTree.Node; +import org.opengauss.datachecker.common.constant.Constants.InitialCapacity; import org.opengauss.datachecker.common.entry.check.DataCheckParam; import org.opengauss.datachecker.common.entry.check.DifferencePair; import org.opengauss.datachecker.common.entry.check.Pair; @@ -318,7 +319,7 @@ public class IncrementCheckThread extends Thread { if (CollectionUtils.isEmpty(dataList)) { return; } - Map bucketMap = new HashMap<>(); + Map bucketMap = new HashMap<>(InitialCapacity.CAPACITY_16); BuilderBucketHandler bucketBuilder = new BuilderBucketHandler(bucketCapacity); // Pull the data to build the bucket list @@ -393,7 +394,7 @@ public class IncrementCheckThread extends Thread { Map entriesOnlyOnLeft = bucketDifference.entriesOnlyOnLeft(); Map entriesOnlyOnRight = bucketDifference.entriesOnlyOnRight(); Map> entriesDiffering = bucketDifference.entriesDiffering(); - Map> differing = new HashMap<>(); + Map> differing = new HashMap<>(InitialCapacity.CAPACITY_16); entriesDiffering.forEach((key, diff) -> { differing.put(key, Pair.of(diff.leftValue(), diff.rightValue())); }); diff --git a/datachecker-check/src/main/java/org/opengauss/datachecker/check/service/EndpointManagerService.java b/datachecker-check/src/main/java/org/opengauss/datachecker/check/service/EndpointManagerService.java index 008f2b80581dc1cc1e117319a051e3e289a6ec42..3729117ec45f5bc37c03cf9e5c155489c9947b3e 100644 --- a/datachecker-check/src/main/java/org/opengauss/datachecker/check/service/EndpointManagerService.java +++ b/datachecker-check/src/main/java/org/opengauss/datachecker/check/service/EndpointManagerService.java @@ -20,6 +20,7 @@ import org.apache.commons.lang3.StringUtils; import org.opengauss.datachecker.check.client.FeignClientService; import org.opengauss.datachecker.check.config.DataCheckProperties; import org.opengauss.datachecker.common.entry.enums.Endpoint; +import org.opengauss.datachecker.common.util.ThreadUtil; import org.opengauss.datachecker.common.web.Result; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; @@ -28,7 +29,6 @@ import java.io.BufferedReader; import java.io.IOException; import java.io.InputStreamReader; import java.nio.charset.Charset; -import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; @@ -43,7 +43,7 @@ import java.util.concurrent.TimeUnit; @Service public class EndpointManagerService { private static final String ENDPOINT_HEALTH_CHECK_THREAD_NAME = "endpoint-health-check-thread"; - private static final ScheduledExecutorService SCHEDULED_EXECUTOR = Executors.newSingleThreadScheduledExecutor(); + private static final ScheduledExecutorService SCHEDULED_EXECUTOR = ThreadUtil.newSingleThreadScheduledExecutor(); @Autowired private FeignClientService feignClientService; diff --git a/datachecker-check/src/main/java/org/opengauss/datachecker/check/service/EndpointMetaDataManager.java b/datachecker-check/src/main/java/org/opengauss/datachecker/check/service/EndpointMetaDataManager.java index a68e2bd787ca460552ca06dd30f149028d763f82..8061294c1797bfb1a5b56193e15f371ac315b905 100644 --- a/datachecker-check/src/main/java/org/opengauss/datachecker/check/service/EndpointMetaDataManager.java +++ b/datachecker-check/src/main/java/org/opengauss/datachecker/check/service/EndpointMetaDataManager.java @@ -15,11 +15,13 @@ package org.opengauss.datachecker.check.service; +import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; +import org.apache.commons.collections4.MapUtils; import org.opengauss.datachecker.check.client.FeignClientService; import org.opengauss.datachecker.common.entry.enums.Endpoint; import org.opengauss.datachecker.common.entry.extract.TableMetadata; -import org.springframework.beans.factory.annotation.Autowired; +import org.opengauss.datachecker.common.exception.CheckMetaDataException; import org.springframework.stereotype.Service; import java.util.ArrayList; @@ -37,28 +39,41 @@ import java.util.stream.Collectors; */ @Slf4j @Service +@RequiredArgsConstructor public class EndpointMetaDataManager { private static final List CHECK_TABLE_LIST = new ArrayList<>(); - @Autowired - private EndpointStatusManager endpointStatusManager; - - @Autowired - private FeignClientService feignClientService; + private final EndpointStatusManager endpointStatusManager; + private final FeignClientService feignClientService; /** * Reload metadata information */ public void load() { CHECK_TABLE_LIST.clear(); - final Map metadataMap = feignClientService.queryMetaDataOfSchema(Endpoint.SOURCE); - feignClientService.queryMetaDataOfSchema(Endpoint.SINK); - if (!metadataMap.isEmpty()) { - CHECK_TABLE_LIST.addAll( - metadataMap.values().stream().sorted(Comparator.comparing(TableMetadata::getTableRows)) - .map(TableMetadata::getTableName).collect(Collectors.toUnmodifiableList())); + final Map sourceMetadataMap = feignClientService.queryMetaDataOfSchema(Endpoint.SOURCE); + final Map sinkMetadataMap = feignClientService.queryMetaDataOfSchema(Endpoint.SINK); + if (MapUtils.isNotEmpty(sourceMetadataMap) && MapUtils.isNotEmpty(sinkMetadataMap)) { + final List sourceTables = getEndpointTableNamesSortByTableRows(sourceMetadataMap); + final List sinkTables = getEndpointTableNamesSortByTableRows(sinkMetadataMap); + final List checkTables = compareAndFilterEndpointTables(sourceTables, sinkTables); + CHECK_TABLE_LIST.addAll(checkTables); + log.info("Load endpoint metadata information"); + } else { + log.error("The metadata information is empty, and the verification is terminated abnormally," + + "sourceMetadata={},sinkMetadata={}", sourceMetadataMap.size(), sinkMetadataMap.size()); + throw new CheckMetaDataException( + "The metadata information is empty, and the verification is terminated abnormally"); } - log.info("Load endpoint metadata information"); + } + + private List compareAndFilterEndpointTables(List sourceTables, List sinkTables) { + return sourceTables.stream().filter(table -> sinkTables.contains(table)).collect(Collectors.toList()); + } + + private List getEndpointTableNamesSortByTableRows(Map metadataMap) { + return metadataMap.values().stream().sorted(Comparator.comparing(TableMetadata::getTableRows)) + .map(TableMetadata::getTableName).collect(Collectors.toUnmodifiableList()); } /** diff --git a/datachecker-check/src/main/java/org/opengauss/datachecker/check/service/impl/CheckServiceImpl.java b/datachecker-check/src/main/java/org/opengauss/datachecker/check/service/impl/CheckServiceImpl.java index ffad95a00e106303fd1af6f353bd8a0528619937..3cc844a1150e5194518458a7fd7b4bf32e8ae6f6 100644 --- a/datachecker-check/src/main/java/org/opengauss/datachecker/check/service/impl/CheckServiceImpl.java +++ b/datachecker-check/src/main/java/org/opengauss/datachecker/check/service/impl/CheckServiceImpl.java @@ -15,7 +15,6 @@ package org.opengauss.datachecker.check.service.impl; -import com.alibaba.fastjson.JSON; import lombok.extern.slf4j.Slf4j; import org.apache.commons.collections4.CollectionUtils; import org.apache.commons.lang3.StringUtils; @@ -49,7 +48,6 @@ import java.time.LocalDateTime; import java.util.List; import java.util.Objects; import java.util.concurrent.ExecutionException; -import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; @@ -179,8 +177,6 @@ public class CheckServiceImpl implements CheckService { log.info("check full mode : query meta data from db schema (source and sink )"); // Source endpoint task construction final List extractTasks = feignClientService.buildExtractTaskAllTables(Endpoint.SOURCE, processNo); - extractTasks.forEach(task -> log - .debug("check full mode : build extract task source {} : {}", processNo, JSON.toJSONString(task))); // Sink endpoint task construction feignClientService.buildExtractTaskAllTables(Endpoint.SINK, processNo, extractTasks); log.info("check full mode : build extract task sink {}", processNo); @@ -199,7 +195,7 @@ public class CheckServiceImpl implements CheckService { */ public void startCheckPollingThread() { if (Objects.nonNull(PROCESS_SIGNATURE.get()) && Objects.equals(CHECK_MODE_REF.getAcquire(), CheckMode.FULL)) { - ScheduledExecutorService scheduledExecutor = Executors.newSingleThreadScheduledExecutor(); + ScheduledExecutorService scheduledExecutor = ThreadUtil.newSingleThreadScheduledExecutor(); endpointMetaDataManager.load(); scheduledExecutor.scheduleWithFixedDelay(() -> { Thread.currentThread().setName(SELF_CHECK_POLL_THREAD_NAME); @@ -224,7 +220,7 @@ public class CheckServiceImpl implements CheckService { private void startCheckIncrementMode() { // Enable incremental verification mode - polling thread start if (Objects.equals(CHECK_MODE_REF.getAcquire(), CheckMode.INCREMENT)) { - ScheduledExecutorService scheduledExecutor = Executors.newSingleThreadScheduledExecutor(); + ScheduledExecutorService scheduledExecutor = ThreadUtil.newSingleThreadScheduledExecutor(); scheduledExecutor.scheduleWithFixedDelay(() -> { Thread.currentThread().setName(SELF_CHECK_POLL_THREAD_NAME); log.debug("check polling check mode=[{}]", CHECK_MODE_REF.get()); @@ -332,7 +328,9 @@ public class CheckServiceImpl implements CheckService { @Override public synchronized void cleanCheck() { final String processNo = PROCESS_SIGNATURE.get(); - cleanBuildTask(processNo); + if (Objects.nonNull(processNo)) { + cleanBuildTask(processNo); + } ThreadUtil.sleep(3000); CHECK_MODE_REF.set(null); PROCESS_SIGNATURE.set(null); diff --git a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/ExtractApplication.java b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/ExtractApplication.java index 5170a2308ac34a357fde38f47d15176c8d2576f4..457dafce3d47fbf1dbb773dc64c4fe2a0db37947 100644 --- a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/ExtractApplication.java +++ b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/ExtractApplication.java @@ -31,9 +31,7 @@ import org.springframework.scheduling.annotation.EnableAsync; @EnableFeignClients(basePackages = {"org.opengauss.datachecker.extract.client"}) @SpringBootApplication public class ExtractApplication { - public static void main(String[] args) { SpringApplication.run(ExtractApplication.class, args); } - } diff --git a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/cache/TableExtractStatusCache.java b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/cache/TableExtractStatusCache.java index 129dba6d6c706705f84de253d74a8bbc31fb49ed..1bd464a7608117e133fd2cf2360fc5c8e4d82e1c 100644 --- a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/cache/TableExtractStatusCache.java +++ b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/cache/TableExtractStatusCache.java @@ -16,6 +16,7 @@ package org.opengauss.datachecker.extract.cache; import lombok.extern.slf4j.Slf4j; +import org.opengauss.datachecker.common.constant.Constants.InitialCapacity; import org.springframework.lang.NonNull; import org.springframework.util.Assert; @@ -67,7 +68,7 @@ public class TableExtractStatusCache { public static void init(Map map) { Assert.isTrue(Objects.nonNull(map), Message.INIT_STATUS_PARAM_EMPTY); map.forEach((table, taskCount) -> { - Map tableStatus = new ConcurrentHashMap<>(); + Map tableStatus = new ConcurrentHashMap<>(InitialCapacity.CAPACITY_16); IntStream.rangeClosed(TASK_ORDINAL_START_INDEX, taskCount).forEach(ordinal -> { tableStatus.put(ordinal, STATUS_INIT); }); diff --git a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/config/DruidDataSourceConfig.java b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/config/DruidDataSourceConfig.java index 88bcddbdcaec0bd7e8efdf6d5aa791b2189b9e94..ed309ff44007a995d711962807d2022d64501c67 100644 --- a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/config/DruidDataSourceConfig.java +++ b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/config/DruidDataSourceConfig.java @@ -18,6 +18,7 @@ package org.opengauss.datachecker.extract.config; import com.alibaba.druid.pool.DruidDataSource; import com.alibaba.druid.support.http.StatViewServlet; import com.alibaba.druid.support.http.WebStatFilter; +import org.opengauss.datachecker.common.constant.Constants.InitialCapacity; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.boot.context.properties.ConfigurationProperties; import org.springframework.boot.web.servlet.FilterRegistrationBean; @@ -74,7 +75,7 @@ public class DruidDataSourceConfig { public ServletRegistrationBean initServletRegistrationBean() { ServletRegistrationBean bean = new ServletRegistrationBean<>(new StatViewServlet(), "/druid/*"); - HashMap initParameters = new HashMap<>(); + HashMap initParameters = new HashMap<>(InitialCapacity.CAPACITY_1); initParameters.put("allow", ""); bean.setInitParameters(initParameters); return bean; @@ -92,7 +93,7 @@ public class DruidDataSourceConfig { bean.setFilter(new WebStatFilter()); // exclusions: sets the requests to be filtered out so that statistics are not collected. - Map initParams = new HashMap<>(); + Map initParams = new HashMap<>(InitialCapacity.CAPACITY_1); // this things don't count. initParams.put("exclusions", "*.js,*.css,/druid/*,/jdbc/*"); bean.setInitParameters(initParams); diff --git a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/dao/DataBaseMetaDataDAOImpl.java b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/dao/DataBaseMetaDataDAOImpl.java index 1060da3ca38f195487f58543a37b0f67dc4ba968..34c36d8ec79ae3c073e39c921609d90c6edcde9d 100644 --- a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/dao/DataBaseMetaDataDAOImpl.java +++ b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/dao/DataBaseMetaDataDAOImpl.java @@ -35,7 +35,6 @@ import org.springframework.util.CollectionUtils; import java.sql.ResultSet; import java.sql.SQLException; import java.util.ArrayList; -import java.util.Arrays; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -163,7 +162,7 @@ public class DataBaseMetaDataDAOImpl implements MetaDataDAO { final List tableMetadata = new ArrayList<>(); String sqlQueryTableRowCount = MetaSqlMapper.getTableCount(); final String schema = getSchema(); - tableNameList.stream().forEach(tableName -> { + tableNameList.forEach(tableName -> { final Long rowCount = JdbcTemplateOne.queryForObject(String.format(sqlQueryTableRowCount, schema, tableName), Long.class); tableMetadata.add(new TableMetadata().setTableName(tableName).setTableRows(rowCount)); @@ -173,22 +172,21 @@ public class DataBaseMetaDataDAOImpl implements MetaDataDAO { @Override public List queryColumnMetadata(String tableName) { - return queryColumnMetadata(Arrays.asList(tableName)); + return queryColumnMetadata(List.of(tableName)); } @Override public List queryColumnMetadata(List tableNames) { - Map map = new HashMap<>(Constants.InitialCapacity.MAP); + Map map = new HashMap<>(Constants.InitialCapacity.EMPTY); map.put("tableNames", tableNames); map.put("databaseSchema", getSchema()); NamedParameterJdbcTemplate jdbc = new NamedParameterJdbcTemplate(JdbcTemplateOne); String sql = MetaSqlMapper.getMetaSql(extractProperties.getDatabaseType(), DataBaseMeta.COLUMN); - return jdbc.query(sql, map, new RowMapper() { + return jdbc.query(sql, map, new RowMapper<>() { int columnIndex = COLUMN_INDEX_FIRST_ZERO; @Override public ColumnsMetaData mapRow(ResultSet rs, int rowNum) throws SQLException { - ColumnsMetaData columnsMetaData = new ColumnsMetaData().setTableName(rs.getString(++columnIndex)) .setColumnName(rs.getString(++columnIndex)) .setOrdinalPosition(rs.getInt(++columnIndex)) @@ -210,5 +208,4 @@ public class DataBaseMetaDataDAOImpl implements MetaDataDAO { private String getSchema() { return extractProperties.getSchema(); } - } diff --git a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/dao/MetaSqlMapper.java b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/dao/MetaSqlMapper.java index b7d55282bf9ff63bf4d7afcd57a0114181f15e77..52c1355e4f5142405580e74cea08f67435df6164 100644 --- a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/dao/MetaSqlMapper.java +++ b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/dao/MetaSqlMapper.java @@ -81,8 +81,10 @@ public class MetaSqlMapper { /** * Table metadata query SQL */ - String TABLE_METADATA_SQL = "select table_name tableName , table_rows tableRows " - + "from information_schema.tables WHERE table_schema=?"; + String TABLE_METADATA_SQL = + "select distinct info.table_name tableName , info.table_rows tableRows from information_schema.tables info " + + "left join information_schema.columns col on info.table_schema=col.table_schema " + + "and info.table_name=col.table_name where info.table_schema=? and col.column_key='PRI'"; /** * column metadata query SQL @@ -102,8 +104,10 @@ public class MetaSqlMapper { /** * Table metadata query SQL */ - String TABLE_METADATA_SQL = "select table_name tableName , 0 tableRows from information_schema.tables " - + "WHERE table_schema=? and TABLE_TYPE='BASE TABLE';"; + String TABLE_METADATA_SQL = "select distinct kcu.table_name tableName , 0 tableRows" + + " from information_schema.key_column_usage kcu WHERE kcu.constraint_name in (" + + " select constraint_name from information_schema.table_constraints tc" + + " where tc.constraint_schema=? and tc.constraint_type='PRIMARY KEY')"; /** * column metadata query SQL diff --git a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/service/DataExtractServiceImpl.java b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/service/DataExtractServiceImpl.java index 78755ef7aebc4d35d6feccaff038ad90fee1927e..ccc2ea7ffe8cd0501ec3c25ade18e1a765d36579 100644 --- a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/service/DataExtractServiceImpl.java +++ b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/service/DataExtractServiceImpl.java @@ -194,6 +194,8 @@ public class DataExtractServiceImpl implements DataExtractService { final Set tableNames = MetaDataCache.getAllKeys(); if (atomicProcessNo.compareAndSet(PROCESS_NO_RESET, processNo)) { if (CollectionUtils.isEmpty(taskList) || CollectionUtils.isEmpty(tableNames)) { + log.info("build extract task process={} taskList={} ,MetaCache tableNames={}", processNo, + taskList.size(), tableNames); return; } final List extractTasks = diff --git a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/task/DataManipulationService.java b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/task/DataManipulationService.java index 28a908aac2992b39dff8fd773565e862a992a470..202223c24ecca4a94a86d5eefe8f0f9862137f4f 100644 --- a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/task/DataManipulationService.java +++ b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/task/DataManipulationService.java @@ -16,6 +16,7 @@ package org.opengauss.datachecker.extract.task; import org.apache.commons.lang3.StringUtils; +import org.opengauss.datachecker.common.constant.Constants.InitialCapacity; import org.opengauss.datachecker.common.entry.extract.ColumnsMetaData; import org.opengauss.datachecker.common.entry.extract.TableMetadata; import org.opengauss.datachecker.common.entry.extract.TableMetadataHash; @@ -106,7 +107,7 @@ public class DataManipulationService { */ private List> queryColumnValuesByCompositePrimary(String selectDml, List batchParam) { // Query the current task data and organize the data - HashMap paramMap = new HashMap<>(); + HashMap paramMap = new HashMap<>(InitialCapacity.CAPACITY_16); paramMap.put(DmlBuilder.PRIMARY_KEYS, batchParam); return queryColumnValues(selectDml, paramMap); } @@ -120,7 +121,7 @@ public class DataManipulationService { */ private List> queryColumnValues(String selectDml, List primaryKeys) { // Query the current task data and organize the data - HashMap paramMap = new HashMap<>(); + HashMap paramMap = new HashMap<>(InitialCapacity.CAPACITY_16); paramMap.put(DmlBuilder.PRIMARY_KEYS, primaryKeys); return queryColumnValues(selectDml, paramMap); } @@ -205,7 +206,7 @@ public class DataManipulationService { private Map> transtlateColumnValues(List> columnValues, List primaryMetas) { final List primaryKeys = getCompositeKeyColumns(primaryMetas); - Map> map = new HashMap<>(); + Map> map = new HashMap<>(InitialCapacity.CAPACITY_16); columnValues.forEach(values -> { map.put(getCompositeKey(values, primaryKeys), values); }); diff --git a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/task/ExtractTaskBuilder.java b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/task/ExtractTaskBuilder.java index b5c248a400b18dfaacd3f8df94c1b0832d5db6d3..cbba53b4f1623acc7f5d3b53ce3b63e55c8c9c27 100644 --- a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/task/ExtractTaskBuilder.java +++ b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/task/ExtractTaskBuilder.java @@ -15,6 +15,7 @@ package org.opengauss.datachecker.extract.task; +import org.opengauss.datachecker.common.constant.Constants.InitialCapacity; import org.opengauss.datachecker.common.entry.extract.ExtractIncrementTask; import org.opengauss.datachecker.common.entry.extract.ExtractTask; import org.opengauss.datachecker.common.entry.extract.SourceDataLog; @@ -75,7 +76,7 @@ public class ExtractTaskBuilder { }).collect(Collectors.toList()); // taskCountMap is used to count the number of tasks in table fragment query - Map taskCountMap = new HashMap<>(); + Map taskCountMap = new HashMap<>(InitialCapacity.CAPACITY_16); tableNameOrderList.forEach(tableName -> { TableMetadata metadata = MetaDataCache.get(tableName); if (Objects.nonNull(metadata)) { diff --git a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/task/ExtractTaskRunnable.java b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/task/ExtractTaskRunnable.java index a2bebe52e036473fd6ceedd17c25ceea3a5123db..9628e7fc6fd740a3c17f76a5f61ff603218bc074 100644 --- a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/task/ExtractTaskRunnable.java +++ b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/task/ExtractTaskRunnable.java @@ -16,6 +16,7 @@ package org.opengauss.datachecker.extract.task; import lombok.extern.slf4j.Slf4j; +import org.opengauss.datachecker.common.constant.Constants.InitialCapacity; import org.opengauss.datachecker.common.entry.enums.Endpoint; import org.opengauss.datachecker.common.entry.extract.ExtractTask; import org.opengauss.datachecker.common.entry.extract.RowDataHash; @@ -116,7 +117,7 @@ public class ExtractTaskRunnable extends KafkaProducerWapper implements Runnable } private List> queryColumnValues(String sql) { - Map map = new HashMap<>(); + Map map = new HashMap<>(InitialCapacity.CAPACITY_16); NamedParameterJdbcTemplate jdbc = new NamedParameterJdbcTemplate(jdbcTemplate); return jdbc.query(sql, map, (rs, rowNum) -> { ResultSetMetaData metaData = rs.getMetaData(); diff --git a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/task/IncrementExtractTaskRunnable.java b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/task/IncrementExtractTaskRunnable.java index 5c023669e869921f580286ac6c0b63fe9c612bd8..e9f9c2198933ce2aea10051977317b1d28f11e82 100644 --- a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/task/IncrementExtractTaskRunnable.java +++ b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/task/IncrementExtractTaskRunnable.java @@ -16,6 +16,7 @@ package org.opengauss.datachecker.extract.task; import lombok.extern.slf4j.Slf4j; +import org.opengauss.datachecker.common.constant.Constants.InitialCapacity; import org.opengauss.datachecker.common.entry.enums.Endpoint; import org.opengauss.datachecker.common.entry.extract.ColumnsMetaData; import org.opengauss.datachecker.common.entry.extract.ExtractIncrementTask; @@ -90,7 +91,7 @@ public class IncrementExtractTaskRunnable extends KafkaProducerWapper implements SelectDmlBuilder sqlBuilder = buildSelectSql(tableMetadata, schema); // Query the current task data and organize the data - HashMap paramMap = new HashMap<>(); + HashMap paramMap = new HashMap<>(InitialCapacity.CAPACITY_16); final List compositePrimaryValues = sourceDataLog.getCompositePrimaryValues(); paramMap.put(DmlBuilder.PRIMARY_KEYS, getSqlParam(sqlBuilder, tableMetadata.getPrimaryMetas(), compositePrimaryValues)); diff --git a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/task/TaskJdbcDataCheckThread.java b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/task/TaskJdbcDataCheckThread.java index 43ed07ba9eb94b8688ea1fb5b29e5fe14c40f591..6c8083e4040cef145135eaf7a1ed99e03f37f230 100644 --- a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/task/TaskJdbcDataCheckThread.java +++ b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/task/TaskJdbcDataCheckThread.java @@ -16,6 +16,7 @@ package org.opengauss.datachecker.extract.task; import lombok.extern.slf4j.Slf4j; +import org.opengauss.datachecker.common.constant.Constants.InitialCapacity; import org.opengauss.datachecker.common.entry.enums.Endpoint; import org.opengauss.datachecker.common.entry.extract.RowDataHash; import org.opengauss.datachecker.common.util.FileUtils; @@ -72,7 +73,7 @@ public class TaskJdbcDataCheckThread extends Thread { FileUtils.createDirectories(path); FileUtils.deleteFile(fileName); FileUtils.writeAppendFile(fileName, JsonObjectUtil.format(dataRowList)); - Map dataMap = new HashMap<>(); + Map dataMap = new HashMap<>(InitialCapacity.CAPACITY_16); dataRowList.forEach(row -> { if (dataMap.containsKey(row.getPrimaryKey())) { replateList.add(row.getPrimaryKey()); diff --git a/datachecker-extract/src/test/java/org/opengauss/datachecker/extract/dao/DataBaseMetaDataDAOImplTests.java b/datachecker-extract/src/test/java/org/opengauss/datachecker/extract/dao/DataBaseMetaDataDAOImplTests.java index 73878a64aa847ff6296eb7a8d836300a12c0d856..6bb27f8f9e684581a1f96702410e3aef4ab22891 100644 --- a/datachecker-extract/src/test/java/org/opengauss/datachecker/extract/dao/DataBaseMetaDataDAOImplTests.java +++ b/datachecker-extract/src/test/java/org/opengauss/datachecker/extract/dao/DataBaseMetaDataDAOImplTests.java @@ -22,7 +22,6 @@ import org.opengauss.datachecker.common.entry.extract.TableMetadata; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; -import java.sql.SQLException; import java.util.List; /** @@ -34,20 +33,26 @@ import java.util.List; */ @Slf4j @SpringBootTest -class DataBaseMetaDataDAOImplTests { +public class DataBaseMetaDataDAOImplTests { @Autowired private MetaDataDAO mysqlMetadataDAO; + /** + * queryTableMetadata + */ @Test - void queryTableMetadata() throws SQLException { + public void queryTableMetadata() { List tableMetadata = mysqlMetadataDAO.queryTableMetadata(); for (TableMetadata metadata : tableMetadata) { log.info(metadata.toString()); } } + /** + * queryColumnMetadata + */ @Test - void queryColumnMetadata() throws SQLException { + public void queryColumnMetadata() { List tableMetadata = mysqlMetadataDAO.queryTableMetadata(); for (TableMetadata metadata : tableMetadata) { List columnsMetadata = mysqlMetadataDAO.queryColumnMetadata(metadata.getTableName()); diff --git a/datachecker-mock-data/src/main/java/org/opengauss/datachecker/extract/service/ExtractMockTableService.java b/datachecker-mock-data/src/main/java/org/opengauss/datachecker/extract/service/ExtractMockTableService.java index 9533662faf6d2aa61beaf5327a96ddddffa84108..dc9a3c0535082e997d84c76efc95848a084fab58 100644 --- a/datachecker-mock-data/src/main/java/org/opengauss/datachecker/extract/service/ExtractMockTableService.java +++ b/datachecker-mock-data/src/main/java/org/opengauss/datachecker/extract/service/ExtractMockTableService.java @@ -85,24 +85,24 @@ public class ExtractMockTableService { /** * create table sql */ - String CREATE = "CREATE TABLE :TABLENAME ( b_number VARCHAR(30) NOT NULL COLLATE 'utf8mb4_0900_ai_ci'," - + " b_type VARCHAR(20) NULL DEFAULT NULL COLLATE 'utf8mb4_0900_ai_ci'," - + " b_user VARCHAR(20) NULL DEFAULT NULL COLLATE 'utf8mb4_0900_ai_ci'," + String CREATE = "CREATE TABLE :TABLENAME ( b_number VARCHAR(30) NOT NULL ," + + " b_type VARCHAR(20) NULL DEFAULT NULL ," + + " b_user VARCHAR(20) NULL DEFAULT NULL ," + " b_int INT(10) NULL DEFAULT NULL, b_bigint BIGINT(19) NULL DEFAULT '0'," - + " b_text TEXT NULL DEFAULT NULL COLLATE 'utf8mb4_0900_ai_ci'," - + " b_longtext LONGTEXT NULL DEFAULT NULL COLLATE 'utf8mb4_0900_ai_ci'," + + " b_text TEXT NULL DEFAULT NULL ," + + " b_longtext LONGTEXT NULL DEFAULT NULL ," + " b_date DATE NULL DEFAULT NULL, b_datetime DATETIME NULL DEFAULT NULL," + " b_timestamp TIMESTAMP NULL DEFAULT NULL," - + " b_attr1 VARCHAR(255) NULL DEFAULT NULL COLLATE 'utf8mb4_0900_ai_ci'," - + " b_attr2 VARCHAR(255) NULL DEFAULT NULL COLLATE 'utf8mb4_0900_ai_ci'," - + " b_attr3 VARCHAR(255) NULL DEFAULT NULL COLLATE 'utf8mb4_0900_ai_ci'," - + " b_attr4 VARCHAR(255) NULL DEFAULT NULL COLLATE 'utf8mb4_0900_ai_ci'," - + " b_attr5 VARCHAR(255) NULL DEFAULT NULL COLLATE 'utf8mb4_0900_ai_ci'," - + " b_attr6 VARCHAR(255) NULL DEFAULT NULL COLLATE 'utf8mb4_0900_ai_ci'," - + " b_attr7 VARCHAR(255) NULL DEFAULT NULL COLLATE 'utf8mb4_0900_ai_ci'," - + " b_attr8 VARCHAR(255) NULL DEFAULT NULL COLLATE 'utf8mb4_0900_ai_ci'," - + " b_attr9 VARCHAR(255) NULL DEFAULT NULL COLLATE 'utf8mb4_0900_ai_ci'," - + " b_attr10 VARCHAR(255) NULL DEFAULT NULL COLLATE 'utf8mb4_0900_ai_ci'," + + " b_attr1 VARCHAR(255) NULL DEFAULT NULL ," + + " b_attr2 VARCHAR(255) NULL DEFAULT NULL ," + + " b_attr3 VARCHAR(255) NULL DEFAULT NULL ," + + " b_attr4 VARCHAR(255) NULL DEFAULT NULL ," + + " b_attr5 VARCHAR(255) NULL DEFAULT NULL ," + + " b_attr6 VARCHAR(255) NULL DEFAULT NULL ," + + " b_attr7 VARCHAR(255) NULL DEFAULT NULL ," + + " b_attr8 VARCHAR(255) NULL DEFAULT NULL ," + + " b_attr9 VARCHAR(255) NULL DEFAULT NULL ," + + " b_attr10 VARCHAR(255) NULL DEFAULT NULL ," + " PRIMARY KEY (`b_number`) USING BTREE ) COLLATE='utf8mb4_0900_ai_ci'" + " ENGINE=InnoDB ;"; } diff --git a/openGauss-tools-datachecker-performance.iml b/openGauss-tools-datachecker-performance.iml deleted file mode 100644 index ad5a890c48875203dd6f3618d68591dc2b891020..0000000000000000000000000000000000000000 --- a/openGauss-tools-datachecker-performance.iml +++ /dev/null @@ -1,12 +0,0 @@ - - - - - - - - - - - - \ No newline at end of file diff --git a/pom.xml b/pom.xml index 6b5935416f3059dd4df761a3467c06a8b89e498d..cd597e6d05de3511250d427f771a110fc0e6a2b8 100644 --- a/pom.xml +++ b/pom.xml @@ -41,7 +41,6 @@ - org.springframework.cloud spring-cloud-dependencies @@ -49,13 +48,11 @@ pom import - org.springdoc springdoc-openapi-ui ${springdoc.openapi.ui.version} - mysql mysql-connector-java @@ -72,7 +69,6 @@ druid ${druid.version} - org.projectlombok lombok @@ -84,19 +80,16 @@ commons-collections4 4.4 - com.google.guava guava ${guava.version} - net.openhft zero-allocation-hashing ${zero.allocation.hashing.version} - com.alibaba fastjson