From 704e15922e48e5700debb20e9b98ceab05b664ae Mon Sep 17 00:00:00 2001 From: mystarry-sky Date: Thu, 7 Nov 2024 11:35:34 +0800 Subject: [PATCH 1/3] =?UTF-8?q?=E4=BF=AE=E5=A4=8D=E6=A0=A1=E9=AA=8C?= =?UTF-8?q?=E5=A4=B1=E8=B4=A5=E5=A4=84=E7=90=86=E6=B5=81=E7=A8=8B=20?= =?UTF-8?q?=E6=97=A5=E5=BF=97=E6=89=93=E5=8D=B0=E5=BC=82=E5=B8=B8=E9=97=AE?= =?UTF-8?q?=E9=A2=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../check/slice/SliceCheckEventHandler.java | 4 +- .../data/access/MysqlDataAccessService.java | 4 +- .../data/access/OpgsDataAccessService.java | 60 +++++++++++++------ 3 files changed, 44 insertions(+), 24 deletions(-) diff --git a/datachecker-check/src/main/java/org/opengauss/datachecker/check/slice/SliceCheckEventHandler.java b/datachecker-check/src/main/java/org/opengauss/datachecker/check/slice/SliceCheckEventHandler.java index 0e7771b..36d7218 100644 --- a/datachecker-check/src/main/java/org/opengauss/datachecker/check/slice/SliceCheckEventHandler.java +++ b/datachecker-check/src/main/java/org/opengauss/datachecker/check/slice/SliceCheckEventHandler.java @@ -93,9 +93,7 @@ public class SliceCheckEventHandler { */ public void handleFailed(SliceCheckEvent checkEvent) { LogUtils.warn(log, "slice check event , table slice has unknown error [{}][{} : {}]", checkEvent.getCheckName(), - checkEvent.getSource() - .getTableHash(), checkEvent.getSink() - .getTableHash()); + checkEvent.getSource(), checkEvent.getSink()); long count = getCheckSliceCount(checkEvent); sliceCheckContext.refreshSliceCheckProgress(checkEvent.getSlice(), count); CheckDiffResult result = buildSliceDiffResult(checkEvent.getSlice(), (int) count, true, "slice has unknown error"); diff --git a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/data/access/MysqlDataAccessService.java b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/data/access/MysqlDataAccessService.java index 40319a2..8383c69 100644 --- a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/data/access/MysqlDataAccessService.java +++ b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/data/access/MysqlDataAccessService.java @@ -65,8 +65,8 @@ public class MysqlDataAccessService extends AbstractDataAccessService { @Override public List dasQueryTableNameList() { String schema = properties.getSchema(); - String sql = "SELECT info.table_name tableName FROM information_schema.tables info WHERE table_schema='" - + schema + "'"; + String sql = "select info.table_name tableName from information_schema.tables info where table_schema='" + + schema + "' and table_type='BASE TABLE'"; return adasQueryTableNameList(sql); } diff --git a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/data/access/OpgsDataAccessService.java b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/data/access/OpgsDataAccessService.java index 6b5dca7..d910ef5 100644 --- a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/data/access/OpgsDataAccessService.java +++ b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/data/access/OpgsDataAccessService.java @@ -27,6 +27,7 @@ import org.opengauss.datachecker.common.entry.extract.TableMetadata; import org.opengauss.datachecker.extract.data.mapper.OpgsMetaDataMapper; import javax.annotation.PostConstruct; + import java.sql.Connection; import java.sql.PreparedStatement; import java.sql.ResultSet; @@ -75,22 +76,43 @@ public class OpgsDataAccessService extends 
AbstractDataAccessService { return health(schema, sql); } + /** + *
+     * DAS query the table name list
+     *  select c.relname tableName from pg_class c  LEFT JOIN pg_namespace n on n.oid = c.relnamespace
+     *  where n.nspname=? and c.relkind ='r';
+     *  
+ * + * @return tableNameList + */ @Override public List dasQueryTableNameList() { String schema = properties.getSchema(); String sql = "select c.relname tableName from pg_class c LEFT JOIN pg_namespace n on n.oid = c.relnamespace " - + " where n.nspname='" + schema + "' and c.relkind ='r';"; + + " where n.nspname='" + schema + "' and c.relkind ='r';"; return adasQueryTableNameList(sql); } + /** + *
+     *     Query the primary key column information of tables
+     *      select c.relname tableName,ns.nspname,ns.oid,a.attname columnName from pg_class c
+     *      left join pg_namespace ns on c.relnamespace=ns.oid
+     *      left join pg_attribute a on c.oid=a.attrelid and a.attnum>0 and not a.attisdropped
+     *      inner join pg_constraint cs on a.attrelid=cs.conrelid and a.attnum=any(cs.conkey)
+     *      where ns.nspname='test' and cs.contype='p';
+     * 
+ * + * @return primaryColumnList 主键列信息列表 + */ @Override public List queryTablePrimaryColumns() { String schema = properties.getSchema(); String sql = "select c.relname tableName,ns.nspname,ns.oid,a.attname columnName from pg_class c " - + "left join pg_namespace ns on c.relnamespace=ns.oid " - + "left join pg_attribute a on c.oid=a.attrelid and a.attnum>0 and not a.attisdropped " - + "inner join pg_constraint cs on a.attrelid=cs.conrelid and a.attnum=any(cs.conkey) " - + "where ns.nspname='" + schema + "' and cs.contype='p';"; + + "left join pg_namespace ns on c.relnamespace=ns.oid " + + "left join pg_attribute a on c.oid=a.attrelid and a.attnum>0 and not a.attisdropped " + + "inner join pg_constraint cs on a.attrelid=cs.conrelid and a.attnum=any(cs.conkey) " + + "where ns.nspname='" + schema + "' and cs.contype='p';"; return adasQueryTablePrimaryColumns(sql); } @@ -98,10 +120,10 @@ public class OpgsDataAccessService extends AbstractDataAccessService { public List queryTablePrimaryColumns(String tableName) { String schema = properties.getSchema(); String sql = "select c.relname tableName,ns.nspname,ns.oid,a.attname columnName from pg_class c " - + "left join pg_namespace ns on c.relnamespace=ns.oid " - + "left join pg_attribute a on c.oid=a.attrelid and a.attnum>0 and not a.attisdropped " - + "inner join pg_constraint cs on a.attrelid=cs.conrelid and a.attnum=any(cs.conkey) " - + "where ns.nspname='" + schema + "' and c.relname='" + tableName + "' and cs.contype='p';"; + + "left join pg_namespace ns on c.relnamespace=ns.oid " + + "left join pg_attribute a on c.oid=a.attrelid and a.attnum>0 and not a.attisdropped " + + "inner join pg_constraint cs on a.attrelid=cs.conrelid and a.attnum=any(cs.conkey) " + + "where ns.nspname='" + schema + "' and c.relname='" + tableName + "' and cs.contype='p';"; return adasQueryTablePrimaryColumns(sql); } @@ -119,12 +141,12 @@ public class OpgsDataAccessService extends AbstractDataAccessService { public List dasQueryTableMetadataList() { LowerCaseTableNames lowerCaseTableNames = getLowerCaseTableNames(); String colTableName = Objects.equals(LowerCaseTableNames.SENSITIVE, lowerCaseTableNames) - ? "c.relname tableName" - : "lower(c.relname) tableName"; + ? 
"c.relname tableName" + : "lower(c.relname) tableName"; String sql = " select n.nspname tableSchema, " + colTableName + ",c.reltuples tableRows, " - + "case when c.reltuples>0 then pg_table_size(c.oid)/c.reltuples else 0 end as avgRowLength " - + "from pg_class c LEFT JOIN pg_namespace n on n.oid = c.relnamespace " + "where n.nspname='" - + properties.getSchema() + "' and c.relkind ='r';"; + + "case when c.reltuples>0 then pg_table_size(c.oid)/c.reltuples else 0 end as avgRowLength " + + "from pg_class c LEFT JOIN pg_namespace n on n.oid = c.relnamespace " + "where n.nspname='" + + properties.getSchema() + "' and c.relkind ='r';"; return wrapperTableMetadata(adasQueryTableMetadataList(sql)); } @@ -153,9 +175,9 @@ public class OpgsDataAccessService extends AbstractDataAccessService { @Override public List queryPointList(Connection connection, DataAccessParam param) { String sql = "select s.%s from ( select row_number() over(order by r.%s asc) as rn,r.%s from %s.%s r) s" - + " where mod(s.rn, %s ) = 1;"; + + " where mod(s.rn, %s ) = 1;"; sql = String.format(sql, param.getColName(), param.getColName(), param.getColName(), param.getSchema(), - param.getName(), param.getOffset()); + param.getName(), param.getOffset()); return adasQueryPointList(connection, sql); } @@ -164,7 +186,6 @@ public class OpgsDataAccessService extends AbstractDataAccessService { return opgsMetaDataMapper.checkDatabaseNotEmpty(properties.getSchema()); } - @Override public LowerCaseTableNames queryLowerCaseTableNames() { String sql = "SHOW VARIABLES LIKE \"lower_case_table_names\";"; @@ -181,7 +202,8 @@ public class OpgsDataAccessService extends AbstractDataAccessService { } finally { closeConnection(connection); } - return isOgCompatibilityB() ? result.getOrDefault(DOLPHIN_LOWER_CASE_TABLE_NAMES, LowerCaseTableNames.UNKNOWN) - : result.getOrDefault(LOWER_CASE_TABLE_NAMES, LowerCaseTableNames.UNKNOWN); + return isOgCompatibilityB() + ? 
result.getOrDefault(DOLPHIN_LOWER_CASE_TABLE_NAMES, LowerCaseTableNames.UNKNOWN) + : result.getOrDefault(LOWER_CASE_TABLE_NAMES, LowerCaseTableNames.UNKNOWN); } } -- Gitee From 09f1869899b5dc72ef9fd5e489d149b7ad1fa4fd Mon Sep 17 00:00:00 2001 From: mystarry-sky Date: Sat, 9 Nov 2024 16:09:19 +0800 Subject: [PATCH 2/3] =?UTF-8?q?=E5=A2=9E=E5=8A=A0=E8=A1=A8=E5=94=AF?= =?UTF-8?q?=E4=B8=80=E6=80=A7=E7=BA=A6=E6=9D=9F=E5=9C=BA=E6=99=AF=E8=A1=A8?= =?UTF-8?q?=E6=A0=A1=E9=AA=8C=E3=80=82?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- config/log4j2.xml | 10 +-- .../entry/extract/PrimaryColumnBean.java | 4 + .../entry/extract/UniqueColumnBean.java | 48 +++++++++++ .../extract/data/BaseDataService.java | 61 +++++++------- .../access/AbstractDataAccessService.java | 80 +++++++++++++++---- .../data/access/CsvDataAccessService.java | 5 ++ .../data/access/DataAccessService.java | 30 ++++--- .../data/access/MysqlDataAccessService.java | 35 +++++--- .../data/access/OpgsDataAccessService.java | 13 +++ .../data/access/OracleDataAccessService.java | 25 ++++-- 10 files changed, 231 insertions(+), 80 deletions(-) create mode 100644 datachecker-common/src/main/java/org/opengauss/datachecker/common/entry/extract/UniqueColumnBean.java diff --git a/config/log4j2.xml b/config/log4j2.xml index e22d912..f8cd663 100644 --- a/config/log4j2.xml +++ b/config/log4j2.xml @@ -30,19 +30,13 @@ - + - - - - - - + diff --git a/datachecker-common/src/main/java/org/opengauss/datachecker/common/entry/extract/PrimaryColumnBean.java b/datachecker-common/src/main/java/org/opengauss/datachecker/common/entry/extract/PrimaryColumnBean.java index 651e26e..5f85c51 100644 --- a/datachecker-common/src/main/java/org/opengauss/datachecker/common/entry/extract/PrimaryColumnBean.java +++ b/datachecker-common/src/main/java/org/opengauss/datachecker/common/entry/extract/PrimaryColumnBean.java @@ -15,7 +15,9 @@ package org.opengauss.datachecker.common.entry.extract; +import lombok.AllArgsConstructor; import lombok.Data; +import lombok.NoArgsConstructor; /** * PrimaryColumnBean @@ -25,6 +27,8 @@ import lombok.Data; * @since :11 */ @Data +@NoArgsConstructor +@AllArgsConstructor public class PrimaryColumnBean { /** * Table diff --git a/datachecker-common/src/main/java/org/opengauss/datachecker/common/entry/extract/UniqueColumnBean.java b/datachecker-common/src/main/java/org/opengauss/datachecker/common/entry/extract/UniqueColumnBean.java new file mode 100644 index 0000000..158f147 --- /dev/null +++ b/datachecker-common/src/main/java/org/opengauss/datachecker/common/entry/extract/UniqueColumnBean.java @@ -0,0 +1,48 @@ +/* + * Copyright (c) Huawei Technologies Co., Ltd. 2024-2024. All rights reserved. + * + * openGauss is licensed under Mulan PSL v2. + * You can use this software according to the terms and conditions of the Mulan PSL v2. + * You may obtain a copy of Mulan PSL v2 at: + * + * http://license.coscl.org.cn/MulanPSL2 + * + * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, + * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, + * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. + * See the Mulan PSL v2 for more details. 
+ */ + +package org.opengauss.datachecker.common.entry.extract; + +import lombok.Data; + +/** + * UniqueColumnBean + * + * @author :wangchao + * @date :Created in 2023/12/23 + * @since :11 + */ +@Data +public class UniqueColumnBean { + /** + * Table + */ + private String tableName; + + /** + * Primary key column name + */ + private String columnName; + + /** + * Index identifier + */ + private String indexIdentifier; + + /** + * Column index + */ + private Integer colIdx; +} \ No newline at end of file diff --git a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/data/BaseDataService.java b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/data/BaseDataService.java index 45d3c7c..54e9451 100644 --- a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/data/BaseDataService.java +++ b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/data/BaseDataService.java @@ -16,6 +16,7 @@ package org.opengauss.datachecker.extract.data; import com.alibaba.druid.pool.DruidDataSource; + import org.apache.commons.collections4.CollectionUtils; import org.apache.commons.collections4.MapUtils; import org.apache.logging.log4j.Logger; @@ -33,6 +34,7 @@ import org.opengauss.datachecker.extract.service.RuleAdapterService; import org.springframework.stereotype.Service; import javax.annotation.Resource; + import java.util.Arrays; import java.util.Comparator; import java.util.HashMap; @@ -103,15 +105,13 @@ public class BaseDataService { */ public List bdsQueryTableMetadataList() { List metadataList = dataAccessService.dasQueryTableMetadataList(); - return metadataList.stream() - .filter(meta -> { - boolean isChecking = ruleAdapterService.filterTableByRule(meta.getTableName()); - if (isChecking) { - tableNameList.add(meta.getTableName()); - } - return isChecking; - }) - .collect(Collectors.toList()); + return metadataList.stream().filter(meta -> { + boolean isChecking = ruleAdapterService.filterTableByRule(meta.getTableName()); + if (isChecking) { + tableNameList.add(meta.getTableName()); + } + return isChecking; + }).collect(Collectors.toList()); } /** @@ -124,8 +124,7 @@ public class BaseDataService { if (CollectionUtils.isEmpty(columnBeanList)) { return new HashMap<>(); } - return columnBeanList.stream() - .collect(Collectors.groupingBy(PrimaryColumnBean::getTableName)); + return columnBeanList.stream().collect(Collectors.groupingBy(PrimaryColumnBean::getTableName)); } private List filterByTableRules(List tableNameList) { @@ -189,28 +188,31 @@ public class BaseDataService { /** * update table metadata, and filter column rules * - * @param tableMetadata table metadata + * @param tableMetadata table metadata * @param primaryColumnBeans primary column */ public void updateTableColumnMetaData(TableMetadata tableMetadata, List primaryColumnBeans) { String tableName = tableMetadata.getTableName(); final List columns = dataAccessService.queryTableColumnsMetaData(tableName); - if (Objects.isNull(columns)) { + if (CollectionUtils.isEmpty(columns)) { LogUtils.error(log, "table columns metadata is null ,{}", tableName); return; } - if (Objects.isNull(primaryColumnBeans)) { - primaryColumnBeans = dataAccessService.queryTablePrimaryColumns(tableName); + List tempPrimaryColumnBeans = primaryColumnBeans; + if (CollectionUtils.isEmpty(primaryColumnBeans)) { + tempPrimaryColumnBeans = dataAccessService.queryTablePrimaryColumns(tableName); } - if (Objects.nonNull(primaryColumnBeans)) { - List primaryColumnNameList = getPrimaryColumnNames(primaryColumnBeans); + if 
(CollectionUtils.isEmpty(tempPrimaryColumnBeans)) { + tempPrimaryColumnBeans = dataAccessService.queryTableUniqueColumns(tableName); + } + if (CollectionUtils.isNotEmpty(tempPrimaryColumnBeans)) { + List primaryColumnNameList = getPrimaryColumnNames(tempPrimaryColumnBeans); for (ColumnsMetaData column : columns) { if (primaryColumnNameList.contains(column.getLowerCaseColumnName())) { column.setColumnKey(ColumnKey.PRI); } } } - tableMetadata.setColumnsMetas(ruleAdapterService.executeColumnRule(columns)); tableMetadata.setPrimaryMetas(getTablePrimaryColumn(columns)); tableMetadata.setTableHash(calcTableHash(columns)); @@ -218,16 +220,17 @@ public class BaseDataService { private List getPrimaryColumnNames(List primaryColumnBeans) { return primaryColumnBeans.stream() - .map(PrimaryColumnBean::getColumnName) - .map(String::toLowerCase) - .collect(Collectors.toList()); + .map(PrimaryColumnBean::getColumnName) + .map(String::toLowerCase) + .distinct() + .collect(Collectors.toList()); } private List getTablePrimaryColumn(List columnsMetaData) { return columnsMetaData.stream() - .filter(meta -> ColumnKey.PRI.equals(meta.getColumnKey())) - .sorted(Comparator.comparing(ColumnsMetaData::getOrdinalPosition)) - .collect(Collectors.toList()); + .filter(meta -> ColumnKey.PRI.equals(meta.getColumnKey())) + .sorted(Comparator.comparing(ColumnsMetaData::getOrdinalPosition)) + .collect(Collectors.toList()); } /** @@ -255,9 +258,8 @@ public class BaseDataService { private long calcTableHash(List columnsMetas) { StringBuilder buffer = new StringBuilder(); columnsMetas.sort(Comparator.comparing(ColumnsMetaData::getOrdinalPosition)); - columnsMetas.forEach(column -> buffer.append(column.getColumnName() - .toLowerCase(Locale.ENGLISH)) - .append(column.getOrdinalPosition())); + columnsMetas.forEach(column -> buffer.append(column.getColumnName().toLowerCase(Locale.ENGLISH)) + .append(column.getOrdinalPosition())); return HASH_UTIL.hashBytes(buffer.toString()); } @@ -289,9 +291,8 @@ public class BaseDataService { } else { String[] sqlModeArray = sqlMode.split(","); String newSqlMode = Arrays.stream(sqlModeArray) - .filter(mode -> !mode.equalsIgnoreCase( - ConfigConstants.SQL_MODE_NAME_PAD_CHAR_TO_FULL_LENGTH)) - .collect(Collectors.joining(",")); + .filter(mode -> !mode.equalsIgnoreCase(ConfigConstants.SQL_MODE_NAME_PAD_CHAR_TO_FULL_LENGTH)) + .collect(Collectors.joining(",")); boolean isPadCharFull = ConfigCache.getBooleanValue(ConfigConstants.SQL_MODE_PAD_CHAR_TO_FULL_LENGTH); if (isPadCharFull) { newSqlMode += ConfigConstants.SQL_MODE_NAME_PAD_CHAR_TO_FULL_LENGTH; diff --git a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/data/access/AbstractDataAccessService.java b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/data/access/AbstractDataAccessService.java index a83d498..93b19b8 100644 --- a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/data/access/AbstractDataAccessService.java +++ b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/data/access/AbstractDataAccessService.java @@ -16,6 +16,9 @@ package org.opengauss.datachecker.extract.data.access; import com.alibaba.druid.pool.DruidDataSource; + +import cn.hutool.core.collection.CollUtil; + import org.apache.commons.lang3.StringUtils; import org.apache.logging.log4j.Logger; import org.opengauss.datachecker.common.config.ConfigCache; @@ -25,6 +28,7 @@ import org.opengauss.datachecker.common.entry.common.Health; import org.opengauss.datachecker.common.entry.enums.LowerCaseTableNames; 
import org.opengauss.datachecker.common.entry.extract.PrimaryColumnBean; import org.opengauss.datachecker.common.entry.extract.TableMetadata; +import org.opengauss.datachecker.common.entry.extract.UniqueColumnBean; import org.opengauss.datachecker.common.exception.ExtractDataAccessException; import org.opengauss.datachecker.common.util.DurationUtils; import org.opengauss.datachecker.common.util.LogUtils; @@ -36,16 +40,19 @@ import org.springframework.jdbc.core.namedparam.NamedParameterJdbcTemplate; import javax.annotation.Resource; import javax.sql.DataSource; + import java.sql.Connection; import java.sql.PreparedStatement; import java.sql.ResultSet; import java.sql.SQLException; import java.time.Duration; import java.time.LocalDateTime; +import java.util.ArrayList; import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.stream.Collectors; /** * AbstractDataAccessService @@ -113,7 +120,7 @@ public abstract class AbstractDataAccessService implements DataAccessService { public String adasQuerySchema(Connection connection, String executeQueryStatement) { String schema = ""; try (PreparedStatement ps = connection.prepareStatement(executeQueryStatement); - ResultSet resultSet = ps.executeQuery()) { + ResultSet resultSet = ps.executeQuery()) { if (resultSet.next()) { schema = resultSet.getString(RS_COL_SCHEMA); } @@ -129,7 +136,7 @@ public abstract class AbstractDataAccessService implements DataAccessService { * 数据库schema是否合法 * * @param schema schema - * @param sql sql + * @param sql sql * @return result */ public Health health(String schema, String sql) { @@ -160,7 +167,7 @@ public abstract class AbstractDataAccessService implements DataAccessService { Connection connection = getConnection(); List list = new LinkedList<>(); try (PreparedStatement ps = connection.prepareStatement(executeQueryStatement); - ResultSet resultSet = ps.executeQuery()) { + ResultSet resultSet = ps.executeQuery()) { while (resultSet.next()) { list.add(resultSet.getString(RS_COL_TABLE_NAME)); } @@ -185,7 +192,7 @@ public abstract class AbstractDataAccessService implements DataAccessService { Connection connection = getConnection(); List list = new LinkedList<>(); try (PreparedStatement ps = connection.prepareStatement(executeQueryStatement); - ResultSet resultSet = ps.executeQuery()) { + ResultSet resultSet = ps.executeQuery()) { PrimaryColumnBean metadata; while (resultSet.next()) { metadata = new PrimaryColumnBean(); @@ -203,6 +210,50 @@ public abstract class AbstractDataAccessService implements DataAccessService { return list; } + /** + * adas查询表的唯一性约束列信息 + * + * @param executeQueryStatement executeQueryStatement + * @return List + */ + public List adasQueryTableUniqueColumns(String executeQueryStatement) { + Connection connection = getConnection(); + List list = new LinkedList<>(); + try (PreparedStatement ps = connection.prepareStatement(executeQueryStatement); + ResultSet resultSet = ps.executeQuery()) { + UniqueColumnBean metadata; + while (resultSet.next()) { + metadata = new UniqueColumnBean(); + metadata.setTableName(resultSet.getString("tableName")); + metadata.setColumnName(resultSet.getString("columnName")); + metadata.setIndexIdentifier(resultSet.getString("indexIdentifier")); + metadata.setColIdx(resultSet.getInt("colIdx")); + list.add(metadata); + } + } catch (SQLException esql) { + LogUtils.error(log, "adasQueryTablePrimaryColumns error:", esql); + } finally { + closeConnection(connection); + } + return list; + } + + /** + * 
将UniqueColumnBean列表转换为PrimaryColumnBean列表 + * + * @param uniqueColumns 输入的UniqueColumnBean列表,可能为空 + * @return PrimaryColumnBean列表,永远不会为null,其中的元素是唯一的 + */ + public List translateUniqueToPrimaryColumns(List uniqueColumns) { + if (CollUtil.isEmpty(uniqueColumns)) { + return new ArrayList<>(); + } + return uniqueColumns.stream() + .map(u -> new PrimaryColumnBean(u.getTableName(), u.getColumnName())) + .distinct() + .collect(Collectors.toList()); + } + /** * adasQueryTableMetadataList * @@ -214,7 +265,7 @@ public abstract class AbstractDataAccessService implements DataAccessService { Connection connection = getConnection(); List list = new LinkedList<>(); try (PreparedStatement ps = connection.prepareStatement(executeQueryStatement); - ResultSet resultSet = ps.executeQuery()) { + ResultSet resultSet = ps.executeQuery()) { TableMetadata metadata; while (resultSet.next()) { metadata = new TableMetadata(); @@ -238,7 +289,7 @@ public abstract class AbstractDataAccessService implements DataAccessService { * 查询表数据抽样检查点清单 * * @param connection connection - * @param sql 检查点查询SQL + * @param sql 检查点查询SQL * @return 检查点列表 */ protected List adasQueryPointList(Connection connection, String sql) { @@ -259,7 +310,7 @@ public abstract class AbstractDataAccessService implements DataAccessService { * 查询表数据抽样检查点清单 * * @param connection connection - * @param sql 检查点查询SQL + * @param sql 检查点查询SQL * @return 检查点列表 */ protected String adasQueryOnePoint(Connection connection, String sql) { @@ -277,8 +328,7 @@ public abstract class AbstractDataAccessService implements DataAccessService { } private long durationBetweenToMillis(LocalDateTime start, LocalDateTime end) { - return Duration.between(start, end) - .toMillis(); + return Duration.between(start, end).toMillis(); } /** @@ -292,15 +342,15 @@ public abstract class AbstractDataAccessService implements DataAccessService { return null; } return tableMetadata.setDataBaseType(properties.getDatabaseType()) - .setEndpoint(properties.getEndpoint()) - .setOgCompatibilityB(isOgCompatibilityB); + .setEndpoint(properties.getEndpoint()) + .setOgCompatibilityB(isOgCompatibilityB); } /** * jdbc mode does not use it * - * @param table table - * @param fileName fileName + * @param table table + * @param fileName fileName * @param differenceList differenceList * @return result */ @@ -317,8 +367,8 @@ public abstract class AbstractDataAccessService implements DataAccessService { */ protected List wrapperTableMetadata(List list) { list.forEach(meta -> meta.setDataBaseType(properties.getDatabaseType()) - .setEndpoint(properties.getEndpoint()) - .setOgCompatibilityB(isOgCompatibilityB)); + .setEndpoint(properties.getEndpoint()) + .setOgCompatibilityB(isOgCompatibilityB)); return list; } diff --git a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/data/access/CsvDataAccessService.java b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/data/access/CsvDataAccessService.java index f8eb60a..322796b 100644 --- a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/data/access/CsvDataAccessService.java +++ b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/data/access/CsvDataAccessService.java @@ -269,4 +269,9 @@ public class CsvDataAccessService implements DataAccessService { public LowerCaseTableNames queryLowerCaseTableNames() { return LowerCaseTableNames.INSENSITIVE; } + + @Override + public List queryTableUniqueColumns(String tableName) { + return null; + } } diff --git 
a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/data/access/DataAccessService.java b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/data/access/DataAccessService.java index 5d2e84d..6eaef38 100644 --- a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/data/access/DataAccessService.java +++ b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/data/access/DataAccessService.java @@ -25,6 +25,7 @@ import org.opengauss.datachecker.common.entry.extract.TableMetadata; import org.springframework.jdbc.core.RowMapper; import javax.sql.DataSource; + import java.sql.Connection; import java.util.List; import java.util.Map; @@ -111,7 +112,7 @@ public interface DataAccessService { * query table column min value * * @param connection connection - * @param param param + * @param param param * @return min value of string */ String min(Connection connection, DataAccessParam param); @@ -120,7 +121,7 @@ public interface DataAccessService { * query table column max value * * @param connection connection - * @param param param + * @param param param * @return max value of string */ String max(Connection connection, DataAccessParam param); @@ -136,10 +137,10 @@ public interface DataAccessService { /** * query row data by sql * - * @param sql sql - * @param param sql param + * @param sql sql + * @param param sql param * @param rowMapper row mapper - * @param data type + * @param data type * @return data */ List query(String sql, Map param, RowMapper rowMapper); @@ -147,10 +148,10 @@ public interface DataAccessService { /** * query data from csv file * - * @param table table - * @param fileName fileName + * @param table table + * @param fileName fileName * @param differenceList differenceList - * @return + * @return data */ List> query(String table, String fileName, List differenceList); @@ -165,7 +166,7 @@ public interface DataAccessService { * query table check point list * * @param connection connection - * @param param param + * @param param param * @return point list */ List queryPointList(Connection connection, DataAccessParam param); @@ -187,4 +188,15 @@ public interface DataAccessService { * @return value */ LowerCaseTableNames queryLowerCaseTableNames(); + + /** + * query table unique columns + *
+     *     Unique constraints and unique indexes
+     * 
+ * + * @param tableName table + * @return unique columns + */ + List queryTableUniqueColumns(String tableName); } diff --git a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/data/access/MysqlDataAccessService.java b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/data/access/MysqlDataAccessService.java index 8383c69..d388461 100644 --- a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/data/access/MysqlDataAccessService.java +++ b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/data/access/MysqlDataAccessService.java @@ -21,6 +21,7 @@ import org.opengauss.datachecker.common.entry.enums.LowerCaseTableNames; import org.opengauss.datachecker.common.entry.extract.ColumnsMetaData; import org.opengauss.datachecker.common.entry.extract.PrimaryColumnBean; import org.opengauss.datachecker.common.entry.extract.TableMetadata; +import org.opengauss.datachecker.common.entry.extract.UniqueColumnBean; import org.opengauss.datachecker.extract.data.mapper.MysqlMetaDataMapper; import java.sql.Connection; @@ -52,8 +53,8 @@ public class MysqlDataAccessService extends AbstractDataAccessService { @Override public Health health() { String schema = properties.getSchema(); - String sql = "SELECT SCHEMA_NAME tableSchema FROM information_schema.SCHEMATA info WHERE SCHEMA_NAME='" - + schema + "' limit 1"; + String sql = "SELECT SCHEMA_NAME tableSchema FROM information_schema.SCHEMATA info WHERE SCHEMA_NAME='" + schema + + "' limit 1"; return health(schema, sql); } @@ -66,7 +67,7 @@ public class MysqlDataAccessService extends AbstractDataAccessService { public List dasQueryTableNameList() { String schema = properties.getSchema(); String sql = "select info.table_name tableName from information_schema.tables info where table_schema='" - + schema + "' and table_type='BASE TABLE'"; + + schema + "' and table_type='BASE TABLE'"; return adasQueryTableNameList(sql); } @@ -83,11 +84,23 @@ public class MysqlDataAccessService extends AbstractDataAccessService { @Override public List queryTablePrimaryColumns() { String sql = "select table_name tableName ,lower(column_name) columnName from information_schema.columns " - + "where table_schema='" + properties.getSchema() - + "' and column_key='PRI' order by ordinal_position asc "; + + "where table_schema='" + properties.getSchema() + "' and column_key='PRI' order by ordinal_position asc "; return adasQueryTablePrimaryColumns(sql); } + @Override + public List queryTableUniqueColumns(String tableName) { + String schema = properties.getSchema(); + String sql = "select kcu.table_name tableName, kcu.column_name columnName,kcu.ordinal_position colIdx," + + " kcu.constraint_name indexIdentifier from information_schema.table_constraints tc " + + " left join information_schema.KEY_COLUMN_USAGE kcu on tc.table_schema =kcu.table_schema" + + " and tc.constraint_name=kcu.constraint_name and tc.table_name = kcu.table_name" + + " where tc.table_schema='" + schema + "' and tc.table_name='" + tableName + "'" + + " and tc.constraint_type='UNIQUE' ;"; + List uniqueColumns = adasQueryTableUniqueColumns(sql); + return translateUniqueToPrimaryColumns(uniqueColumns); + } + @Override public List queryTablePrimaryColumns(String tableName) { return mysqlMetaDataMapper.queryTablePrimaryColumnsByTableName(properties.getSchema(), tableName); @@ -97,11 +110,11 @@ public class MysqlDataAccessService extends AbstractDataAccessService { public List dasQueryTableMetadataList() { LowerCaseTableNames lowerCaseTableNames = 
getLowerCaseTableNames(); String colTableName = Objects.equals(LowerCaseTableNames.SENSITIVE, lowerCaseTableNames) - ? "info.table_name tableName" - : "lower(info.table_name) tableName"; + ? "info.table_name tableName" + : "lower(info.table_name) tableName"; String sql = " SELECT info.TABLE_SCHEMA tableSchema," + colTableName + ",info.table_rows tableRows , " - + "info.avg_row_length avgRowLength FROM information_schema.tables info WHERE TABLE_SCHEMA='" - + properties.getSchema() + "'"; + + "info.avg_row_length avgRowLength FROM information_schema.tables info WHERE TABLE_SCHEMA='" + + properties.getSchema() + "'"; return wrapperTableMetadata(adasQueryTableMetadataList(sql)); } @@ -130,9 +143,9 @@ public class MysqlDataAccessService extends AbstractDataAccessService { @Override public List queryPointList(Connection connection, DataAccessParam param) { String sql = "select s.%s from (SELECT @rowno:=@rowno+1 as rn,r.%s from %s.%s r," - + " (select @rowno := 0) t ORDER BY r.%s asc) s where mod(s.rn, %s) = 1"; + + " (select @rowno := 0) t ORDER BY r.%s asc) s where mod(s.rn, %s) = 1"; sql = String.format(sql, param.getColName(), param.getColName(), param.getSchema(), param.getName(), - param.getColName(), param.getOffset()); + param.getColName(), param.getOffset()); return adasQueryPointList(connection, sql); } diff --git a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/data/access/OpgsDataAccessService.java b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/data/access/OpgsDataAccessService.java index d910ef5..86a0843 100644 --- a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/data/access/OpgsDataAccessService.java +++ b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/data/access/OpgsDataAccessService.java @@ -24,6 +24,7 @@ import org.opengauss.datachecker.common.entry.enums.OgCompatibility; import org.opengauss.datachecker.common.entry.extract.ColumnsMetaData; import org.opengauss.datachecker.common.entry.extract.PrimaryColumnBean; import org.opengauss.datachecker.common.entry.extract.TableMetadata; +import org.opengauss.datachecker.common.entry.extract.UniqueColumnBean; import org.opengauss.datachecker.extract.data.mapper.OpgsMetaDataMapper; import javax.annotation.PostConstruct; @@ -127,6 +128,18 @@ public class OpgsDataAccessService extends AbstractDataAccessService { return adasQueryTablePrimaryColumns(sql); } + @Override + public List queryTableUniqueColumns(String tableName) { + String schema = properties.getSchema(); + String sql = "SELECT c.relname AS tableName, ns.nspname, i.indexrelid indexIdentifier, " + + " a.attname AS columnName, a.attnum colIdx FROM pg_index i" + + " JOIN pg_class c ON i.indrelid = c.oid join pg_namespace ns on c.relnamespace=ns.oid" + + " JOIN pg_attribute a ON i.indrelid = a.attrelid AND a.attnum = ANY(i.indkey) " + + " where ns.nspname='" + schema + "' and c.relname='" + tableName + "' and i.indisunique = true;"; + List uniqueColumns = adasQueryTableUniqueColumns(sql); + return translateUniqueToPrimaryColumns(uniqueColumns); + } + @Override public List queryTableColumnsMetaData(String tableName) { return opgsMetaDataMapper.queryTableColumnsMetaData(properties.getSchema(), tableName); diff --git a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/data/access/OracleDataAccessService.java b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/data/access/OracleDataAccessService.java index b2426c1..e29bafb 100644 --- 
a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/data/access/OracleDataAccessService.java +++ b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/data/access/OracleDataAccessService.java @@ -21,6 +21,7 @@ import org.opengauss.datachecker.common.entry.enums.LowerCaseTableNames; import org.opengauss.datachecker.common.entry.extract.ColumnsMetaData; import org.opengauss.datachecker.common.entry.extract.PrimaryColumnBean; import org.opengauss.datachecker.common.entry.extract.TableMetadata; +import org.opengauss.datachecker.common.entry.extract.UniqueColumnBean; import org.opengauss.datachecker.extract.data.mapper.OracleMetaDataMapper; import java.sql.Connection; @@ -76,11 +77,22 @@ public class OracleDataAccessService extends AbstractDataAccessService { @Override public List queryTablePrimaryColumns() { String sql = "SELECT A.TABLE_NAME tableName, A.COLUMN_NAME columnName FROM ALL_CONS_COLUMNS A,ALL_CONSTRAINTS B" - + " WHERE A.constraint_name = B.constraint_name AND B.constraint_type = 'P' AND A.OWNER = '" - + properties.getSchema() + "'"; + + " WHERE A.constraint_name = B.constraint_name AND B.constraint_type = 'P' AND A.OWNER = '" + + properties.getSchema() + "'"; return adasQueryTablePrimaryColumns(sql); } + @Override + public List queryTableUniqueColumns(String tableName) { + String schema = properties.getSchema(); + String sql = "SELECT uc.table_name tableName,uc.constraint_name indexIdentifier,ucc.column_name columnName," + + " uc.constraint_type,ucc.position colIdx FROM USER_CONSTRAINTS uc " + + " JOIN USER_CONS_COLUMNS ucc ON uc.constraint_name=ucc.constraint_name " + + " WHERE uc.constraint_type='U' and uc.owner='" + schema + "'and uc.table_name='" + tableName + "'"; + List uniqueColumns = adasQueryTableUniqueColumns(sql); + return translateUniqueToPrimaryColumns(uniqueColumns); + } + @Override public List queryTablePrimaryColumns(String tableName) { return oracleMetaDataMapper.queryTablePrimaryColumnsByTableName(properties.getSchema(), tableName); @@ -91,12 +103,11 @@ public class OracleDataAccessService extends AbstractDataAccessService { String schema = properties.getSchema(); LowerCaseTableNames lowerCaseTableNames = getLowerCaseTableNames(); String colTableName = Objects.equals(LowerCaseTableNames.SENSITIVE, lowerCaseTableNames) - ? "t.table_name tableName" - : "lower(t.table_name) tableName"; + ? 
"t.table_name tableName" + : "lower(t.table_name) tableName"; String sql = "SELECT t.owner tableSchema," + colTableName + ",t.num_rows tableRows,avg_row_len avgRowLength" - + " FROM ALL_TABLES t LEFT JOIN (SELECT DISTINCT table_name from ALL_CONSTRAINTS where OWNER = '" - + schema + "' AND constraint_type='P') pc on t.table_name=pc.table_name WHERE t.OWNER = '" - + schema + "'"; + + " FROM ALL_TABLES t LEFT JOIN (SELECT DISTINCT table_name from ALL_CONSTRAINTS where OWNER = '" + schema + + "' AND constraint_type='P') pc on t.table_name=pc.table_name WHERE t.OWNER = '" + schema + "'"; return wrapperTableMetadata(adasQueryTableMetadataList(sql)); } -- Gitee From 3b7aac4b0b55ed56927be1e5f39737c1a1cdcdea Mon Sep 17 00:00:00 2001 From: mystarry-sky Date: Mon, 11 Nov 2024 16:12:14 +0800 Subject: [PATCH 3/3] =?UTF-8?q?=E5=A2=9E=E5=8A=A0=E8=A1=A8=E5=94=AF?= =?UTF-8?q?=E4=B8=80=E6=80=A7=E7=B4=A2=E5=BC=95=E5=9C=BA=E6=99=AF=E8=A1=A8?= =?UTF-8?q?=E6=A0=A1=E9=AA=8C=E3=80=82=20=E4=BF=AE=E5=A4=8Dconsumer?= =?UTF-8?q?=E6=8B=89=E5=8F=96=E5=BC=82=E5=B8=B8=E9=97=AE=E9=A2=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../modules/check/KafkaConsumerHandler.java | 27 ++-- .../check/slice/SliceCheckContext.java | 15 +- .../check/slice/SliceCheckWorker.java | 135 ++++++++---------- .../data/access/MysqlDataAccessService.java | 11 +- .../data/access/OracleDataAccessService.java | 10 +- 5 files changed, 98 insertions(+), 100 deletions(-) diff --git a/datachecker-check/src/main/java/org/opengauss/datachecker/check/modules/check/KafkaConsumerHandler.java b/datachecker-check/src/main/java/org/opengauss/datachecker/check/modules/check/KafkaConsumerHandler.java index e142040..3a83c2a 100644 --- a/datachecker-check/src/main/java/org/opengauss/datachecker/check/modules/check/KafkaConsumerHandler.java +++ b/datachecker-check/src/main/java/org/opengauss/datachecker/check/modules/check/KafkaConsumerHandler.java @@ -16,6 +16,7 @@ package org.opengauss.datachecker.check.modules.check; import com.alibaba.fastjson.JSON; + import org.apache.commons.lang3.StringUtils; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; @@ -26,6 +27,7 @@ import org.opengauss.datachecker.common.entry.extract.RowDataHash; import org.opengauss.datachecker.common.entry.extract.SliceExtend; import org.opengauss.datachecker.common.exception.CheckConsumerPollEmptyException; import org.opengauss.datachecker.common.util.LogUtils; +import org.opengauss.datachecker.common.util.ThreadUtil; import java.time.Duration; import java.util.*; @@ -44,10 +46,11 @@ public class KafkaConsumerHandler { private static final int MAX_CONSUMER_POLL_TIMES = 50; private KafkaConsumer kafkaConsumer; + /** * Constructor * - * @param consumer consumer + * @param consumer consumer * @param retryTimes retryTimes */ public KafkaConsumerHandler(KafkaConsumer consumer, int retryTimes) { @@ -66,6 +69,7 @@ public class KafkaConsumerHandler { /** * 获取kafka consumer * + * @return consumer */ public KafkaConsumer getConsumer() { return kafkaConsumer; @@ -74,7 +78,7 @@ public class KafkaConsumerHandler { /** * Query the Kafka partition data corresponding to the specified table * - * @param topic Kafka topic + * @param topic Kafka topic * @param partitions Kafka partitions * @return kafka partitions data */ @@ -96,8 +100,8 @@ public class KafkaConsumerHandler { * consumer poll data from the topic partition, and filter bu slice extend. then add data in the data list. 
* * @param topicPartition topic partition - * @param sExtend slice extend - * @param attempts + * @param sExtend slice extend + * @param attempts attempts */ public void consumerAssign(TopicPartition topicPartition, SliceExtend sExtend, int attempts) { kafkaConsumer.assign(List.of(topicPartition)); @@ -109,20 +113,21 @@ public class KafkaConsumerHandler { /** * consumer poll data from the topic partition, and filter bu slice extend. then add data in the data list. * - * @param sExtend slice extend + * @param sExtend slice extend * @param dataList data list */ public synchronized void pollTpSliceData(SliceExtend sExtend, List dataList) { AtomicLong currentCount = new AtomicLong(0); int pollEmptyCount = 0; while (currentCount.get() < sExtend.getCount()) { - ConsumerRecords records = - kafkaConsumer.poll(Duration.ofMillis(KAFKA_CONSUMER_POLL_DURATION)); + ConsumerRecords records = kafkaConsumer.poll( + Duration.ofMillis(KAFKA_CONSUMER_POLL_DURATION)); if (records.count() <= 0) { pollEmptyCount++; if (pollEmptyCount > MAX_CONSUMER_POLL_TIMES) { throw new CheckConsumerPollEmptyException(sExtend.getName()); } + ThreadUtil.sleep(KAFKA_CONSUMER_POLL_DURATION); continue; } pollEmptyCount = 0; @@ -139,8 +144,8 @@ public class KafkaConsumerHandler { /** * Query the Kafka partition data corresponding to the specified table * - * @param topic Kafka topic - * @param partitions Kafka partitions + * @param topic Kafka topic + * @param partitions Kafka partitions * @param shouldChangeConsumerGroup if true change consumer Group random * @return kafka partitions data */ @@ -188,8 +193,8 @@ public class KafkaConsumerHandler { } private void getTopicRecords(List dataList, KafkaConsumer kafkaConsumer) { - ConsumerRecords consumerRecords = - kafkaConsumer.poll(Duration.ofMillis(KAFKA_CONSUMER_POLL_DURATION)); + ConsumerRecords consumerRecords = kafkaConsumer.poll( + Duration.ofMillis(KAFKA_CONSUMER_POLL_DURATION)); consumerRecords.forEach(record -> { dataList.add(JSON.parseObject(record.value(), RowDataHash.class)); }); diff --git a/datachecker-check/src/main/java/org/opengauss/datachecker/check/slice/SliceCheckContext.java b/datachecker-check/src/main/java/org/opengauss/datachecker/check/slice/SliceCheckContext.java index 5b23f55..9f0bc4f 100644 --- a/datachecker-check/src/main/java/org/opengauss/datachecker/check/slice/SliceCheckContext.java +++ b/datachecker-check/src/main/java/org/opengauss/datachecker/check/slice/SliceCheckContext.java @@ -81,10 +81,19 @@ public class SliceCheckContext { kafkaConsumerService.getRetryFetchRecordTimes()); } + /** + * get consumer retry fetch record times + * + * @return duration times + */ + public int getRetryFetchRecordTimes() { + return kafkaConsumerService.getRetryFetchRecordTimes(); + } + /** * get source or sink table topic * - * @param table table + * @param table table * @param endpoint source or sink * @return topic name */ @@ -97,7 +106,7 @@ public class SliceCheckContext { /** * refresh slice check progress * - * @param slice slice + * @param slice slice * @param rowCount slice of row count */ public void refreshSliceCheckProgress(SliceVo slice, long rowCount) { @@ -107,7 +116,7 @@ public class SliceCheckContext { /** * add slice check Result * - * @param slice slice + * @param slice slice * @param result check result */ public void addCheckResult(SliceVo slice, CheckDiffResult result) { diff --git a/datachecker-check/src/main/java/org/opengauss/datachecker/check/slice/SliceCheckWorker.java 
b/datachecker-check/src/main/java/org/opengauss/datachecker/check/slice/SliceCheckWorker.java index 29c6bd0..26b42af 100644 --- a/datachecker-check/src/main/java/org/opengauss/datachecker/check/slice/SliceCheckWorker.java +++ b/datachecker-check/src/main/java/org/opengauss/datachecker/check/slice/SliceCheckWorker.java @@ -17,6 +17,7 @@ package org.opengauss.datachecker.check.slice; import com.google.common.collect.MapDifference; import com.google.common.collect.Maps; + import org.apache.commons.collections4.CollectionUtils; import org.apache.commons.collections4.MapUtils; import org.apache.kafka.common.TopicPartition; @@ -47,6 +48,7 @@ import org.opengauss.datachecker.common.exception.BucketNumberInconsistentExcept import org.opengauss.datachecker.common.exception.CheckConsumerPollEmptyException; import org.opengauss.datachecker.common.exception.MerkleTreeDepthException; import org.opengauss.datachecker.common.util.LogUtils; +import org.opengauss.datachecker.common.util.ThreadUtil; import org.opengauss.datachecker.common.util.TopicUtil; import org.springframework.lang.NonNull; @@ -72,8 +74,6 @@ import java.util.concurrent.CountDownLatch; public class SliceCheckWorker implements Runnable { private static final Logger LOGGER = LogUtils.getLogger(SliceCheckWorker.class); private static final int THRESHOLD_MIN_BUCKET_SIZE = 2; - // 设置最大尝试次数 - private static final int MAX_ATTEMPTS=5; private final SliceVo slice; @@ -81,17 +81,19 @@ public class SliceCheckWorker implements Runnable { private final SliceCheckEvent checkEvent; private final SliceCheckContext checkContext; private final TaskRegisterCenter registerCenter; - private final DifferencePair, List, List> difference = - DifferencePair.of(new LinkedList<>(), new LinkedList<>(), new LinkedList<>()); + private final DifferencePair, List, List> difference = DifferencePair.of( + new LinkedList<>(), new LinkedList<>(), new LinkedList<>()); private final LocalDateTime startTime; private long sliceRowCount; + // 设置最大尝试次数 + private int maxAttemptsTimes; private Topic topic = new Topic(); /** * slice check worker construct * - * @param checkEvent check event + * @param checkEvent check event * @param sliceCheckContext slice check context */ public SliceCheckWorker(SliceCheckEvent checkEvent, SliceCheckContext sliceCheckContext, @@ -102,6 +104,7 @@ public class SliceCheckWorker implements Runnable { this.slice = checkEvent.getSlice(); this.registerCenter = registerCenter; this.processNo = ConfigCache.getValue(ConfigConstants.PROCESS_NO); + this.maxAttemptsTimes = sliceCheckContext.getRetryFetchRecordTimes(); } @Override @@ -168,22 +171,17 @@ public class SliceCheckWorker implements Runnable { LogUtils.debug(LOGGER, "slice {} fetch empty", slice.getName()); } else { // sourceSize is less than thresholdMinBucketSize, that is, there is only one bucket. 
Compare - DifferencePair, List, List> subDifference = - compareBucketCommon(sourceTuple.getBuckets() - .get(0), sinkTuple.getBuckets() - .get(0)); - difference.getDiffering() - .addAll(subDifference.getDiffering()); - difference.getOnlyOnLeft() - .addAll(subDifference.getOnlyOnLeft()); - difference.getOnlyOnRight() - .addAll(subDifference.getOnlyOnRight()); + DifferencePair, List, List> subDifference = null; + subDifference = compareBucketCommon(sourceTuple.getBuckets().get(0), sinkTuple.getBuckets().get(0)); + difference.getDiffering().addAll(subDifference.getDiffering()); + difference.getOnlyOnLeft().addAll(subDifference.getOnlyOnLeft()); + difference.getOnlyOnRight().addAll(subDifference.getOnlyOnRight()); } } else { throw new BucketNumberInconsistentException(String.format( - "table[%s] slice[%s] build the bucket number is inconsistent, source-bucket-count=[%s] sink-bucket-count=[%s]" - + " Please synchronize data again! ", slice.getTable(), slice.getNo(), sourceTuple.getBucketSize(), - sinkTuple.getBucketSize())); + "table[%s] slice[%s] build the bucket number is inconsistent, source-bucket-count=[%s] " + + "sink-bucket-count=[%s] Please synchronize data again! ", slice.getTable(), slice.getNo(), + sourceTuple.getBucketSize(), sinkTuple.getBucketSize())); } } @@ -193,24 +191,23 @@ public class SliceCheckWorker implements Runnable { private void checkResult(String resultMsg) { CheckDiffResultBuilder builder = CheckDiffResultBuilder.builder(); - builder.process(ConfigCache.getValue(ConfigConstants.PROCESS_NO)) - .table(slice.getTable()) - .sno(slice.getNo()) - .error(resultMsg) - .topic(getConcatTableTopics()) - .schema(slice.getSchema()) - .fileName(slice.getName()) - .conditionLimit(getConditionLimit()) - .partitions(slice.getPtnNum()) - .isTableStructureEquals(true) - .startTime(startTime) - .endTime(LocalDateTime.now()) - .isExistTableMiss(false, null) - .rowCount((int) sliceRowCount) - .errorRate(20) - .checkMode(ConfigCache.getValue(ConfigConstants.CHECK_MODE, CheckMode.class)) - .keyDiff(difference.getOnlyOnLeft(), difference.getDiffering(), difference.getOnlyOnRight()); + .table(slice.getTable()) + .sno(slice.getNo()) + .error(resultMsg) + .topic(getConcatTableTopics()) + .schema(slice.getSchema()) + .fileName(slice.getName()) + .conditionLimit(getConditionLimit()) + .partitions(slice.getPtnNum()) + .isTableStructureEquals(true) + .startTime(startTime) + .endTime(LocalDateTime.now()) + .isExistTableMiss(false, null) + .rowCount((int) sliceRowCount) + .errorRate(20) + .checkMode(ConfigCache.getValue(ConfigConstants.CHECK_MODE, CheckMode.class)) + .keyDiff(difference.getOnlyOnLeft(), difference.getDiffering(), difference.getOnlyOnRight()); CheckDiffResult result = builder.build(); LogUtils.debug(LOGGER, "result {}", result); checkContext.addCheckResult(slice, result); @@ -233,18 +230,13 @@ public class SliceCheckWorker implements Runnable { return; } diffNodeList.forEach(diffNode -> { - Bucket sourceBucket = diffNode.getSource() - .getBucket(); - Bucket sinkBucket = diffNode.getSink() - .getBucket(); - DifferencePair, List, List> subDifference = - compareBucketCommon(sourceBucket, sinkBucket); - difference.getDiffering() - .addAll(subDifference.getDiffering()); - difference.getOnlyOnLeft() - .addAll(subDifference.getOnlyOnLeft()); - difference.getOnlyOnRight() - .addAll(subDifference.getOnlyOnRight()); + Bucket sourceBucket = diffNode.getSource().getBucket(); + Bucket sinkBucket = diffNode.getSink().getBucket(); + DifferencePair, List, List> subDifference = compareBucketCommon( + 
sourceBucket, sinkBucket); + difference.getDiffering().addAll(subDifference.getDiffering()); + difference.getOnlyOnLeft().addAll(subDifference.getOnlyOnLeft()); + difference.getOnlyOnRight().addAll(subDifference.getOnlyOnRight()); }); diffNodeList.clear(); } @@ -257,13 +249,10 @@ public class SliceCheckWorker implements Runnable { List entriesOnlyOnLeft = collectorDeleteOrInsert(bucketDifference.entriesOnlyOnLeft()); List entriesOnlyOnRight = collectorDeleteOrInsert(bucketDifference.entriesOnlyOnRight()); List differing = collectorUpdate(bucketDifference.entriesDiffering()); - - LogUtils.debug(LOGGER, "diff slice {} insert {}", slice.getName(), bucketDifference.entriesOnlyOnLeft() - .size()); - LogUtils.debug(LOGGER, "diff slice {} delete {}", slice.getName(), bucketDifference.entriesOnlyOnRight() - .size()); - LogUtils.debug(LOGGER, "diff slice {} update {}", slice.getName(), bucketDifference.entriesDiffering() - .size()); + LogUtils.debug(LOGGER, "diff slice {} insert {}", slice.getName(), bucketDifference.entriesOnlyOnLeft().size()); + LogUtils.debug(LOGGER, "diff slice {} delete {}", slice.getName(), + bucketDifference.entriesOnlyOnRight().size()); + LogUtils.debug(LOGGER, "diff slice {} update {}", slice.getName(), bucketDifference.entriesDiffering().size()); return DifferencePair.of(entriesOnlyOnLeft, entriesOnlyOnRight, differing); } @@ -313,12 +302,11 @@ public class SliceCheckWorker implements Runnable { // Initialize source bucket column list data long startFetch = System.currentTimeMillis(); CountDownLatch countDownLatch = new CountDownLatch(checkTupleList.size()); - int avgSliceCount = (int) (sourceTuple.getSlice() - .getCount() + sinkTuple.getSlice() - .getCount()) / 2; + int avgSliceCount = (int) (sourceTuple.getSlice().getCount() + sinkTuple.getSlice().getCount()) / 2; KafkaConsumerHandler consumer = checkContext.createKafkaHandler(); checkTupleList.forEach(check -> { - initBucketList(check.getEndpoint(), check.getSlice(), check.getBuckets(), bucketDiff, avgSliceCount, consumer); + initBucketList(check.getEndpoint(), check.getSlice(), check.getBuckets(), bucketDiff, avgSliceCount, + consumer); countDownLatch.countDown(); }); countDownLatch.await(); @@ -336,30 +324,33 @@ public class SliceCheckWorker implements Runnable { } private void initBucketList(Endpoint endpoint, SliceExtend sliceExtend, List bucketList, - Map> bucketDiff, int avgSliceCount, KafkaConsumerHandler consumer) { + Map> bucketDiff, int avgSliceCount, KafkaConsumerHandler consumer) { // Use feign client to pull Kafka data List dataList = new LinkedList<>(); - TopicPartition topicPartition = new TopicPartition(Objects.equals(Endpoint.SOURCE, endpoint) ? - topic.getSourceTopicName() : topic.getSinkTopicName(), topic.getPtnNum()); + TopicPartition topicPartition = new TopicPartition( + Objects.equals(Endpoint.SOURCE, endpoint) ? 
topic.getSourceTopicName() : topic.getSinkTopicName(), + topic.getPtnNum()); int attempts = 0; - while (attempts < MAX_ATTEMPTS) { + while (attempts < maxAttemptsTimes) { try { consumer.consumerAssign(topicPartition, sliceExtend, attempts); consumer.pollTpSliceData(sliceExtend, dataList); break; // 如果成功,跳出循环 } catch (CheckConsumerPollEmptyException ex) { - if (++attempts >= MAX_ATTEMPTS) { + if (++attempts >= maxAttemptsTimes) { checkContext.returnConsumer(consumer); throw ex; // 如果达到最大尝试次数,重新抛出异常 } + ThreadUtil.sleepOneSecond(); + LogUtils.warn(LOGGER, "poll slice data {} {} , retry ({})", sliceExtend.getName(), sliceExtend.getNo(), + attempts); } } if (CollectionUtils.isEmpty(dataList)) { return; } - BuilderBucketHandler bucketBuilder = - new BuilderBucketHandler(ConfigCache.getIntValue(ConfigConstants.BUCKET_CAPACITY)); - + BuilderBucketHandler bucketBuilder = new BuilderBucketHandler( + ConfigCache.getIntValue(ConfigConstants.BUCKET_CAPACITY)); Map bucketMap = new ConcurrentHashMap<>(InitialCapacity.CAPACITY_128); // Use the pulled data to build the bucket list bucketBuilder.builder(dataList, avgSliceCount, bucketMap); @@ -394,12 +385,6 @@ public class SliceCheckWorker implements Runnable { bucketList.sort(Comparator.comparingInt(Bucket::getNumber)); } - private void getSliceDataFromTopicPartition(KafkaConsumerHandler consumer, SliceExtend sExtend, - List dataList) throws CheckConsumerPollEmptyException { - - - } - /** *
      * Align the bucket list data according to the statistical results of source
@@ -411,12 +396,10 @@ public class SliceCheckWorker implements Runnable {
         if (MapUtils.isNotEmpty(bucketDiff)) {
             bucketDiff.forEach((number, pair) -> {
                 if (pair.getSource() == -1) {
-                    sourceTuple.getBuckets()
-                               .add(BuilderBucketHandler.builderEmpty(number));
+                    sourceTuple.getBuckets().add(BuilderBucketHandler.builderEmpty(number));
                 }
                 if (pair.getSink() == -1) {
-                    sinkTuple.getBuckets()
-                             .add(BuilderBucketHandler.builderEmpty(number));
+                    sinkTuple.getBuckets().add(BuilderBucketHandler.builderEmpty(number));
                 }
             });
         }
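
The SliceCheckWorker changes above replace the hard-coded MAX_ATTEMPTS constant with the retry count configured in SliceCheckContext (getRetryFetchRecordTimes) and back off between empty polls instead of spinning, so a slow extractor no longer burns through all attempts immediately. The underlying pattern is a plain bounded retry around a poll that may legitimately come back empty; a minimal self-contained sketch of that idea follows (hypothetical names, not the project's API):

    import java.util.concurrent.TimeUnit;
    import java.util.function.Predicate;
    import java.util.function.Supplier;

    public final class BoundedRetryPoll {
        /**
         * Polls until a non-empty batch arrives or maxAttempts consecutive
         * empty polls occur, backing off between empty polls instead of spinning.
         */
        public static <T> T pollWithRetry(Supplier<T> poll, Predicate<T> isEmpty,
            int maxAttempts, long backoffMillis) throws InterruptedException {
            int attempts = 0;
            while (true) {
                T batch = poll.get();
                if (!isEmpty.test(batch)) {
                    return batch; // data arrived; stop retrying
                }
                if (++attempts >= maxAttempts) {
                    throw new IllegalStateException("poll returned empty " + attempts + " times");
                }
                TimeUnit.MILLISECONDS.sleep(backoffMillis); // back off before the next attempt
            }
        }
    }
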
diff --git a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/data/access/MysqlDataAccessService.java b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/data/access/MysqlDataAccessService.java
index d388461..afee709 100644
--- a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/data/access/MysqlDataAccessService.java
+++ b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/data/access/MysqlDataAccessService.java
@@ -91,12 +91,11 @@ public class MysqlDataAccessService extends AbstractDataAccessService {
     @Override
     public List<PrimaryColumnBean> queryTableUniqueColumns(String tableName) {
         String schema = properties.getSchema();
-        String sql = "select kcu.table_name tableName, kcu.column_name columnName,kcu.ordinal_position colIdx,"
-            + " kcu.constraint_name indexIdentifier from  information_schema.table_constraints tc "
-            + " left join information_schema.KEY_COLUMN_USAGE kcu on tc.table_schema =kcu.table_schema"
-            + " and tc.constraint_name=kcu.constraint_name and tc.table_name = kcu.table_name"
-            + " where tc.table_schema='" + schema + "' and tc.table_name='" + tableName + "'"
-            + " and tc.constraint_type='UNIQUE' ;";
+        String sql = "select s.table_schema,s.table_name tableName,s.column_name columnName,c.ordinal_position colIdx,"
+            + " s.index_name indexIdentifier from information_schema.statistics s "
+            + " left join information_schema.columns c on s.table_schema=c.table_schema  "
+            + " and s.table_schema=c.table_schema and s.table_name=c.table_name and s.column_name=c.column_name "
+            + " where s.table_schema='" + schema + "' and s.table_name='" + tableName + "'" + " and s.non_unique=0;";
         List<UniqueColumnBean> uniqueColumns = adasQueryTableUniqueColumns(sql);
         return translateUniqueToPrimaryColumns(uniqueColumns);
     }
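
The rewritten MySQL lookup reads information_schema.statistics with non_unique = 0, which lists the columns of unique indexes whether they were created as UNIQUE constraints or directly via CREATE UNIQUE INDEX; the previous table_constraints join only saw the constraint form. The patch builds the statement by concatenating schema and table name into the SQL string; an equivalent parameterized form would look like the sketch below (a hypothetical helper, not the service's adasQueryTableUniqueColumns API):

    import java.sql.Connection;
    import java.sql.PreparedStatement;
    import java.sql.ResultSet;
    import java.sql.SQLException;
    import java.util.ArrayList;
    import java.util.List;

    final class MysqlUniqueIndexColumns {
        /** Returns "indexName:columnName" pairs of unique indexes, binding identifiers as parameters. */
        static List<String> query(Connection conn, String schema, String table) throws SQLException {
            String sql = "select s.index_name, s.column_name from information_schema.statistics s"
                + " where s.table_schema = ? and s.table_name = ? and s.non_unique = 0"
                + " order by s.index_name, s.seq_in_index";
            List<String> columns = new ArrayList<>();
            try (PreparedStatement ps = conn.prepareStatement(sql)) {
                ps.setString(1, schema);
                ps.setString(2, table);
                try (ResultSet rs = ps.executeQuery()) {
                    while (rs.next()) {
                        columns.add(rs.getString(1) + ":" + rs.getString(2)); // index + column
                    }
                }
            }
            return columns;
        }
    }
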
diff --git a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/data/access/OracleDataAccessService.java b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/data/access/OracleDataAccessService.java
index e29bafb..bfaef45 100644
--- a/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/data/access/OracleDataAccessService.java
+++ b/datachecker-extract/src/main/java/org/opengauss/datachecker/extract/data/access/OracleDataAccessService.java
@@ -85,10 +85,12 @@ public class OracleDataAccessService extends AbstractDataAccessService {
     @Override
     public List<PrimaryColumnBean> queryTableUniqueColumns(String tableName) {
         String schema = properties.getSchema();
-        String sql = "SELECT uc.table_name tableName,uc.constraint_name indexIdentifier,ucc.column_name columnName,"
-            + " uc.constraint_type,ucc.position colIdx FROM USER_CONSTRAINTS uc "
-            + " JOIN USER_CONS_COLUMNS ucc ON uc.constraint_name=ucc.constraint_name "
-            + " WHERE uc.constraint_type='U' and uc.owner='" + schema + "'and uc.table_name='" + tableName + "'";
+        String sql = " SELECT ui.index_name indexIdentifier,ui.table_owner,ui.table_name tableName,"
+            + " utc.column_name columnName, utc.column_id colIdx"
+            + " from user_indexes ui left join user_ind_columns uic on ui.index_name=uic.index_name "
+            + " and ui.table_name=uic.table_name  "
+            + " left join user_tab_columns utc on ui.table_name =utc.table_name and uic.column_name=utc.column_name"
+            + " where ui.uniqueness='UNIQUE' and ui.table_owner='" + schema + "' and ui.table_name='" + tableName + "'";
         List<UniqueColumnBean> uniqueColumns = adasQueryTableUniqueColumns(sql);
         return translateUniqueToPrimaryColumns(uniqueColumns);
     }
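
For Oracle, filtering user_indexes on uniqueness = 'UNIQUE' likewise picks up both UNIQUE constraints (which Oracle enforces through an index) and standalone unique indexes. Note that user_ind_columns already exposes a column's position inside the index (column_position); the extra join to user_tab_columns is only needed because this query reports the table-level column_id as colIdx. A parameterized sketch under the same assumptions as the MySQL example above:

    import java.sql.Connection;
    import java.sql.PreparedStatement;
    import java.sql.ResultSet;
    import java.sql.SQLException;
    import java.util.ArrayList;
    import java.util.List;

    final class OracleUniqueIndexColumns {
        /** Lists unique-index columns in index order, binding owner and table name as parameters. */
        static List<String> query(Connection conn, String owner, String table) throws SQLException {
            String sql = "select ui.index_name, uic.column_name"
                + " from user_indexes ui join user_ind_columns uic on ui.index_name = uic.index_name"
                + " where ui.uniqueness = 'UNIQUE' and ui.table_owner = ? and ui.table_name = ?"
                + " order by ui.index_name, uic.column_position";
            List<String> columns = new ArrayList<>();
            try (PreparedStatement ps = conn.prepareStatement(sql)) {
                ps.setString(1, owner);
                ps.setString(2, table);
                try (ResultSet rs = ps.executeQuery()) {
                    while (rs.next()) {
                        columns.add(rs.getString(1) + ":" + rs.getString(2)); // index + column
                    }
                }
            }
            return columns;
        }
    }
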
-- 
Gitee