diff --git a/datachecker-check/src/main/java/org/opengauss/datachecker/check/modules/check/KafkaConsumerHandler.java b/datachecker-check/src/main/java/org/opengauss/datachecker/check/modules/check/KafkaConsumerHandler.java index 367c7a3e2a41a70ec24281651d7990ebcf6d265c..7c42f691ba761caebd84f97369960c42a333f50b 100644 --- a/datachecker-check/src/main/java/org/opengauss/datachecker/check/modules/check/KafkaConsumerHandler.java +++ b/datachecker-check/src/main/java/org/opengauss/datachecker/check/modules/check/KafkaConsumerHandler.java @@ -169,6 +169,7 @@ public class KafkaConsumerHandler { public void closeConsumer() { if (kafkaConsumer != null) { kafkaConsumer.unsubscribe(); + kafkaConsumer.close(); } } } diff --git a/datachecker-common/src/main/java/org/opengauss/datachecker/common/util/FileUtils.java b/datachecker-common/src/main/java/org/opengauss/datachecker/common/util/FileUtils.java index fe30ecb0c97c5ff6089ae78200f44213654a71d4..417607fc0cf231b0e286b4792a9a758519294cbc 100644 --- a/datachecker-common/src/main/java/org/opengauss/datachecker/common/util/FileUtils.java +++ b/datachecker-common/src/main/java/org/opengauss/datachecker/common/util/FileUtils.java @@ -40,6 +40,7 @@ import java.util.stream.Collectors; */ public class FileUtils { private static final Logger log = LogUtils.getLogger(); + /** * Creates a directory by creating all nonexistent parent directories first. * @@ -63,7 +64,7 @@ public class FileUtils { * @param filename filename * @param content content */ - public static void writeAppendFile(String filename, List content) { + public static synchronized void writeAppendFile(String filename, List content) { try { Files.write(Paths.get(filename), content, StandardOpenOption.APPEND, StandardOpenOption.CREATE); } catch (IOException e) { @@ -77,7 +78,7 @@ public class FileUtils { * @param filename filename * @param content content */ - public static void writeAppendFile(String filename, Set content) { + public static synchronized void writeAppendFile(String filename, Set content) { try { Files.write(Paths.get(filename), content, StandardOpenOption.APPEND, StandardOpenOption.CREATE); } catch (IOException e) { @@ -91,7 +92,7 @@ public class FileUtils { * @param filename filename * @param content content */ - public static void writeAppendFile(String filename, String content) { + public static synchronized void writeAppendFile(String filename, String content) { try { Files.write(Paths.get(filename), content.getBytes(StandardCharsets.UTF_8), StandardOpenOption.APPEND, StandardOpenOption.CREATE); @@ -100,7 +101,13 @@ public class FileUtils { } } - public static void writeFile(String filename, String content) { + /** + * Write lines of text to a file. Characters are encoded into bytes using the UTF-8 charset. + * + * @param filename filename + * @param content content + */ + public static synchronized void writeFile(String filename, String content) { try { Files.write(Paths.get(filename), content.getBytes(StandardCharsets.UTF_8), StandardOpenOption.CREATE); } catch (IOException e) { @@ -118,7 +125,8 @@ public class FileUtils { try { final Path pathDirectory = Path.of(fileDirectory); if (Files.isDirectory(pathDirectory)) { - return Files.list(pathDirectory).collect(Collectors.toList()); + return Files.list(pathDirectory) + .collect(Collectors.toList()); } else { throw new NotDirectoryException(fileDirectory); } diff --git a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/data/BaseDataService.java b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/data/BaseDataService.java index d3cd6a559b0f51c8845f97322887906c4e033742..d8b86048b11f9a9f535a69ce1e57d50c3f6494c9 100644 --- a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/data/BaseDataService.java +++ b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/data/BaseDataService.java @@ -94,9 +94,14 @@ public class BaseDataService { */ public List bdsQueryTableMetadataList() { List metadataList = dataAccessService.dasQueryTableMetadataList(); - List tableList = bdsQueryTableNameList(); return metadataList.stream() - .filter(meta -> tableList.contains(meta.getTableName())) + .filter(meta -> { + boolean isChecking = ruleAdapterService.filterTableByRule(meta.getTableName()); + if (isChecking) { + tableNameList.add(meta.getTableName()); + } + return isChecking; + }) .collect(Collectors.toList()); } diff --git a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/data/access/AbstractDataAccessService.java b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/data/access/AbstractDataAccessService.java index aadec44751d3063d385434d0a542597280a06ae4..c67057a513daad58094516533e92251b57a85e5f 100644 --- a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/data/access/AbstractDataAccessService.java +++ b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/data/access/AbstractDataAccessService.java @@ -18,19 +18,24 @@ package org.opengauss.datachecker.extract.data.access; import org.apache.logging.log4j.Logger; import org.opengauss.datachecker.common.entry.check.Difference; import org.opengauss.datachecker.common.entry.enums.DataBaseMeta; +import org.opengauss.datachecker.common.entry.extract.PrimaryColumnBean; import org.opengauss.datachecker.common.entry.extract.TableMetadata; import org.opengauss.datachecker.common.util.LogUtils; import org.opengauss.datachecker.extract.config.ExtractProperties; -import org.opengauss.datachecker.extract.task.ResultSetHandlerFactory; -import org.opengauss.datachecker.extract.task.ResultSetHashHandler; +import org.opengauss.datachecker.extract.resource.ConnectionMgr; import org.springframework.jdbc.core.JdbcTemplate; import org.springframework.jdbc.core.RowMapper; import org.springframework.jdbc.core.namedparam.NamedParameterJdbcTemplate; import javax.annotation.Resource; import javax.sql.DataSource; +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; import java.time.Duration; import java.time.LocalDateTime; +import java.util.LinkedList; import java.util.List; import java.util.Map; @@ -43,6 +48,13 @@ import java.util.Map; */ public abstract class AbstractDataAccessService implements DataAccessService { protected static final Logger log = LogUtils.getLogger(); + + private static final String RS_COL_SCHEMA = "tableSchema"; + private static final String RS_COL_TABLE_NAME = "tableName"; + private static final String RS_COL_TABLE_ROWS = "tableRows"; + private static final String RS_COL_COLUMN_NAME = "columnName"; + private static final String RS_COL_AVG_ROW_LENGTH = "avgRowLength"; + protected boolean isOgCompatibilityB = false; @Resource protected JdbcTemplate jdbcTemplate; @@ -60,6 +72,96 @@ public abstract class AbstractDataAccessService implements DataAccessService { return jdbcTemplate.getDataSource(); } + /** + * adasQueryTableNameList + * + * @param executeQueryStatement executeQueryStatement + * @return table list + */ + public List adasQueryTableNameList(String executeQueryStatement) { + final LocalDateTime start = LocalDateTime.now(); + Connection connection = ConnectionMgr.getConnection(); + List list = new LinkedList<>(); + try (PreparedStatement ps = connection.prepareStatement(executeQueryStatement); + ResultSet resultSet = ps.executeQuery()) { + while (resultSet.next()) { + list.add(resultSet.getString(RS_COL_TABLE_NAME)); + } + } catch (SQLException esql) { + log.error("", esql); + } finally { + ConnectionMgr.close(connection, null, null); + } + long betweenToMillis = durationBetweenToMillis(start, LocalDateTime.now()); + log.info("adasQueryTableNameList cost [{}ms]", betweenToMillis); + return list; + } + + /** + * adasQueryTablePrimaryColumns + * + * @param executeQueryStatement executeQueryStatement + * @return PrimaryColumnBean list + */ + public List adasQueryTablePrimaryColumns(String executeQueryStatement) { + final LocalDateTime start = LocalDateTime.now(); + Connection connection = ConnectionMgr.getConnection(); + List list = new LinkedList<>(); + try (PreparedStatement ps = connection.prepareStatement(executeQueryStatement); + ResultSet resultSet = ps.executeQuery()) { + PrimaryColumnBean metadata; + while (resultSet.next()) { + metadata = new PrimaryColumnBean(); + metadata.setColumnName(resultSet.getString(RS_COL_COLUMN_NAME)); + metadata.setTableName(resultSet.getString(RS_COL_TABLE_NAME)); + list.add(metadata); + } + } catch (SQLException esql) { + log.error("", esql); + } finally { + ConnectionMgr.close(connection, null, null); + } + long betweenToMillis = durationBetweenToMillis(start, LocalDateTime.now()); + log.info("adasQueryTablePrimaryColumns cost [{}ms]", betweenToMillis); + return list; + } + + /** + * adasQueryTableMetadataList + * + * @param executeQueryStatement executeQueryStatement + * @return TableMetadata list + */ + public List adasQueryTableMetadataList(String executeQueryStatement) { + final LocalDateTime start = LocalDateTime.now(); + Connection connection = ConnectionMgr.getConnection(); + List list = new LinkedList<>(); + try (PreparedStatement ps = connection.prepareStatement(executeQueryStatement); + ResultSet resultSet = ps.executeQuery()) { + TableMetadata metadata; + while (resultSet.next()) { + metadata = new TableMetadata(); + metadata.setSchema(resultSet.getString(RS_COL_SCHEMA)); + metadata.setTableName(resultSet.getString(RS_COL_TABLE_NAME)); + metadata.setTableRows(resultSet.getLong(RS_COL_TABLE_ROWS)); + metadata.setAvgRowLength(resultSet.getLong(RS_COL_AVG_ROW_LENGTH)); + list.add(metadata); + } + } catch (SQLException esql) { + log.error("", esql); + } finally { + ConnectionMgr.close(connection, null, null); + } + long betweenToMillis = durationBetweenToMillis(start, LocalDateTime.now()); + log.info("dasQueryTableMetadataList cost [{}ms]", betweenToMillis); + return list; + } + + private long durationBetweenToMillis(LocalDateTime start, LocalDateTime end) { + return Duration.between(start, end) + .toMillis(); + } + /** * wrapper table metadata of endpoint and databaseType * @@ -81,7 +183,7 @@ public abstract class AbstractDataAccessService implements DataAccessService { * @param table table * @param fileName fileName * @param differenceList differenceList - * @return + * @return result */ @Override public List> query(String table, String fileName, List differenceList) { diff --git a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/data/access/MysqlDataAccessService.java b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/data/access/MysqlDataAccessService.java index 82ea3cb036da3c6009e31ea1552e8531d5bef029..64347bea13ac811fd285da1a32a606c2fd12cae6 100644 --- a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/data/access/MysqlDataAccessService.java +++ b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/data/access/MysqlDataAccessService.java @@ -54,7 +54,10 @@ public class MysqlDataAccessService extends AbstractDataAccessService { @Override public List dasQueryTableNameList() { - return mysqlMetaDataMapper.queryTableNameList(properties.getSchema()); + String schema = properties.getSchema(); + String sql = + "SELECT info.table_name tableName FROM information_schema.tables info WHERE table_schema='" + schema + "'"; + return adasQueryTableNameList(sql); } @Override @@ -69,7 +72,9 @@ public class MysqlDataAccessService extends AbstractDataAccessService { @Override public List queryTablePrimaryColumns() { - return mysqlMetaDataMapper.queryTablePrimaryColumns(properties.getSchema()); + String sql = "select table_name tableName ,lower(column_name) columnName from information_schema.columns " + + "where table_schema='" + properties.getSchema() + "' and column_key='PRI' order by ordinal_position asc "; + return adasQueryTablePrimaryColumns(sql); } @Override @@ -79,7 +84,10 @@ public class MysqlDataAccessService extends AbstractDataAccessService { @Override public List dasQueryTableMetadataList() { - return wrapperTableMetadata(mysqlMetaDataMapper.queryTableMetadataList(properties.getSchema())); + String sql = " SELECT info.TABLE_SCHEMA tableSchema,info.table_name tableName,info.table_rows tableRows , " + + "info.avg_row_length avgRowLength FROM information_schema.tables info WHERE TABLE_SCHEMA='" + + properties.getSchema() + "'"; + return wrapperTableMetadata(adasQueryTableMetadataList(sql)); } @Override diff --git a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/data/access/OpgsDataAccessService.java b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/data/access/OpgsDataAccessService.java index 3aad85ca137c13dba1808a04e4199db7460202eb..2e57aba5422875ca0ace3a12092466769619a33b 100644 --- a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/data/access/OpgsDataAccessService.java +++ b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/data/access/OpgsDataAccessService.java @@ -62,17 +62,32 @@ public class OpgsDataAccessService extends AbstractDataAccessService { @Override public List dasQueryTableNameList() { - return opgsMetaDataMapper.queryTableNameList(properties.getSchema()); + String schema = properties.getSchema(); + String sql = "select c.relname tableName from pg_class c LEFT JOIN pg_namespace n on n.oid = c.relnamespace " + + " where n.nspname='" + schema + "' and c.relkind ='r';"; + return adasQueryTableNameList(sql); } @Override public List queryTablePrimaryColumns() { - return opgsMetaDataMapper.queryTablePrimaryColumns(properties.getSchema()); + String schema = properties.getSchema(); + String sql = "select c.relname tableName,ns.nspname,ns.oid,a.attname columnName from pg_class c " + + "left join pg_namespace ns on c.relnamespace=ns.oid " + + "left join pg_attribute a on c.oid=a.attrelid and a.attnum>0 and not a.attisdropped " + + "inner join pg_constraint cs on a.attrelid=cs.conrelid and a.attnum=any(cs.conkey) " + + "where ns.nspname='" + schema + "' and cs.contype='p';"; + return adasQueryTablePrimaryColumns(sql); } @Override public List queryTablePrimaryColumns(String tableName) { - return opgsMetaDataMapper.queryTablePrimaryColumnsByTableName(properties.getSchema(), tableName); + String schema = properties.getSchema(); + String sql = "select c.relname tableName,ns.nspname,ns.oid,a.attname columnName from pg_class c " + + "left join pg_namespace ns on c.relnamespace=ns.oid " + + "left join pg_attribute a on c.oid=a.attrelid and a.attnum>0 and not a.attisdropped " + + "inner join pg_constraint cs on a.attrelid=cs.conrelid and a.attnum=any(cs.conkey) " + + "where ns.nspname='" + schema + "' and c.relname='" + tableName + "' and cs.contype='p';"; + return adasQueryTablePrimaryColumns(sql); } @Override @@ -87,7 +102,11 @@ public class OpgsDataAccessService extends AbstractDataAccessService { @Override public List dasQueryTableMetadataList() { - return wrapperTableMetadata(opgsMetaDataMapper.queryTableMetadataList(properties.getSchema())); + String sql = " select n.nspname tableSchema, c.relname tableName,c.reltuples tableRows, " + + "case when c.reltuples>0 then pg_table_size(c.oid)/c.reltuples else 0 end as avgRowLength " + + "from pg_class c LEFT JOIN pg_namespace n on n.oid = c.relnamespace " + "where n.nspname='" + + properties.getSchema() + "' and c.relkind ='r';"; + return wrapperTableMetadata(adasQueryTableMetadataList(sql)); } @Override diff --git a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/data/access/OracleDataAccessService.java b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/data/access/OracleDataAccessService.java index 7cf497ab79bf95a6a00d857fcfdb63003d0c59ba..54b185b7a78b60210bef5e08348489fc37660a75 100644 --- a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/data/access/OracleDataAccessService.java +++ b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/data/access/OracleDataAccessService.java @@ -54,7 +54,9 @@ public class OracleDataAccessService extends AbstractDataAccessService { @Override public List dasQueryTableNameList() { - return oracleMetaDataMapper.queryTableNameList(properties.getSchema()); + String schema = properties.getSchema(); + String sql = "SELECT TABLE_NAME tableName FROM ALL_TABLES WHERE OWNER = '" + schema + "'"; + return adasQueryTableNameList(sql); } @Override @@ -69,17 +71,24 @@ public class OracleDataAccessService extends AbstractDataAccessService { @Override public List queryTablePrimaryColumns() { - return oracleMetaDataMapper.queryTablePrimaryColumns(properties.getSchema()); + String sql = "SELECT A.TABLE_NAME tableName, A.COLUMN_NAME columnName FROM ALL_CONS_COLUMNS A,ALL_CONSTRAINTS B" + + " WHERE A.constraint_name = B.constraint_name AND B.constraint_type = 'P' AND A.OWNER = '" + + properties.getSchema() + "'"; + return adasQueryTablePrimaryColumns(sql); } @Override public List queryTablePrimaryColumns(String tableName) { - return oracleMetaDataMapper.queryTablePrimaryColumnsByTableName(properties.getSchema(),tableName); + return oracleMetaDataMapper.queryTablePrimaryColumnsByTableName(properties.getSchema(), tableName); } @Override public List dasQueryTableMetadataList() { - return wrapperTableMetadata(oracleMetaDataMapper.queryTableMetadataList(properties.getSchema())); + String schema = properties.getSchema(); + String sql = "SELECT t.owner tableSchema,t.table_name tableName,t.num_rows tableRows,avg_row_len avgRowLength" + + " FROM ALL_TABLES t LEFT JOIN (SELECT DISTINCT table_name from ALL_CONSTRAINTS where OWNER = '" + schema + + "' AND constraint_type='P') pc on t.table_name=pc.table_name WHERE t.OWNER = '" + schema + "'"; + return wrapperTableMetadata(adasQueryTableMetadataList(sql)); } @Override diff --git a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/resource/ConnectionMgr.java b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/resource/ConnectionMgr.java index cd46f3834430bf894b84e0b524bca4e61f184dbc..6a2d20114d552b64d0251e3a9805aee9394544a2 100644 --- a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/resource/ConnectionMgr.java +++ b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/resource/ConnectionMgr.java @@ -54,7 +54,7 @@ public class ConnectionMgr { username = getPropertyValue(ConfigConstants.DS_USER_NAME); databasePassport = getPropertyValue(ConfigConstants.DS_PASSWORD); try { - log.info("connection class loader ,[{}],[{}]", driverClassName, url); + log.debug("connection class loader ,[{}],[{}]", driverClassName, url); Class.forName(driverClassName); isFirstLoad.set(false); } catch (ClassNotFoundException e) { @@ -65,7 +65,7 @@ public class ConnectionMgr { try { conn = DriverManager.getConnection(url, username, databasePassport); conn.setAutoCommit(false); - log.debug("Connection succeed!"); + log.info("Connection succeed!"); } catch (SQLException exp) { log.error("create connection [{},{}]:[{}]", username, databasePassport, url, exp); } diff --git a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/service/DataExtractServiceImpl.java b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/service/DataExtractServiceImpl.java index 053addd23a2ad08fe140e0056d694758a377e5dc..294252660add1eec851517d3a32a26528dcf50d8 100644 --- a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/service/DataExtractServiceImpl.java +++ b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/service/DataExtractServiceImpl.java @@ -457,6 +457,7 @@ public class DataExtractServiceImpl implements DataExtractService { KafkaConsumer consumer = kafkaConsumerConfig.createConsumer(IdGenerator.nextId36()); checkPointManager = new ExtractPointSwapManager(kafkaTemplate, consumer); checkPointManager.setCheckPointSwapTopicName(ConfigCache.getValue(ConfigConstants.PROCESS_NO)); + log.info("tableRegisterCheckPoint start pollSwapPoint thread"); checkPointManager.pollSwapPoint(tableCheckPointCache); taskList.forEach(this::registerCheckPoint); while (tableCheckPointCache.tableCount() != taskList.size()) { diff --git a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/service/RuleAdapterService.java b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/service/RuleAdapterService.java index 84203101d13b51432b4a36578c5dd1c229f1f9ea..72b6c7e52d6e2f7b677dbddb72481cde4faac50a 100644 --- a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/service/RuleAdapterService.java +++ b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/service/RuleAdapterService.java @@ -72,6 +72,16 @@ public class RuleAdapterService { return tableRuleAdapterService.executeTableRule(rules, tableList); } + /** + * filterTableByRule + * + * @param tableName tableName + * @return filter result + */ + public boolean filterTableByRule(String tableName) { + return tableRuleAdapterService.executeTableRule(RULES.get(RuleType.TABLE), tableName); + } + /** * Execute column-level rules * diff --git a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/service/TableRuleAdapterService.java b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/service/TableRuleAdapterService.java index deedd8c3bd53d97dbd22dc9c3c49ceea49f42bd6..768ebe6b3274a767e3d767785128f12d6867abc6 100644 --- a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/service/TableRuleAdapterService.java +++ b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/service/TableRuleAdapterService.java @@ -40,12 +40,21 @@ public class TableRuleAdapterService { private static final Map EXECUTORS = new HashMap<>(); static { - EXECUTORS - .put(WHITE, (patterns, table) -> patterns.stream().anyMatch(pattern -> pattern.matcher(table).matches())); - EXECUTORS - .put(BLACK, (patterns, table) -> patterns.stream().noneMatch(pattern -> pattern.matcher(table).matches())); + EXECUTORS.put(WHITE, (patterns, table) -> patterns.stream() + .anyMatch(pattern -> pattern.matcher(table) + .matches())); + EXECUTORS.put(BLACK, (patterns, table) -> patterns.stream() + .noneMatch(pattern -> pattern.matcher(table) + .matches())); } + /** + * executeTableRule + * + * @param rules rules + * @param tableList tableList + * @return filter list + */ public List executeTableRule(List rules, List tableList) { if (CollectionUtils.isEmpty(rules)) { return tableList; @@ -53,12 +62,32 @@ public class TableRuleAdapterService { final Rule ruleOne = rules.get(0); final TableRuleExecutor tableRuleExecutor = EXECUTORS.get(ruleOne.getName()); final List patterns = buildRulePatterns(rules); - return tableList.parallelStream().filter(table -> tableRuleExecutor.apply(patterns, table)) + return tableList.parallelStream() + .filter(table -> tableRuleExecutor.apply(patterns, table)) .collect(Collectors.toList()); } + /** + * executeTableRule + * + * @param rules rules + * @param table table + * @return filter result + */ + public boolean executeTableRule(List rules, String table) { + if (CollectionUtils.isEmpty(rules)) { + return true; + } + final Rule ruleOne = rules.get(0); + final TableRuleExecutor tableRuleExecutor = EXECUTORS.get(ruleOne.getName()); + final List patterns = buildRulePatterns(rules); + return tableRuleExecutor.apply(patterns, table); + } + private List buildRulePatterns(List rules) { - return rules.stream().map(rule -> Pattern.compile(rule.getText())).collect(Collectors.toList()); + return rules.stream() + .map(rule -> Pattern.compile(rule.getText())) + .collect(Collectors.toList()); } @FunctionalInterface diff --git a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/slice/ExtractPointSwapManager.java b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/slice/ExtractPointSwapManager.java index 530148a6b58bf9f0ebba994eaedc0dc0799d5699..56c982a106f8cf2e5d350b712d25308094983b37 100644 --- a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/slice/ExtractPointSwapManager.java +++ b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/slice/ExtractPointSwapManager.java @@ -69,6 +69,7 @@ public class ExtractPointSwapManager { trySubscribe(); ConsumerRecords records; AtomicInteger deliveredCount = new AtomicInteger(); + log.info("pollSwapPoint thread started"); while (!isCompletedSwapTablePoint) { try { records = kafkaConsumer.poll(Duration.ofSeconds(1)); diff --git a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/slice/common/SliceKafkaAgents.java b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/slice/common/SliceKafkaAgents.java index e0007c03fb847b16586a38ce15c95a697a113c68..b9f6424bc29d0e836eef6a193490a876cedbb0da 100644 --- a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/slice/common/SliceKafkaAgents.java +++ b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/slice/common/SliceKafkaAgents.java @@ -87,7 +87,6 @@ public class SliceKafkaAgents { public void agentsClosed() { kafkaConsumer.unsubscribe(); kafkaConsumer.close(); - kafkaTemplate.destroy(); } /** diff --git a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/slice/process/JdbcSliceProcessor.java b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/slice/process/JdbcSliceProcessor.java index 0577dbe949bec5eddb59335f917cb7ad97005bb3..fe90cd8a1cfea16581253fef0810f2d6d3dbd895 100644 --- a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/slice/process/JdbcSliceProcessor.java +++ b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/slice/process/JdbcSliceProcessor.java @@ -19,6 +19,7 @@ import org.opengauss.datachecker.common.entry.extract.SliceExtend; import org.opengauss.datachecker.common.entry.extract.SliceVo; import org.opengauss.datachecker.common.entry.extract.TableMetadata; import org.opengauss.datachecker.common.exception.ExtractDataAccessException; +import org.opengauss.datachecker.extract.resource.ConnectionMgr; import org.opengauss.datachecker.extract.resource.JdbcDataOperations; import org.opengauss.datachecker.extract.slice.SliceProcessorContext; import org.opengauss.datachecker.extract.slice.common.SliceResultSetSender; @@ -95,32 +96,36 @@ public class JdbcSliceProcessor extends AbstractSliceProcessor { private void executeQueryStatement(QuerySqlEntry sqlEntry, TableMetadata tableMetadata, SliceExtend sliceExtend) { final LocalDateTime start = LocalDateTime.now(); - Connection connection = null; long jdbcQueryCost = 0; long sendDataCost = 0; long sliceAllCost = 0; - + SliceResultSetSender sliceSender = null; + Connection connection = null; + PreparedStatement ps = null; + ResultSet resultSet = null; try { long estimatedRowCount = slice.isSlice() ? slice.getFetchSize() : tableMetadata.getTableRows(); long estimatedMemorySize = estimatedMemorySize(tableMetadata.getAvgRowLength(), estimatedRowCount); connection = jdbcOperation.tryConnectionAndClosedAutoCommit(estimatedMemorySize); log.debug("slice [{}] fetch jdbc connection.", slice.getName()); - SliceResultSetSender sliceSender = createSliceResultSetSender(tableMetadata); + sliceSender = createSliceResultSetSender(tableMetadata); sliceExtend.setStartOffset(sliceSender.checkOffsetEnd()); - - try (PreparedStatement ps = connection.prepareStatement(sqlEntry.getSql()); - ResultSet resultSet = ps.executeQuery()) { - log.debug("slice [{}] jdbc execute query complete.", slice.getName()); - LocalDateTime jdbcQuery = LocalDateTime.now(); - jdbcQueryCost = durationBetweenToMillis(start, jdbcQuery); - resultSet.setFetchSize(FETCH_SIZE); - List offsetList = sliceQueryResultAndSendSync(sliceSender, resultSet); - updateExtendSliceOffsetAndCount(sliceExtend, rowCount.get(), offsetList); - sendDataCost = durationBetweenToMillis(jdbcQuery, LocalDateTime.now()); - } - } catch (SQLException ex) { - throw new ExtractDataAccessException(ex); + ps = connection.prepareStatement(sqlEntry.getSql()); + resultSet = ps.executeQuery(); + log.debug("slice [{}] jdbc execute query complete.", slice.getName()); + LocalDateTime jdbcQuery = LocalDateTime.now(); + jdbcQueryCost = durationBetweenToMillis(start, jdbcQuery); + resultSet.setFetchSize(FETCH_SIZE); + List offsetList = sliceQueryResultAndSendSync(sliceSender, resultSet); + updateExtendSliceOffsetAndCount(sliceExtend, rowCount.get(), offsetList); + sendDataCost = durationBetweenToMillis(jdbcQuery, LocalDateTime.now()); + } catch (Exception ex) { + throw new ExtractDataAccessException(ex.getMessage()); } finally { + ConnectionMgr.close(connection, ps, resultSet); + if (sliceSender != null) { + sliceSender.agentsClosed(); + } jdbcOperation.releaseConnection(connection); log.debug("slice [{}] release jdbc connection.", slice.getName()); sliceAllCost = durationBetweenToMillis(start, LocalDateTime.now()); diff --git a/datachecker-extract/src/main/resources/mapper/OpgsMetaDataMapper.xml b/datachecker-extract/src/main/resources/mapper/OpgsMetaDataMapper.xml index e2cc1d3545b0684f11d00b184ccc3dbaa1082892..c6988d368d0e0951791b8d9dec6b7da49613c78a 100644 --- a/datachecker-extract/src/main/resources/mapper/OpgsMetaDataMapper.xml +++ b/datachecker-extract/src/main/resources/mapper/OpgsMetaDataMapper.xml @@ -56,10 +56,12 @@