From 31889eee4418d102fc576786f9b6a44e735cd7ca Mon Sep 17 00:00:00 2001 From: mystarry-sky Date: Thu, 5 Sep 2024 15:09:33 +0800 Subject: [PATCH] =?UTF-8?q?lower=5Fcase=5Ftable=5Fnames=E4=B8=BA0=EF=BC=8C?= =?UTF-8?q?=E8=A1=A8=E5=90=8D=E7=A7=B0=E5=A4=A7=E5=B0=8F=E5=86=99=E6=95=8F?= =?UTF-8?q?=E6=84=9F=EF=BC=8C=E6=A0=A1=E9=AA=8C=E5=BC=82=E5=B8=B8?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../check/load/CheckDatabaseLoader.java | 20 ++++++ .../common/config/ConfigCache.java | 12 +++- .../common/constant/ConfigConstants.java | 5 ++ .../entry/enums/LowerCaseTableNames.java | 69 ++++++++++++++++++ .../common/entry/extract/Database.java | 2 + .../access/AbstractDataAccessService.java | 48 ++++++++++--- .../data/access/CsvDataAccessService.java | 46 ++++++------ .../data/access/DataAccessService.java | 12 ++++ .../data/access/MysqlDataAccessService.java | 47 +++++++++--- .../data/access/OpgsDataAccessService.java | 72 ++++++++++++++----- .../data/access/OracleDataAccessService.java | 22 ++++-- .../DataConsolidationServiceImpl.java | 4 ++ .../debezium/DebeziumConsumerListener.java | 22 +++++- .../extract/debezium/DebeziumWorker.java | 7 +- .../service/DataExtractServiceImpl.java | 2 +- .../datachecker/extract/task/CheckPoint.java | 6 -- .../task/sql/QueryStatementFactory.java | 9 --- 17 files changed, 321 insertions(+), 84 deletions(-) create mode 100644 datachecker-common/src/main/java/org/opengauss/datachecker/common/entry/enums/LowerCaseTableNames.java diff --git a/datachecker-check/src/main/java/org/opengauss/datachecker/check/load/CheckDatabaseLoader.java b/datachecker-check/src/main/java/org/opengauss/datachecker/check/load/CheckDatabaseLoader.java index f3a0086..06b632b 100644 --- a/datachecker-check/src/main/java/org/opengauss/datachecker/check/load/CheckDatabaseLoader.java +++ b/datachecker-check/src/main/java/org/opengauss/datachecker/check/load/CheckDatabaseLoader.java @@ -19,13 +19,17 @@ import org.opengauss.datachecker.check.client.FeignClientService; import org.opengauss.datachecker.common.config.ConfigCache; import org.opengauss.datachecker.common.constant.ConfigConstants; import org.opengauss.datachecker.common.entry.enums.Endpoint; +import org.opengauss.datachecker.common.entry.enums.LowerCaseTableNames; +import org.opengauss.datachecker.common.entry.extract.Database; import org.opengauss.datachecker.common.entry.extract.ExtractConfig; import org.opengauss.datachecker.common.util.LogUtils; import org.opengauss.datachecker.common.util.ThreadUtil; import org.springframework.core.annotation.Order; import org.springframework.stereotype.Service; +import org.springframework.util.Assert; import javax.annotation.Resource; +import java.util.Objects; /** * CheckDatabaseLoader @@ -63,10 +67,26 @@ public class CheckDatabaseLoader extends AbstractCheckLoader { shutdown("sink endpoint server has error"); return; } + checkDatabaseLowerCaseTableNames(sourceConfig.getDatabase(), sinkConfig.getDatabase()); checkEnvironment.addExtractDatabase(Endpoint.SOURCE, sourceConfig.getDatabase()); checkEnvironment.addExtractDatabase(Endpoint.SINK, sinkConfig.getDatabase()); ConfigCache.put(ConfigConstants.DATA_CHECK_SOURCE_DATABASE, sourceConfig.getDatabase()); ConfigCache.put(ConfigConstants.DATA_CHECK_SINK_DATABASE, sinkConfig.getDatabase()); + ConfigCache.put(ConfigConstants.LOWER_CASE_TABLE_NAMES, sinkConfig.getDatabase().getLowercaseTableNames()); LogUtils.info(log, "check service load database configuration success."); } + + private void checkDatabaseLowerCaseTableNames(Database source, Database sink) { + Assert.notNull(source, "source database config can't be null"); + Assert.notNull(sink, "sink database config can't be null"); + Assert.notNull(source.getLowercaseTableNames(), "source database lower_case_table_name fetch error"); + Assert.notNull(sink.getLowercaseTableNames(), "sink database lower_case_table_name fetch error"); + Assert.isTrue(!Objects.equals(source.getLowercaseTableNames(), LowerCaseTableNames.UNKNOWN), + "sink database lower_case_table_name fetch unknown"); + Assert.isTrue(!Objects.equals(sink.getLowercaseTableNames(), LowerCaseTableNames.UNKNOWN), + "sink database lower_case_table_name fetch unknown"); + Assert.isTrue(Objects.equals(source.getLowercaseTableNames(), sink.getLowercaseTableNames()), + "source and sink database lower_case_table_name must be the same"); + LogUtils.info(log, "check database lower_case_table_name is {}", source.getLowercaseTableNames()); + } } diff --git a/datachecker-common/src/main/java/org/opengauss/datachecker/common/config/ConfigCache.java b/datachecker-common/src/main/java/org/opengauss/datachecker/common/config/ConfigCache.java index 2a7a543..cc1d695 100644 --- a/datachecker-common/src/main/java/org/opengauss/datachecker/common/config/ConfigCache.java +++ b/datachecker-common/src/main/java/org/opengauss/datachecker/common/config/ConfigCache.java @@ -244,6 +244,16 @@ public class ConfigCache { * @return boolean */ public static boolean hasCompatibility() { - return CACHE.containsKey(ConfigConstants.OG_COMPATIBILITY_B); + return hasKey(ConfigConstants.OG_COMPATIBILITY_B); + } + + /** + * check current cache has key + * + * @param key key + * @return boolean + */ + public static boolean hasKey(String key) { + return CACHE.containsKey(key); } } diff --git a/datachecker-common/src/main/java/org/opengauss/datachecker/common/constant/ConfigConstants.java b/datachecker-common/src/main/java/org/opengauss/datachecker/common/constant/ConfigConstants.java index f7e1872..2dd0325 100644 --- a/datachecker-common/src/main/java/org/opengauss/datachecker/common/constant/ConfigConstants.java +++ b/datachecker-common/src/main/java/org/opengauss/datachecker/common/constant/ConfigConstants.java @@ -307,4 +307,9 @@ public interface ConfigConstants { * data.check.auto-delete-topic */ String AUTO_DELETE_TOPIC = "data.check.auto-delete-topic"; + + /** + * lower_case_table_names + */ + String LOWER_CASE_TABLE_NAMES = "lower_case_table_names"; } diff --git a/datachecker-common/src/main/java/org/opengauss/datachecker/common/entry/enums/LowerCaseTableNames.java b/datachecker-common/src/main/java/org/opengauss/datachecker/common/entry/enums/LowerCaseTableNames.java new file mode 100644 index 0000000..abd29db --- /dev/null +++ b/datachecker-common/src/main/java/org/opengauss/datachecker/common/entry/enums/LowerCaseTableNames.java @@ -0,0 +1,69 @@ +/* + * Copyright (c) 2024-2024 Huawei Technologies Co.,Ltd. + * + * openGauss is licensed under Mulan PSL v2. + * You can use this software according to the terms and conditions of the Mulan PSL v2. + * You may obtain a copy of Mulan PSL v2 at: + * + * http://license.coscl.org.cn/MulanPSL2 + * + * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, + * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, + * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. + * See the Mulan PSL v2 for more details. + */ + +package org.opengauss.datachecker.common.entry.enums; + +import lombok.Getter; + +import java.util.Arrays; + +/** + * LowerCaseTableNames + * + * @author :wangchao + * @date :Created in 2022/5/29 + * @since :11 + */ +@Getter +public enum LowerCaseTableNames implements IEnum { + /** + * 表名大小写敏感,区分大小写 + */ + SENSITIVE("0", "lower_case_table_names=0"), + /** + * 表名大小写不敏感,不区分大小写 + */ + INSENSITIVE("1", "lower_case_table_names=1"), + + /** + * 未知 + */ + UNKNOWN("unknown", "lower_case_table_names=unknown"); + + private final String code; + private final String description; + + /** + * constructor + * + * @param code code + * @param description description + */ + LowerCaseTableNames(String code, String description) { + this.code = code; + this.description = description; + } + + /** + * get enum by code + * + * @param code code + * @return enum + */ + public static LowerCaseTableNames codeOf(String code) { + return Arrays.stream(LowerCaseTableNames.values()) + .filter(key -> key.code.equals(code)).findFirst().orElse(null); + } +} diff --git a/datachecker-common/src/main/java/org/opengauss/datachecker/common/entry/extract/Database.java b/datachecker-common/src/main/java/org/opengauss/datachecker/common/entry/extract/Database.java index 5be5f58..598da14 100644 --- a/datachecker-common/src/main/java/org/opengauss/datachecker/common/entry/extract/Database.java +++ b/datachecker-common/src/main/java/org/opengauss/datachecker/common/entry/extract/Database.java @@ -19,6 +19,7 @@ import lombok.Data; import lombok.experimental.Accessors; import org.opengauss.datachecker.common.entry.enums.DataBaseType; import org.opengauss.datachecker.common.entry.enums.Endpoint; +import org.opengauss.datachecker.common.entry.enums.LowerCaseTableNames; import java.util.Objects; @@ -38,6 +39,7 @@ public class Database { String schema; DataBaseType databaseType; Endpoint endpoint; + LowerCaseTableNames lowercaseTableNames; /** * get schema diff --git a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/data/access/AbstractDataAccessService.java b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/data/access/AbstractDataAccessService.java index 97bd886..a83d498 100644 --- a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/data/access/AbstractDataAccessService.java +++ b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/data/access/AbstractDataAccessService.java @@ -18,8 +18,11 @@ package org.opengauss.datachecker.extract.data.access; import com.alibaba.druid.pool.DruidDataSource; import org.apache.commons.lang3.StringUtils; import org.apache.logging.log4j.Logger; +import org.opengauss.datachecker.common.config.ConfigCache; +import org.opengauss.datachecker.common.constant.ConfigConstants; import org.opengauss.datachecker.common.entry.check.Difference; import org.opengauss.datachecker.common.entry.common.Health; +import org.opengauss.datachecker.common.entry.enums.LowerCaseTableNames; import org.opengauss.datachecker.common.entry.extract.PrimaryColumnBean; import org.opengauss.datachecker.common.entry.extract.TableMetadata; import org.opengauss.datachecker.common.exception.ExtractDataAccessException; @@ -68,7 +71,12 @@ public abstract class AbstractDataAccessService implements DataAccessService { @Resource protected ExtractProperties properties; - private Connection getConnection() { + /** + * 获取数据库连接 + * + * @return connection + */ + protected Connection getConnection() { try { return druidDataSource.getConnection(); } catch (SQLException e) { @@ -76,7 +84,12 @@ public abstract class AbstractDataAccessService implements DataAccessService { } } - private void closeConnection(Connection connection) { + /** + * 关闭数据库连接 + * + * @param connection connection + */ + protected void closeConnection(Connection connection) { ConnectionMgr.close(connection); } @@ -100,7 +113,7 @@ public abstract class AbstractDataAccessService implements DataAccessService { public String adasQuerySchema(Connection connection, String executeQueryStatement) { String schema = ""; try (PreparedStatement ps = connection.prepareStatement(executeQueryStatement); - ResultSet resultSet = ps.executeQuery()) { + ResultSet resultSet = ps.executeQuery()) { if (resultSet.next()) { schema = resultSet.getString(RS_COL_SCHEMA); } @@ -147,7 +160,7 @@ public abstract class AbstractDataAccessService implements DataAccessService { Connection connection = getConnection(); List list = new LinkedList<>(); try (PreparedStatement ps = connection.prepareStatement(executeQueryStatement); - ResultSet resultSet = ps.executeQuery()) { + ResultSet resultSet = ps.executeQuery()) { while (resultSet.next()) { list.add(resultSet.getString(RS_COL_TABLE_NAME)); } @@ -172,7 +185,7 @@ public abstract class AbstractDataAccessService implements DataAccessService { Connection connection = getConnection(); List list = new LinkedList<>(); try (PreparedStatement ps = connection.prepareStatement(executeQueryStatement); - ResultSet resultSet = ps.executeQuery()) { + ResultSet resultSet = ps.executeQuery()) { PrimaryColumnBean metadata; while (resultSet.next()) { metadata = new PrimaryColumnBean(); @@ -201,7 +214,7 @@ public abstract class AbstractDataAccessService implements DataAccessService { Connection connection = getConnection(); List list = new LinkedList<>(); try (PreparedStatement ps = connection.prepareStatement(executeQueryStatement); - ResultSet resultSet = ps.executeQuery()) { + ResultSet resultSet = ps.executeQuery()) { TableMetadata metadata; while (resultSet.next()) { metadata = new TableMetadata(); @@ -265,7 +278,7 @@ public abstract class AbstractDataAccessService implements DataAccessService { private long durationBetweenToMillis(LocalDateTime start, LocalDateTime end) { return Duration.between(start, end) - .toMillis(); + .toMillis(); } /** @@ -279,8 +292,8 @@ public abstract class AbstractDataAccessService implements DataAccessService { return null; } return tableMetadata.setDataBaseType(properties.getDatabaseType()) - .setEndpoint(properties.getEndpoint()) - .setOgCompatibilityB(isOgCompatibilityB); + .setEndpoint(properties.getEndpoint()) + .setOgCompatibilityB(isOgCompatibilityB); } /** @@ -304,8 +317,21 @@ public abstract class AbstractDataAccessService implements DataAccessService { */ protected List wrapperTableMetadata(List list) { list.forEach(meta -> meta.setDataBaseType(properties.getDatabaseType()) - .setEndpoint(properties.getEndpoint()) - .setOgCompatibilityB(isOgCompatibilityB)); + .setEndpoint(properties.getEndpoint()) + .setOgCompatibilityB(isOgCompatibilityB)); return list; } + + /** + * get lowerCaseTableNames + * + * @return lowerCaseTableNames + */ + protected LowerCaseTableNames getLowerCaseTableNames() { + if (!ConfigCache.hasKey(ConfigConstants.LOWER_CASE_TABLE_NAMES)) { + LowerCaseTableNames lowerCaseTableNames = queryLowerCaseTableNames(); + ConfigCache.put(ConfigConstants.LOWER_CASE_TABLE_NAMES, lowerCaseTableNames); + } + return ConfigCache.getValue(ConfigConstants.LOWER_CASE_TABLE_NAMES, LowerCaseTableNames.class); + } } diff --git a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/data/access/CsvDataAccessService.java b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/data/access/CsvDataAccessService.java index 5698f4c..f8eb60a 100644 --- a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/data/access/CsvDataAccessService.java +++ b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/data/access/CsvDataAccessService.java @@ -27,6 +27,7 @@ import org.opengauss.datachecker.common.entry.common.Health; import org.opengauss.datachecker.common.entry.csv.CsvTableColumnMeta; import org.opengauss.datachecker.common.entry.csv.CsvTableMeta; import org.opengauss.datachecker.common.entry.enums.ColumnKey; +import org.opengauss.datachecker.common.entry.enums.LowerCaseTableNames; import org.opengauss.datachecker.common.entry.extract.ColumnsMetaData; import org.opengauss.datachecker.common.entry.extract.PrimaryColumnBean; import org.opengauss.datachecker.common.entry.extract.TableMetadata; @@ -87,10 +88,10 @@ public class CsvDataAccessService implements DataAccessService { } Stream lineOfTables = Files.lines(pathOfTables); return lineOfTables.parallel() - .map(tableJson -> JSONObject.parseObject(tableJson, CsvTableMeta.class)) - .filter(CsvTableMeta::isContain_primary_key) - .map(CsvTableMeta::getTable) - .collect(Collectors.toList()); + .map(tableJson -> JSONObject.parseObject(tableJson, CsvTableMeta.class)) + .filter(CsvTableMeta::isContain_primary_key) + .map(CsvTableMeta::getTable) + .collect(Collectors.toList()); } catch (IOException e) { log.error("load table name of csv exception : ", e); throw new ExtractDataAccessException("load table name of csv exception"); @@ -123,17 +124,17 @@ public class CsvDataAccessService implements DataAccessService { columns.add(csvColumnMeta.toColumnsMetaData()); }); columns.stream() - .sorted() - .collect(Collectors.groupingBy(ColumnsMetaData::getTableName)) - .forEach((table, tableColumns) -> { - TableMetadata tableMetadata = tableMetadataMap.get(table); - tableMetadata.setColumnsMetas(tableColumns); - tableMetadata.setPrimaryMetas(tableColumns.stream() - .filter(col -> Objects.equals(col.getColumnKey(), - ColumnKey.PRI)) - .sorted() - .collect(Collectors.toList())); - }); + .sorted() + .collect(Collectors.groupingBy(ColumnsMetaData::getTableName)) + .forEach((table, tableColumns) -> { + TableMetadata tableMetadata = tableMetadataMap.get(table); + tableMetadata.setColumnsMetas(tableColumns); + tableMetadata.setPrimaryMetas(tableColumns.stream() + .filter(col -> Objects.equals(col.getColumnKey(), + ColumnKey.PRI)) + .sorted() + .collect(Collectors.toList())); + }); } catch (IOException e) { log.error("load table name of csv exception : ", e); throw new ExtractDataAccessException("load table name of csv exception"); @@ -197,12 +198,12 @@ public class CsvDataAccessService implements DataAccessService { List> diffRowList = new LinkedList<>(); String csvDataRootPath = ConfigCache.getCsvData(); String sliceFilePath = Path.of(csvDataRootPath, fileName) - .toString(); + .toString(); TableMetadata metadata = tableMetadataMap.get(table); List keyIdxList = differenceList.stream() - .map(Difference::getIdx) - .sorted() - .collect(Collectors.toList()); + .map(Difference::getIdx) + .sorted() + .collect(Collectors.toList()); int fileReadIdx = 0; try (CSVReader reader = new CSVReader(new FileReader(sliceFilePath))) { String[] nextLine; @@ -223,7 +224,7 @@ public class CsvDataAccessService implements DataAccessService { Map result = new TreeMap<>(); for (int idx = 0; idx < nextLine.length && idx < columns.size(); idx++) { result.put(columns.get(idx) - .getColumnName(), nextLine[idx]); + .getColumnName(), nextLine[idx]); } return result; } @@ -263,4 +264,9 @@ public class CsvDataAccessService implements DataAccessService { public String next(DataAccessParam param) { return null; } + + @Override + public LowerCaseTableNames queryLowerCaseTableNames() { + return LowerCaseTableNames.INSENSITIVE; + } } diff --git a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/data/access/DataAccessService.java b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/data/access/DataAccessService.java index 8ff4583..5d2e84d 100644 --- a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/data/access/DataAccessService.java +++ b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/data/access/DataAccessService.java @@ -18,6 +18,7 @@ package org.opengauss.datachecker.extract.data.access; import org.opengauss.datachecker.common.entry.check.Difference; import org.opengauss.datachecker.common.entry.common.DataAccessParam; import org.opengauss.datachecker.common.entry.common.Health; +import org.opengauss.datachecker.common.entry.enums.LowerCaseTableNames; import org.opengauss.datachecker.common.entry.extract.ColumnsMetaData; import org.opengauss.datachecker.common.entry.extract.PrimaryColumnBean; import org.opengauss.datachecker.common.entry.extract.TableMetadata; @@ -175,4 +176,15 @@ public interface DataAccessService { * @return boolean */ boolean dasCheckDatabaseNotEmpty(); + + /** + *
+     * query database variables lower_case_table_names
+     *  lower_case_table_names=0  : SENSITIVE    : 表名区分大小写
+     *  lower_case_table_names=1  : INSENSITIVE  : 表名不区分大小写
+     * 
+ * + * @return value + */ + LowerCaseTableNames queryLowerCaseTableNames(); } diff --git a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/data/access/MysqlDataAccessService.java b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/data/access/MysqlDataAccessService.java index d22f8c1..40319a2 100644 --- a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/data/access/MysqlDataAccessService.java +++ b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/data/access/MysqlDataAccessService.java @@ -17,13 +17,18 @@ package org.opengauss.datachecker.extract.data.access; import org.opengauss.datachecker.common.entry.common.DataAccessParam; import org.opengauss.datachecker.common.entry.common.Health; +import org.opengauss.datachecker.common.entry.enums.LowerCaseTableNames; import org.opengauss.datachecker.common.entry.extract.ColumnsMetaData; import org.opengauss.datachecker.common.entry.extract.PrimaryColumnBean; import org.opengauss.datachecker.common.entry.extract.TableMetadata; import org.opengauss.datachecker.extract.data.mapper.MysqlMetaDataMapper; import java.sql.Connection; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; import java.util.List; +import java.util.Objects; /** * MysqlDataAccessService @@ -47,8 +52,8 @@ public class MysqlDataAccessService extends AbstractDataAccessService { @Override public Health health() { String schema = properties.getSchema(); - String sql = "SELECT SCHEMA_NAME tableSchema FROM information_schema.SCHEMATA info WHERE SCHEMA_NAME='" + schema - + "' limit 1"; + String sql = "SELECT SCHEMA_NAME tableSchema FROM information_schema.SCHEMATA info WHERE SCHEMA_NAME='" + + schema + "' limit 1"; return health(schema, sql); } @@ -60,8 +65,8 @@ public class MysqlDataAccessService extends AbstractDataAccessService { @Override public List dasQueryTableNameList() { String schema = properties.getSchema(); - String sql = - "SELECT info.table_name tableName FROM information_schema.tables info WHERE table_schema='" + schema + "'"; + String sql = "SELECT info.table_name tableName FROM information_schema.tables info WHERE table_schema='" + + schema + "'"; return adasQueryTableNameList(sql); } @@ -78,7 +83,8 @@ public class MysqlDataAccessService extends AbstractDataAccessService { @Override public List queryTablePrimaryColumns() { String sql = "select table_name tableName ,lower(column_name) columnName from information_schema.columns " - + "where table_schema='" + properties.getSchema() + "' and column_key='PRI' order by ordinal_position asc "; + + "where table_schema='" + properties.getSchema() + + "' and column_key='PRI' order by ordinal_position asc "; return adasQueryTablePrimaryColumns(sql); } @@ -89,9 +95,13 @@ public class MysqlDataAccessService extends AbstractDataAccessService { @Override public List dasQueryTableMetadataList() { - String sql = " SELECT info.TABLE_SCHEMA tableSchema,info.table_name tableName,info.table_rows tableRows , " - + "info.avg_row_length avgRowLength FROM information_schema.tables info WHERE TABLE_SCHEMA='" - + properties.getSchema() + "'"; + LowerCaseTableNames lowerCaseTableNames = getLowerCaseTableNames(); + String colTableName = Objects.equals(LowerCaseTableNames.SENSITIVE, lowerCaseTableNames) + ? "info.table_name tableName" + : "lower(info.table_name) tableName"; + String sql = " SELECT info.TABLE_SCHEMA tableSchema," + colTableName + ",info.table_rows tableRows , " + + "info.avg_row_length avgRowLength FROM information_schema.tables info WHERE TABLE_SCHEMA='" + + properties.getSchema() + "'"; return wrapperTableMetadata(adasQueryTableMetadataList(sql)); } @@ -120,9 +130,9 @@ public class MysqlDataAccessService extends AbstractDataAccessService { @Override public List queryPointList(Connection connection, DataAccessParam param) { String sql = "select s.%s from (SELECT @rowno:=@rowno+1 as rn,r.%s from %s.%s r," - + " (select @rowno := 0) t ORDER BY r.%s asc) s where mod(s.rn, %s) = 1"; + + " (select @rowno := 0) t ORDER BY r.%s asc) s where mod(s.rn, %s) = 1"; sql = String.format(sql, param.getColName(), param.getColName(), param.getSchema(), param.getName(), - param.getColName(), param.getOffset()); + param.getColName(), param.getOffset()); return adasQueryPointList(connection, sql); } @@ -130,4 +140,21 @@ public class MysqlDataAccessService extends AbstractDataAccessService { public boolean dasCheckDatabaseNotEmpty() { return mysqlMetaDataMapper.checkDatabaseNotEmpty(properties.getSchema()); } + + @Override + public LowerCaseTableNames queryLowerCaseTableNames() { + String sql = "SHOW VARIABLES LIKE 'lower_case_table_names';"; + Connection connection = getConnection(); + try (PreparedStatement ps = connection.prepareStatement(sql); ResultSet resultSet = ps.executeQuery()) { + if (resultSet.next()) { + String value = resultSet.getString("value"); + return LowerCaseTableNames.codeOf(value); + } + } catch (SQLException ex) { + log.error("queryLowerCaseTableNames error", ex); + } finally { + closeConnection(connection); + } + return LowerCaseTableNames.UNKNOWN; + } } diff --git a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/data/access/OpgsDataAccessService.java b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/data/access/OpgsDataAccessService.java index 2b5f617..6b5dca7 100644 --- a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/data/access/OpgsDataAccessService.java +++ b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/data/access/OpgsDataAccessService.java @@ -19,6 +19,7 @@ import org.opengauss.datachecker.common.config.ConfigCache; import org.opengauss.datachecker.common.constant.ConfigConstants; import org.opengauss.datachecker.common.entry.common.DataAccessParam; import org.opengauss.datachecker.common.entry.common.Health; +import org.opengauss.datachecker.common.entry.enums.LowerCaseTableNames; import org.opengauss.datachecker.common.entry.enums.OgCompatibility; import org.opengauss.datachecker.common.entry.extract.ColumnsMetaData; import org.opengauss.datachecker.common.entry.extract.PrimaryColumnBean; @@ -27,7 +28,12 @@ import org.opengauss.datachecker.extract.data.mapper.OpgsMetaDataMapper; import javax.annotation.PostConstruct; import java.sql.Connection; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.Objects; /** @@ -38,6 +44,9 @@ import java.util.Objects; * @since :11 */ public class OpgsDataAccessService extends AbstractDataAccessService { + private static final String LOWER_CASE_TABLE_NAMES = ConfigConstants.LOWER_CASE_TABLE_NAMES; + private static final String DOLPHIN_LOWER_CASE_TABLE_NAMES = "dolphin.lower_case_table_names"; + private OpgsMetaDataMapper opgsMetaDataMapper; public OpgsDataAccessService(OpgsMetaDataMapper opgsMetaDataMapper) { @@ -47,9 +56,11 @@ public class OpgsDataAccessService extends AbstractDataAccessService { @Override @PostConstruct public boolean isOgCompatibilityB() { - isOgCompatibilityB = Objects.equals(OgCompatibility.B, opgsMetaDataMapper.sqlCompatibility()); - ConfigCache.put(ConfigConstants.OG_COMPATIBILITY_B, isOgCompatibilityB); - return isOgCompatibilityB; + if (!ConfigCache.hasCompatibility()) { + isOgCompatibilityB = Objects.equals(OgCompatibility.B, opgsMetaDataMapper.sqlCompatibility()); + ConfigCache.put(ConfigConstants.OG_COMPATIBILITY_B, isOgCompatibilityB); + } + return ConfigCache.getBooleanValue(ConfigConstants.OG_COMPATIBILITY_B); } @Override @@ -68,7 +79,7 @@ public class OpgsDataAccessService extends AbstractDataAccessService { public List dasQueryTableNameList() { String schema = properties.getSchema(); String sql = "select c.relname tableName from pg_class c LEFT JOIN pg_namespace n on n.oid = c.relnamespace " - + " where n.nspname='" + schema + "' and c.relkind ='r';"; + + " where n.nspname='" + schema + "' and c.relkind ='r';"; return adasQueryTableNameList(sql); } @@ -76,10 +87,10 @@ public class OpgsDataAccessService extends AbstractDataAccessService { public List queryTablePrimaryColumns() { String schema = properties.getSchema(); String sql = "select c.relname tableName,ns.nspname,ns.oid,a.attname columnName from pg_class c " - + "left join pg_namespace ns on c.relnamespace=ns.oid " - + "left join pg_attribute a on c.oid=a.attrelid and a.attnum>0 and not a.attisdropped " - + "inner join pg_constraint cs on a.attrelid=cs.conrelid and a.attnum=any(cs.conkey) " - + "where ns.nspname='" + schema + "' and cs.contype='p';"; + + "left join pg_namespace ns on c.relnamespace=ns.oid " + + "left join pg_attribute a on c.oid=a.attrelid and a.attnum>0 and not a.attisdropped " + + "inner join pg_constraint cs on a.attrelid=cs.conrelid and a.attnum=any(cs.conkey) " + + "where ns.nspname='" + schema + "' and cs.contype='p';"; return adasQueryTablePrimaryColumns(sql); } @@ -87,10 +98,10 @@ public class OpgsDataAccessService extends AbstractDataAccessService { public List queryTablePrimaryColumns(String tableName) { String schema = properties.getSchema(); String sql = "select c.relname tableName,ns.nspname,ns.oid,a.attname columnName from pg_class c " - + "left join pg_namespace ns on c.relnamespace=ns.oid " - + "left join pg_attribute a on c.oid=a.attrelid and a.attnum>0 and not a.attisdropped " - + "inner join pg_constraint cs on a.attrelid=cs.conrelid and a.attnum=any(cs.conkey) " - + "where ns.nspname='" + schema + "' and c.relname='" + tableName + "' and cs.contype='p';"; + + "left join pg_namespace ns on c.relnamespace=ns.oid " + + "left join pg_attribute a on c.oid=a.attrelid and a.attnum>0 and not a.attisdropped " + + "inner join pg_constraint cs on a.attrelid=cs.conrelid and a.attnum=any(cs.conkey) " + + "where ns.nspname='" + schema + "' and c.relname='" + tableName + "' and cs.contype='p';"; return adasQueryTablePrimaryColumns(sql); } @@ -106,10 +117,14 @@ public class OpgsDataAccessService extends AbstractDataAccessService { @Override public List dasQueryTableMetadataList() { - String sql = " select n.nspname tableSchema, c.relname tableName,c.reltuples tableRows, " - + "case when c.reltuples>0 then pg_table_size(c.oid)/c.reltuples else 0 end as avgRowLength " - + "from pg_class c LEFT JOIN pg_namespace n on n.oid = c.relnamespace " + "where n.nspname='" - + properties.getSchema() + "' and c.relkind ='r';"; + LowerCaseTableNames lowerCaseTableNames = getLowerCaseTableNames(); + String colTableName = Objects.equals(LowerCaseTableNames.SENSITIVE, lowerCaseTableNames) + ? "c.relname tableName" + : "lower(c.relname) tableName"; + String sql = " select n.nspname tableSchema, " + colTableName + ",c.reltuples tableRows, " + + "case when c.reltuples>0 then pg_table_size(c.oid)/c.reltuples else 0 end as avgRowLength " + + "from pg_class c LEFT JOIN pg_namespace n on n.oid = c.relnamespace " + "where n.nspname='" + + properties.getSchema() + "' and c.relkind ='r';"; return wrapperTableMetadata(adasQueryTableMetadataList(sql)); } @@ -138,9 +153,9 @@ public class OpgsDataAccessService extends AbstractDataAccessService { @Override public List queryPointList(Connection connection, DataAccessParam param) { String sql = "select s.%s from ( select row_number() over(order by r.%s asc) as rn,r.%s from %s.%s r) s" - + " where mod(s.rn, %s ) = 1;"; + + " where mod(s.rn, %s ) = 1;"; sql = String.format(sql, param.getColName(), param.getColName(), param.getColName(), param.getSchema(), - param.getName(), param.getOffset()); + param.getName(), param.getOffset()); return adasQueryPointList(connection, sql); } @@ -148,4 +163,25 @@ public class OpgsDataAccessService extends AbstractDataAccessService { public boolean dasCheckDatabaseNotEmpty() { return opgsMetaDataMapper.checkDatabaseNotEmpty(properties.getSchema()); } + + + @Override + public LowerCaseTableNames queryLowerCaseTableNames() { + String sql = "SHOW VARIABLES LIKE \"lower_case_table_names\";"; + Connection connection = getConnection(); + Map result = new HashMap<>(); + try (PreparedStatement ps = connection.prepareStatement(sql); ResultSet resultSet = ps.executeQuery()) { + while (resultSet.next()) { + String name = resultSet.getString("name"); + LowerCaseTableNames setting = LowerCaseTableNames.codeOf(resultSet.getString("setting")); + result.put(name, setting); + } + } catch (SQLException ex) { + log.error("queryLowerCaseTableNames error", ex); + } finally { + closeConnection(connection); + } + return isOgCompatibilityB() ? result.getOrDefault(DOLPHIN_LOWER_CASE_TABLE_NAMES, LowerCaseTableNames.UNKNOWN) + : result.getOrDefault(LOWER_CASE_TABLE_NAMES, LowerCaseTableNames.UNKNOWN); + } } diff --git a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/data/access/OracleDataAccessService.java b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/data/access/OracleDataAccessService.java index 69c33eb..b2426c1 100644 --- a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/data/access/OracleDataAccessService.java +++ b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/data/access/OracleDataAccessService.java @@ -17,6 +17,7 @@ package org.opengauss.datachecker.extract.data.access; import org.opengauss.datachecker.common.entry.common.DataAccessParam; import org.opengauss.datachecker.common.entry.common.Health; +import org.opengauss.datachecker.common.entry.enums.LowerCaseTableNames; import org.opengauss.datachecker.common.entry.extract.ColumnsMetaData; import org.opengauss.datachecker.common.entry.extract.PrimaryColumnBean; import org.opengauss.datachecker.common.entry.extract.TableMetadata; @@ -24,6 +25,7 @@ import org.opengauss.datachecker.extract.data.mapper.OracleMetaDataMapper; import java.sql.Connection; import java.util.List; +import java.util.Objects; /** * OracleDataAccessService @@ -74,8 +76,8 @@ public class OracleDataAccessService extends AbstractDataAccessService { @Override public List queryTablePrimaryColumns() { String sql = "SELECT A.TABLE_NAME tableName, A.COLUMN_NAME columnName FROM ALL_CONS_COLUMNS A,ALL_CONSTRAINTS B" - + " WHERE A.constraint_name = B.constraint_name AND B.constraint_type = 'P' AND A.OWNER = '" - + properties.getSchema() + "'"; + + " WHERE A.constraint_name = B.constraint_name AND B.constraint_type = 'P' AND A.OWNER = '" + + properties.getSchema() + "'"; return adasQueryTablePrimaryColumns(sql); } @@ -87,9 +89,14 @@ public class OracleDataAccessService extends AbstractDataAccessService { @Override public List dasQueryTableMetadataList() { String schema = properties.getSchema(); - String sql = "SELECT t.owner tableSchema,t.table_name tableName,t.num_rows tableRows,avg_row_len avgRowLength" - + " FROM ALL_TABLES t LEFT JOIN (SELECT DISTINCT table_name from ALL_CONSTRAINTS where OWNER = '" + schema - + "' AND constraint_type='P') pc on t.table_name=pc.table_name WHERE t.OWNER = '" + schema + "'"; + LowerCaseTableNames lowerCaseTableNames = getLowerCaseTableNames(); + String colTableName = Objects.equals(LowerCaseTableNames.SENSITIVE, lowerCaseTableNames) + ? "t.table_name tableName" + : "lower(t.table_name) tableName"; + String sql = "SELECT t.owner tableSchema," + colTableName + ",t.num_rows tableRows,avg_row_len avgRowLength" + + " FROM ALL_TABLES t LEFT JOIN (SELECT DISTINCT table_name from ALL_CONSTRAINTS where OWNER = '" + + schema + "' AND constraint_type='P') pc on t.table_name=pc.table_name WHERE t.OWNER = '" + + schema + "'"; return wrapperTableMetadata(adasQueryTableMetadataList(sql)); } @@ -122,4 +129,9 @@ public class OracleDataAccessService extends AbstractDataAccessService { public boolean dasCheckDatabaseNotEmpty() { return oracleMetaDataMapper.checkDatabaseNotEmpty(properties.getSchema()); } + + @Override + public LowerCaseTableNames queryLowerCaseTableNames() { + return LowerCaseTableNames.INSENSITIVE; + } } diff --git a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/debezium/DataConsolidationServiceImpl.java b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/debezium/DataConsolidationServiceImpl.java index 09e0def..73e907b 100644 --- a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/debezium/DataConsolidationServiceImpl.java +++ b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/debezium/DataConsolidationServiceImpl.java @@ -32,6 +32,7 @@ import org.opengauss.datachecker.extract.config.KafkaConsumerConfig; import org.opengauss.datachecker.extract.kafka.KafkaAdminService; import org.opengauss.datachecker.extract.service.MetaDataService; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Service; import org.springframework.util.Assert; @@ -72,6 +73,8 @@ public class DataConsolidationServiceImpl implements DataConsolidationService { private ExtractProperties extractProperties; @Autowired private MetaDataService metaDataService; + @Value("${spring.extract.debezium-num-period}") + private int maxBachSize; private DebeziumWorker worker = null; private ExecutorService executorService = null; @@ -81,6 +84,7 @@ public class DataConsolidationServiceImpl implements DataConsolidationService { @Override public void initIncrementConfig() { if (extractProperties.isDebeziumEnable()) { + debeziumListener.setMaxBatchSize(maxBachSize); worker = new DebeziumWorker(debeziumListener, kafkaConfig); executorService = ThreadUtil.newSingleThreadExecutor(); executorService.submit(worker); diff --git a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/debezium/DebeziumConsumerListener.java b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/debezium/DebeziumConsumerListener.java index 60e4ade..e12bf22 100644 --- a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/debezium/DebeziumConsumerListener.java +++ b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/debezium/DebeziumConsumerListener.java @@ -39,8 +39,10 @@ import java.util.concurrent.LinkedBlockingQueue; public class DebeziumConsumerListener { private static final Logger log = LogUtils.getLogger(); private static final LinkedBlockingQueue DATA_LOG_QUEUE = new LinkedBlockingQueue<>(); + private DeserializerAdapter adapter = new DeserializerAdapter(); private DebeziumDataHandler debeziumDataHandler; + private int maxBachSize; @Resource private MetaDataService metaDataService; @Resource @@ -66,7 +68,7 @@ public class DebeziumConsumerListener { } catch (DebeziumConfigException | JSONException ex) { // Abnormal message structure, ignoring the current message log.error("parse message abnormal: [{}] {} ignoring this message : {}", ex.getMessage(), - System.getProperty("line.separator"), record); + System.getProperty("line.separator"), record); } } @@ -87,4 +89,22 @@ public class DebeziumConsumerListener { public DebeziumDataBean poll() { return DATA_LOG_QUEUE.poll(); } + + /** + * maxBachSize + * + * @param maxBachSize maxBachSize + */ + public void setMaxBatchSize(int maxBachSize) { + this.maxBachSize = maxBachSize; + } + + /** + * maxBachSize + * + * @return maxBachSize + */ + public int getMaxBatchSize() { + return this.maxBachSize; + } } diff --git a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/debezium/DebeziumWorker.java b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/debezium/DebeziumWorker.java index d9df6b0..6a11136 100644 --- a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/debezium/DebeziumWorker.java +++ b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/debezium/DebeziumWorker.java @@ -43,10 +43,11 @@ public class DebeziumWorker implements Runnable { private static final AtomicBoolean RUNNING = new AtomicBoolean(true); private static final AtomicInteger POLL_BATCH_COUNT = new AtomicInteger(); private static final AtomicInteger RETRY_POLL_EMPTY = new AtomicInteger(); - private static final int MAX_BATCH_COUNT = 1000; + private static final int DEFAULT_MAX_BATCH_SIZE = 1000; private static final int RETRY_TIMES = 3; private static final String NAME = "DebeziumWorker"; + private int maxBatchCount; private DebeziumConsumerListener debeziumConsumerListener; private KafkaConsumerConfig kafkaConsumerConfig; private KafkaConsumer consumer = null; @@ -60,6 +61,8 @@ public class DebeziumWorker implements Runnable { public DebeziumWorker(DebeziumConsumerListener debeziumConsumerListener, KafkaConsumerConfig kafkaConsumerConfig) { this.debeziumConsumerListener = debeziumConsumerListener; this.kafkaConsumerConfig = kafkaConsumerConfig; + int maxBatchSize = debeziumConsumerListener.getMaxBatchSize(); + this.maxBatchCount = maxBatchSize <= 0 ? DEFAULT_MAX_BATCH_SIZE : maxBatchSize; } @Override @@ -90,7 +93,7 @@ public class DebeziumWorker implements Runnable { } } POLL_BATCH_COUNT.addAndGet(records.count()); - if (POLL_BATCH_COUNT.get() > MAX_BATCH_COUNT) { + if (POLL_BATCH_COUNT.get() > maxBatchCount) { PAUSE_OR_RESUME.set(WorkerSwitch.PAUSE); POLL_BATCH_COUNT.set(0); } diff --git a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/service/DataExtractServiceImpl.java b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/service/DataExtractServiceImpl.java index 5c9955d..4744fc0 100644 --- a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/service/DataExtractServiceImpl.java +++ b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/service/DataExtractServiceImpl.java @@ -99,7 +99,6 @@ public class DataExtractServiceImpl implements DataExtractService { * The sleep time of the thread executing the data extraction task each time, in milliseconds */ private static final int MAX_SLEEP_MILLIS_TIME = 2000; - private static final int MAX_QUERY_PAGE_SIZE = 500; private static final String PROCESS_NO_RESET = "0"; private static final String TASK_NAME_PREFIX = "extract_task_"; private static final int SINGLE_SLICE_NUM = 1; @@ -566,6 +565,7 @@ public class DataExtractServiceImpl implements DataExtractService { final Database database = new Database(); BeanUtils.copyProperties(extractProperties, database); BeanUtils.copyProperties(extractProperties, config); + database.setLowercaseTableNames(dataAccessService.queryLowerCaseTableNames()); config.setDatabase(database); return config; } diff --git a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/task/CheckPoint.java b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/task/CheckPoint.java index ee0d91c..49d3a3b 100644 --- a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/task/CheckPoint.java +++ b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/task/CheckPoint.java @@ -157,10 +157,4 @@ public class CheckPoint { ConnectionMgr.close(connection, null, null); return Long.parseLong(maxId); } - - public boolean checkInvalidPrimaryKey(TableMetadata tableMetadata) { - ColumnsMetaData pkColumn = tableMetadata.getPrimaryMetas() - .get(0); - return MetaDataUtil.isInvalidPrimaryKey(pkColumn); - } } diff --git a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/task/sql/QueryStatementFactory.java b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/task/sql/QueryStatementFactory.java index 571c383..c15698e 100644 --- a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/task/sql/QueryStatementFactory.java +++ b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/task/sql/QueryStatementFactory.java @@ -17,7 +17,6 @@ package org.opengauss.datachecker.extract.task.sql; import org.opengauss.datachecker.common.config.ConfigCache; import org.opengauss.datachecker.common.entry.extract.TableMetadata; -import org.opengauss.datachecker.common.exception.ExtractPrimaryKeyException; import org.opengauss.datachecker.extract.task.CheckPoint; /** @@ -37,14 +36,6 @@ public class QueryStatementFactory { * @return SliceQueryStatement */ public AutoSliceQueryStatement createSliceQueryStatement(CheckPoint checkPoint, TableMetadata tableMetadata) { - if (checkPoint.checkInvalidPrimaryKey(tableMetadata)) { - String dataType = tableMetadata.getPrimaryMetas() - .get(0) - .getDataType(); - throw new ExtractPrimaryKeyException( - "current not support primary key type for this table " + tableMetadata.getTableName() + " dataType : " - + dataType); - } return new SinglePrimaryAutoSliceQueryStatement(checkPoint); } -- Gitee