diff --git a/CHANGELOG.md b/CHANGELOG.md index c17218d2837e9a83433e3dcd07b2af7194ba2968..9de68f81fe58eb0d2723a7fcdeeb4af05da8b365 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -24,6 +24,7 @@ - 修复了ignoreTables参数无效问题 - 修复了tables参数不支持%通配符问题,重新支持包括库名映射场景下的%通配符用法,例如"db1.t%:db2.t%" - 修复了当数据表没有显式主键,且表中有多行数据是重复的,可能会导致校验结果不准确问题 + - 修复元数据查询链路并发共享状态导致的 DDL-yes 随机漂移问题 ## 1.2.3 - 增加内存使用量限制,且当内存使用量超过该值时,会自动调低parallelThds,queueSize,chunkSize这几个影响性能的参数,并进行GC操作 diff --git a/MySQL/my_query_table_data.go b/MySQL/my_query_table_data.go index dd4a120deb2246e4e18b3bc23a464f7c50edfe98..d64783be8e2c16789e0e27dcd3d4346008a54822 100644 --- a/MySQL/my_query_table_data.go +++ b/MySQL/my_query_table_data.go @@ -35,6 +35,18 @@ func getDBScopeKey(db *sql.DB) string { return scopeKey } +func scopedTableCacheKey(db *sql.DB, schema, table, suffix string) string { + scopeKey := getDBScopeKey(db) + if suffix == "" { + return fmt.Sprintf("%s.%s.%s", scopeKey, schema, table) + } + return fmt.Sprintf("%s.%s.%s.%s", scopeKey, schema, table, suffix) +} + +func scopedColumnCacheKey(db *sql.DB, schema, table, column string) string { + return fmt.Sprintf("%s.%s.%s.%s", getDBScopeKey(db), schema, table, column) +} + /* 查询MySQL库下指定表的索引统计信息 */ @@ -43,20 +55,22 @@ func (my *QueryTable) QueryTableIndexColumnInfo(db *sql.DB, logThreadSeq int64) Event = "Q_Index_Statistics" tableData []map[string]interface{} err error + query string + logMsg string ) - strsql = fmt.Sprintf("SELECT isc.COLUMN_NAME AS columnName, isc.COLUMN_TYPE AS columnType, isc.COLUMN_KEY AS columnKey,isc.EXTRA AS autoIncrement, iss.NON_UNIQUE AS nonUnique, iss.INDEX_NAME AS indexName, iss.SEQ_IN_INDEX AS IndexSeq, isc.ORDINAL_POSITION AS columnSeq, iss.IS_VISIBLE AS indexVisibility FROM INFORMATION_SCHEMA.COLUMNS isc INNER JOIN (SELECT NON_UNIQUE, INDEX_NAME, SEQ_IN_INDEX, COLUMN_NAME, IS_VISIBLE FROM INFORMATION_SCHEMA.STATISTICS WHERE TABLE_SCHEMA='%s' AND TABLE_NAME='%s') AS iss ON isc.COLUMN_NAME=iss.COLUMN_NAME WHERE isc.TABLE_SCHEMA='%s' AND isc.TABLE_NAME='%s';", my.Schema, my.Table, my.Schema, my.Table) - vlog = fmt.Sprintf("(%d) [%s] Generate a sql statement to query the index statistics of table %s.%s under the %s database.sql messige is {%s}", logThreadSeq, Event, my.Schema, my.Table, DBType, strsql) - global.Wlog.Debug(vlog) + query = fmt.Sprintf("SELECT isc.COLUMN_NAME AS columnName, isc.COLUMN_TYPE AS columnType, isc.COLUMN_KEY AS columnKey,isc.EXTRA AS autoIncrement, iss.NON_UNIQUE AS nonUnique, iss.INDEX_NAME AS indexName, iss.SEQ_IN_INDEX AS IndexSeq, isc.ORDINAL_POSITION AS columnSeq, iss.IS_VISIBLE AS indexVisibility FROM INFORMATION_SCHEMA.COLUMNS isc INNER JOIN (SELECT NON_UNIQUE, INDEX_NAME, SEQ_IN_INDEX, COLUMN_NAME, IS_VISIBLE FROM INFORMATION_SCHEMA.STATISTICS WHERE TABLE_SCHEMA='%s' AND TABLE_NAME='%s') AS iss ON isc.COLUMN_NAME=iss.COLUMN_NAME WHERE isc.TABLE_SCHEMA='%s' AND isc.TABLE_NAME='%s';", my.Schema, my.Table, my.Schema, my.Table) + logMsg = fmt.Sprintf("(%d) [%s] Generate a sql statement to query the index statistics of table %s.%s under the %s database.sql messige is {%s}", logThreadSeq, Event, my.Schema, my.Table, DBType, query) + global.Wlog.Debug(logMsg) dispos := dataDispos.DBdataDispos{DBType: DBType, LogThreadSeq: logThreadSeq, Event: Event, DB: db} - if dispos.SqlRows, err = dispos.DBSQLforExec(strsql); err != nil { + if dispos.SqlRows, err = dispos.DBSQLforExec(query); err != nil { return nil, err } tableData, err = dispos.DataRowsAndColumnSliceDispos([]map[string]interface{}{}) if err != nil { return nil, err } - vlog = fmt.Sprintf("(%d) [%s] The index statistics query of table %s.%s under the %s database is completed. index statistics is {%v}", logThreadSeq, Event, my.Schema, my.Table, DBType, tableData) - global.Wlog.Debug(vlog) + logMsg = fmt.Sprintf("(%d) [%s] The index statistics query of table %s.%s under the %s database is completed. index statistics is {%v}", logThreadSeq, Event, my.Schema, my.Table, DBType, tableData) + global.Wlog.Debug(logMsg) defer dispos.SqlRows.Close() return tableData, err } @@ -74,9 +88,10 @@ func (my *QueryTable) IndexDisposF(queryData []map[string]interface{}, logThread indexName string currIndexName string Event = "E_Index_Filter" + logMsg string ) - vlog = fmt.Sprintf("(%d) [%s] Start to filter the primary key index, unique index, and common index based on the index information of the specified table %s.%s under the %s library", logThreadSeq, Event, my.Schema, my.Table, DBType) - global.Wlog.Debug(vlog) + logMsg = fmt.Sprintf("(%d) [%s] Start to filter the primary key index, unique index, and common index based on the index information of the specified table %s.%s under the %s library", logThreadSeq, Event, my.Schema, my.Table, DBType) + global.Wlog.Debug(logMsg) // 用于临时存储每个索引的列顺序 indexColumns := make(map[string]map[string]string) @@ -147,8 +162,8 @@ func (my *QueryTable) IndexDisposF(queryData []map[string]interface{}, logThread } } - vlog = fmt.Sprintf("(%d) [%s] The index information screening of the specified table %s.%s under the %s library is completed", logThreadSeq, Event, my.Schema, my.Table, DBType) - global.Wlog.Debug(vlog) + logMsg = fmt.Sprintf("(%d) [%s] The index information screening of the specified table %s.%s under the %s library is completed", logThreadSeq, Event, my.Schema, my.Table, DBType) + global.Wlog.Debug(logMsg) // 返回四个map:主键索引、唯一索引、普通索引和索引可见性信息 return priIndexColumnMap, nultiseriateIndexColumnMap, multiseriateIndexColumnMap, indexVisibilityMap @@ -163,9 +178,10 @@ func (my *QueryTable) TmpTableIndexColumnSelectDispos(logThreadSeq int64) map[st columnSelect = make(map[string]string) columnName = my.ColumnName Event = "D_Index_Length" + logMsg string ) - vlog = fmt.Sprintf("(%d) [%s] Start to query the length of the query index column in table %s.%s in the specified %s database.", logThreadSeq, Event, my.Schema, my.Table, DBType) - global.Wlog.Debug(vlog) + logMsg = fmt.Sprintf("(%d) [%s] Start to query the length of the query index column in table %s.%s in the specified %s database.", logThreadSeq, Event, my.Schema, my.Table, DBType) + global.Wlog.Debug(logMsg) //根据索引列的多少,生成select 列条件,并生成列长度,为判断列是否为null或为空做判断 if len(columnName) == 1 { columnSelect["selectColumnName"] = strings.Join(columnName, "") @@ -188,8 +204,8 @@ func (my *QueryTable) TmpTableIndexColumnSelectDispos(logThreadSeq int64) map[st columnSelect["selectColumnNull"] = strings.Join(cc, "/*column*/") columnSelect["selectColumnEmpty"] = strings.Join(dd, "/*column*/") } - vlog = fmt.Sprintf("(%d) [%s] The length of the query index column of table %s.%s in the %s database is completed.", logThreadSeq, Event, my.Schema, my.Table, DBType) - global.Wlog.Debug(vlog) + logMsg = fmt.Sprintf("(%d) [%s] The length of the query index column of table %s.%s in the %s database is completed.", logThreadSeq, Event, my.Schema, my.Table, DBType) + global.Wlog.Debug(logMsg) return columnSelect } @@ -201,12 +217,15 @@ func (my *QueryTable) TmpTableIndexColumnRowsCount(db *sql.DB, logThreadSeq int6 tmpTableCount uint64 Event = "Q_Index_Table_Count" E string + err error + query string + logMsg string ) - vlog = fmt.Sprintf("(%d) [%s] Start to query the total number of rows in the following table %s.%s of the %s database.", logThreadSeq, Event, my.Schema, my.Table, DBType) - global.Wlog.Debug(vlog) - strsql = fmt.Sprintf("SELECT index_name AS INDEX_NAME, column_name AS columnName, cardinality as CARDINALITY FROM INFORMATION_SCHEMA.STATISTICS WHERE TABLE_SCHEMA='%s' AND TABLE_NAME='%s' AND SEQ_IN_INDEX=1", my.Schema, my.Table) + logMsg = fmt.Sprintf("(%d) [%s] Start to query the total number of rows in the following table %s.%s of the %s database.", logThreadSeq, Event, my.Schema, my.Table, DBType) + global.Wlog.Debug(logMsg) + query = fmt.Sprintf("SELECT index_name AS INDEX_NAME, column_name AS columnName, cardinality as CARDINALITY FROM INFORMATION_SCHEMA.STATISTICS WHERE TABLE_SCHEMA='%s' AND TABLE_NAME='%s' AND SEQ_IN_INDEX=1", my.Schema, my.Table) dispos := dataDispos.DBdataDispos{DBType: DBType, LogThreadSeq: logThreadSeq, Event: Event, DB: db} - if dispos.SqlRows, err = dispos.DBSQLforExec(strsql); err != nil { + if dispos.SqlRows, err = dispos.DBSQLforExec(query); err != nil { return 0, err } if B, err := dispos.DataRowsAndColumnSliceDispos([]map[string]interface{}{}); err != nil { @@ -229,11 +248,11 @@ func (my *QueryTable) TmpTableIndexColumnRowsCount(db *sql.DB, logThreadSeq int6 } } if E != "" { - strsql = fmt.Sprintf("SELECT SUM(a.count) AS sum FROM (SELECT COUNT(1) AS count FROM `%s`.`%s` GROUP BY %s) a", my.Schema, my.Table, E) + query = fmt.Sprintf("SELECT SUM(a.count) AS sum FROM (SELECT COUNT(1) AS count FROM `%s`.`%s` GROUP BY %s) a", my.Schema, my.Table, E) } else { - strsql = fmt.Sprintf("SELECT COUNT(1) AS sum FROM `%s`.`%s`", my.Schema, my.Table) + query = fmt.Sprintf("SELECT COUNT(1) AS sum FROM `%s`.`%s`", my.Schema, my.Table) } - if dispos.SqlRows, err = dispos.DBSQLforExec(strsql); err != nil { + if dispos.SqlRows, err = dispos.DBSQLforExec(query); err != nil { return 0, err } if tableData, err := dispos.DataRowsAndColumnSliceDispos([]map[string]interface{}{}); err != nil { @@ -244,8 +263,8 @@ func (my *QueryTable) TmpTableIndexColumnRowsCount(db *sql.DB, logThreadSeq int6 tmpTableCount += d } } - vlog = fmt.Sprintf("(%d) [%s] The query of the total number of rows in the following table %s.%s of the %s database is completed.", logThreadSeq, Event, my.Schema, my.Table, DBType) - global.Wlog.Debug(vlog) + logMsg = fmt.Sprintf("(%d) [%s] The query of the total number of rows in the following table %s.%s of the %s database is completed.", logThreadSeq, Event, my.Schema, my.Table, DBType) + global.Wlog.Debug(logMsg) defer dispos.SqlRows.Close() return tmpTableCount, nil } @@ -364,8 +383,7 @@ func (my *QueryTable) getTableColumnSet(db *sql.DB, logThreadSeq int64) (map[str cacheMutex.Lock() tableColumnSetGlobalCache[cacheKey] = colSet cacheMutex.Unlock() - vlog = fmt.Sprintf("(%d) [Check_Column_Exists] Loaded table column cache for %s.%s, columns=%d", logThreadSeq, my.Schema, my.Table, len(colSet)) - global.Wlog.Debug(vlog) + global.Wlog.Debug(fmt.Sprintf("(%d) [Check_Column_Exists] Loaded table column cache for %s.%s, columns=%d", logThreadSeq, my.Schema, my.Table, len(colSet))) return colSet, nil } @@ -555,9 +573,10 @@ func (my QueryTable) TmpTableColumnGroupDataDispos(db *sql.DB, where string, col var ( whereExist string Event = "Q_Index_ColumnData" + logMsg string ) - vlog = fmt.Sprintf("(%d) [%s] Start to query the index column data of the following table %s.%s in the %s database and de-reorder the data.", logThreadSeq, Event, my.Schema, my.Table, DBType) - global.Wlog.Debug(vlog) + logMsg = fmt.Sprintf("(%d) [%s] Start to query the index column data of the following table %s.%s in the %s database and de-reorder the data.", logThreadSeq, Event, my.Schema, my.Table, DBType) + global.Wlog.Debug(logMsg) // 先检查表中是否存在该列 columnExists, err := my.checkColumnExists(db, columnName, logThreadSeq) @@ -565,8 +584,8 @@ func (my QueryTable) TmpTableColumnGroupDataDispos(db *sql.DB, where string, col return nil, err } if !columnExists { - vlog = fmt.Sprintf("(%d) [%s] Column %s does not exist in table %s.%s, skipping query to avoid errors.", logThreadSeq, Event, columnName, my.Schema, my.Table) - global.Wlog.Warn(vlog) + logMsg = fmt.Sprintf("(%d) [%s] Column %s does not exist in table %s.%s, skipping query to avoid errors.", logThreadSeq, Event, columnName, my.Schema, my.Table) + global.Wlog.Warn(logMsg) // 返回空的channel表示跳过该列的查询 emptyChan := make(chan map[string]interface{}) close(emptyChan) @@ -593,19 +612,19 @@ func (my QueryTable) TmpTableColumnGroupDataDispos(db *sql.DB, where string, col leadingIndexName := my.getLeadingIndexName(db, columnName) groupQueryForceIndexName := my.chooseGroupByForceIndex(db, columnName, where) if groupQueryForceIndexName != "" { - vlog = fmt.Sprintf("(%d) [%s] Force index chosen for grouped query on %s.%s(%s): %s", + logMsg = fmt.Sprintf("(%d) [%s] Force index chosen for grouped query on %s.%s(%s): %s", logThreadSeq, Event, my.Schema, my.Table, columnName, groupQueryForceIndexName) - global.Wlog.Debug(vlog) + global.Wlog.Debug(logMsg) } if where != "" && groupQueryForceIndexName != "" && !strings.EqualFold(groupQueryForceIndexName, leadingIndexName) { - vlog = fmt.Sprintf("(%d) [%s] Use WHERE-driven force index (%s) instead of GROUP-column index (%s) for %s.%s", + logMsg = fmt.Sprintf("(%d) [%s] Use WHERE-driven force index (%s) instead of GROUP-column index (%s) for %s.%s", logThreadSeq, Event, groupQueryForceIndexName, leadingIndexName, my.Schema, my.Table) - global.Wlog.Info(vlog) + global.Wlog.Info(logMsg) } if where != "" && groupQueryForceIndexName == "" { - vlog = fmt.Sprintf("(%d) [%s] No suitable force index detected from WHERE columns for %s.%s(%s), fallback to optimizer", + logMsg = fmt.Sprintf("(%d) [%s] No suitable force index detected from WHERE columns for %s.%s(%s), fallback to optimizer", logThreadSeq, Event, my.Schema, my.Table, columnName) - global.Wlog.Debug(vlog) + global.Wlog.Debug(logMsg) } if groupQueryForceIndexName != "" { forceIndexClause = fmt.Sprintf(" FORCE INDEX (`%s`)", groupQueryForceIndexName) @@ -613,8 +632,8 @@ func (my QueryTable) TmpTableColumnGroupDataDispos(db *sql.DB, where string, col useFastGroupMode := false if shouldUseFastGroupMode(where, my.getTableRowsEstimate(db), my.getLeadingIndexCardinality(db, leadingIndexName, columnName), leadingIndexName != "") { useFastGroupMode = true - vlog = fmt.Sprintf("(%d) [%s] Fast group mode enabled for %s.%s column %s", logThreadSeq, Event, my.Schema, my.Table, columnName) - global.Wlog.Info(vlog) + logMsg = fmt.Sprintf("(%d) [%s] Fast group mode enabled for %s.%s column %s", logThreadSeq, Event, my.Schema, my.Table, columnName) + global.Wlog.Info(logMsg) } accurateForceSQL := fmt.Sprintf("SELECT %s AS columnName, COUNT(1) AS count FROM `%s`.`%s`%s %s GROUP BY %s ORDER BY %s", columnName, my.Schema, my.Table, forceIndexClause, whereExist, columnName, columnName) @@ -646,8 +665,8 @@ func (my QueryTable) TmpTableColumnGroupDataDispos(db *sql.DB, where string, col } } C := dispos.DataChanDispos() - vlog = fmt.Sprintf("(%d) [%s] The index column data query of the following table %s.%s in the %s database is completed.", logThreadSeq, Event, my.Schema, my.Table, DBType) - global.Wlog.Debug(vlog) + logMsg = fmt.Sprintf("(%d) [%s] The index column data query of the following table %s.%s in the %s database is completed.", logThreadSeq, Event, my.Schema, my.Table, DBType) + global.Wlog.Debug(logMsg) return C, nil } @@ -680,34 +699,35 @@ MySQL 查询表的统计信息中行数 */ func (my *QueryTable) TableRows(db *sql.DB, logThreadSeq int64) (uint64, error) { var ( - Event = "Q_I_S_tableRows" + Event = "Q_I_S_tableRows" + logMsg string ) // 确保Schema不为空 if my.Schema == "" { - vlog := fmt.Sprintf("(%d) [%s] Schema is empty for table %s, cannot get row count. Please specify a schema.", logThreadSeq, Event, my.Table) - global.Wlog.Error(vlog) + logMsg = fmt.Sprintf("(%d) [%s] Schema is empty for table %s, cannot get row count. Please specify a schema.", logThreadSeq, Event, my.Table) + global.Wlog.Error(logMsg) return 0, fmt.Errorf("schema is empty for table %s", my.Table) } - vlog := fmt.Sprintf("(%d) [%s] Start querying row count for table %s.%s in the %s database", logThreadSeq, Event, my.Schema, my.Table, DBType) - global.Wlog.Debug(vlog) + logMsg = fmt.Sprintf("(%d) [%s] Start querying row count for table %s.%s in the %s database", logThreadSeq, Event, my.Schema, my.Table, DBType) + global.Wlog.Debug(logMsg) // Prefer INFORMATION_SCHEMA.TABLES row estimate and avoid heavy COUNT(*) full scans. if tableRows := my.getTableRowsEstimate(db); tableRows > 0 { - vlog = fmt.Sprintf("(%d) [%s] TABLE_ROWS estimate for %s.%s: %d", logThreadSeq, Event, my.Schema, my.Table, tableRows) - global.Wlog.Debug(vlog) + logMsg = fmt.Sprintf("(%d) [%s] TABLE_ROWS estimate for %s.%s: %d", logThreadSeq, Event, my.Schema, my.Table, tableRows) + global.Wlog.Debug(logMsg) return tableRows, nil } // Fallback to max leading index cardinality estimate. if cardRows := my.getMaxLeadingIndexCardinality(db); cardRows > 0 { - vlog = fmt.Sprintf("(%d) [%s] MAX(CARDINALITY) estimate for %s.%s: %d", logThreadSeq, Event, my.Schema, my.Table, cardRows) - global.Wlog.Debug(vlog) + logMsg = fmt.Sprintf("(%d) [%s] MAX(CARDINALITY) estimate for %s.%s: %d", logThreadSeq, Event, my.Schema, my.Table, cardRows) + global.Wlog.Debug(logMsg) return cardRows, nil } - vlog = fmt.Sprintf("(%d) [%s] Row estimate unavailable for %s.%s, returning 0 without COUNT(*) fallback", logThreadSeq, Event, my.Schema, my.Table) - global.Wlog.Warn(vlog) + logMsg = fmt.Sprintf("(%d) [%s] Row estimate unavailable for %s.%s, returning 0 without COUNT(*) fallback", logThreadSeq, Event, my.Schema, my.Table) + global.Wlog.Warn(logMsg) return 0, nil } @@ -745,15 +765,18 @@ func (my *QueryTable) NoIndexGeneratingQueryCriteria(db *sql.DB, beginSeq uint64 var ( columnNameSeq []string Event = "Q_table_Data" + err error + query string + logMsg string ) // 如果没有列信息,使用"*"查询所有列 if len(my.TableColumn) == 0 { - strsql := fmt.Sprintf("SELECT * FROM `%s`.`%s` LIMIT %d,%d", my.Schema, my.Table, beginSeq, chanrowCount) + query = fmt.Sprintf("SELECT * FROM `%s`.`%s` LIMIT %d,%d", my.Schema, my.Table, beginSeq, chanrowCount) dispos := dataDispos.DBdataDispos{DBType: DBType, LogThreadSeq: logThreadSeq, Event: Event, DB: db} - if dispos.SqlRows, err = dispos.DBSQLforExec(strsql); err != nil { - vlog = fmt.Sprintf("(%d) [%s] Failed to execute query: %s, Error: %v", logThreadSeq, Event, strsql, err) - global.Wlog.Error(vlog) + if dispos.SqlRows, err = dispos.DBSQLforExec(query); err != nil { + logMsg = fmt.Sprintf("(%d) [%s] Failed to execute query: %s, Error: %v", logThreadSeq, Event, query, err) + global.Wlog.Error(logMsg) // 记录跳过的表信息到全局变量中 global.AddSkippedTable(my.Schema, my.Table, "data", fmt.Sprintf("query failed: %v", err)) return "", err @@ -782,9 +805,9 @@ func (my *QueryTable) NoIndexGeneratingQueryCriteria(db *sql.DB, beginSeq uint64 columnNameSeq = append(columnNameSeq, "*") } - strsql = fmt.Sprintf("SELECT %s FROM `%s`.`%s` LIMIT %d,%d", strings.Join(columnNameSeq, ","), my.Schema, my.Table, beginSeq, chanrowCount) + query = fmt.Sprintf("SELECT %s FROM `%s`.`%s` LIMIT %d,%d", strings.Join(columnNameSeq, ","), my.Schema, my.Table, beginSeq, chanrowCount) dispos := dataDispos.DBdataDispos{DBType: DBType, LogThreadSeq: logThreadSeq, Event: Event, DB: db} - if dispos.SqlRows, err = dispos.DBSQLforExec(strsql); err != nil { + if dispos.SqlRows, err = dispos.DBSQLforExec(query); err != nil { return "", err } tableData, err := dispos.DataRowsDispos([]string{}) @@ -802,6 +825,8 @@ func (my QueryTable) GeneratingQueryCriteria(db *sql.DB, logThreadSeq int64) (st var ( Event = "Q_Table_Data" columnNameSeq []string + err error + logMsg string ) //vlog = fmt.Sprintf("(%d) [%s] Start to query the segmented data of the following table %s.%s in the %s database through the where condition.", logThreadSeq, Event, my.Schema, my.Table, DBType) //global.Wlog.Debug(vlog) @@ -961,8 +986,7 @@ func (my QueryTable) GeneratingQueryCriteria(db *sql.DB, logThreadSeq int64) (st // 如果存在无效列,记录并返回错误 if hasInvalidColumn { - vlog = fmt.Sprintf("(%d) [%s] Columns '%v' in WHERE clause do not exist in table %s.%s", logThreadSeq, Event, invalidColumns, my.Schema, my.Table) - global.Wlog.Warn(vlog) + global.Wlog.Warn(fmt.Sprintf("(%d) [%s] Columns '%v' in WHERE clause do not exist in table %s.%s", logThreadSeq, Event, invalidColumns, my.Schema, my.Table)) // 记录跳过的表信息到全局变量中 global.AddSkippedTable(my.Schema, my.Table, "data", fmt.Sprintf("invalid columns: %v", invalidColumns)) return "", fmt.Errorf("invalid columns in WHERE clause: %v", invalidColumns) @@ -971,8 +995,7 @@ func (my QueryTable) GeneratingQueryCriteria(db *sql.DB, logThreadSeq int64) (st // 获取表的所有列名 if len(my.TableColumn) == 0 { - // Generate cache key in format: schema.table - cacheKey := fmt.Sprintf("%s.%s", my.Schema, my.Table) + cacheKey := scopedTableCacheKey(db, my.Schema, my.Table, "allColumns") // Check if result is already in global cache cacheMutex.RLock() @@ -989,8 +1012,8 @@ func (my QueryTable) GeneratingQueryCriteria(db *sql.DB, logThreadSeq int64) (st strsql := fmt.Sprintf("SELECT COLUMN_NAME, COLUMN_TYPE FROM INFORMATION_SCHEMA.COLUMNS WHERE TABLE_SCHEMA='%s' AND TABLE_NAME='%s' ORDER BY ORDINAL_POSITION", my.Schema, my.Table) dispos := dataDispos.DBdataDispos{DBType: DBType, LogThreadSeq: logThreadSeq, Event: Event, DB: db} if dispos.SqlRows, err = dispos.DBSQLforExec(strsql); err != nil { - vlog = fmt.Sprintf("(%d) [%s] Failed to execute query: %s, Error: %v", logThreadSeq, Event, strsql, err) - global.Wlog.Error(vlog) + logMsg = fmt.Sprintf("(%d) [%s] Failed to execute query: %s, Error: %v", logThreadSeq, Event, strsql, err) + global.Wlog.Error(logMsg) // 记录跳过的表信息到全局变量中 global.AddSkippedTable(my.Schema, my.Table, "data", fmt.Sprintf("query failed: %v", err)) return "", err @@ -1036,7 +1059,7 @@ func (my QueryTable) GeneratingQueryCriteria(db *sql.DB, logThreadSeq int64) (st tmpcolumnName = fmt.Sprintf("`%s`", columnName) // 定义缓存键,在所有条件分支中都可以使用 - cacheKey := fmt.Sprintf("%s.%s.%s", my.Schema, my.Table, columnName) + cacheKey := scopedColumnCacheKey(db, my.Schema, my.Table, columnName) // 查找当前列的数据类型 var dataType string @@ -1129,7 +1152,8 @@ func (my QueryTable) GeneratingQueryCriteria(db *sql.DB, logThreadSeq int64) (st return "", err } defer dispos.SqlRows.Close() - vlog = fmt.Sprintf("(%d) [%s] Complete the data in the following table %s.%s of the %s database.", logThreadSeq, Event, my.Schema, my.Table, DBType) + logMsg = fmt.Sprintf("(%d) [%s] Complete the data in the following table %s.%s of the %s database.", logThreadSeq, Event, my.Schema, my.Table, DBType) + global.Wlog.Debug(logMsg) return strings.Join(tableData, "/*go actions rowData*/"), nil } @@ -1204,14 +1228,14 @@ func (my *QueryTable) GeneratingQuerySql(db *sql.DB, logThreadSeq int64) (string columnNameSeq []string Event = "E_Table_SQL" selectSql string + logMsg string ) //vlog = fmt.Sprintf("(%d) [%s] Start to generate the data query sql of table %s.%s in the %s database", logThreadSeq, Event, my.Schema, my.Table, DBType) //global.Wlog.Debug(vlog) // 如果TableColumn为空,从数据库查询获取列信息 if len(my.TableColumn) == 0 { - // Generate cache key in format: schema.table - cacheKey := fmt.Sprintf("%s.%s", my.Schema, my.Table) + cacheKey := scopedTableCacheKey(db, my.Schema, my.Table, "tableColumn") // Check if complete table column information is already in global cache cacheMutex.RLock() @@ -1223,15 +1247,15 @@ func (my *QueryTable) GeneratingQuerySql(db *sql.DB, logThreadSeq int64) (string my.TableColumn = cachedTableColumn } else { cacheMutex.RUnlock() - vlog = fmt.Sprintf("(%d) [%s] TableColumn is empty, querying column info from database for table %s.%s", logThreadSeq, Event, my.Schema, my.Table) - global.Wlog.Debug(vlog) + logMsg = fmt.Sprintf("(%d) [%s] TableColumn is empty, querying column info from database for table %s.%s", logThreadSeq, Event, my.Schema, my.Table) + global.Wlog.Debug(logMsg) // 查询表的所有列信息 query := fmt.Sprintf("SELECT COLUMN_NAME, COLUMN_TYPE FROM INFORMATION_SCHEMA.COLUMNS WHERE TABLE_SCHEMA='%s' AND TABLE_NAME='%s'", my.Schema, my.Table) rows, err := db.Query(query) if err != nil { - vlog = fmt.Sprintf("(%d) [%s] Failed to query column info for table %s.%s: %v", logThreadSeq, Event, my.Schema, my.Table, err) - global.Wlog.Error(vlog) + logMsg = fmt.Sprintf("(%d) [%s] Failed to query column info for table %s.%s: %v", logThreadSeq, Event, my.Schema, my.Table, err) + global.Wlog.Error(logMsg) return "", err } defer rows.Close() @@ -1241,8 +1265,8 @@ func (my *QueryTable) GeneratingQuerySql(db *sql.DB, logThreadSeq int64) (string for rows.Next() { var columnName, dataType string if err := rows.Scan(&columnName, &dataType); err != nil { - vlog = fmt.Sprintf("(%d) [%s] Failed to scan column info for table %s.%s: %v", logThreadSeq, Event, my.Schema, my.Table, err) - global.Wlog.Error(vlog) + logMsg = fmt.Sprintf("(%d) [%s] Failed to scan column info for table %s.%s: %v", logThreadSeq, Event, my.Schema, my.Table, err) + global.Wlog.Error(logMsg) return "", err } tableColumnEntry := map[string]string{ @@ -1251,7 +1275,7 @@ func (my *QueryTable) GeneratingQuerySql(db *sql.DB, logThreadSeq int64) (string } cachedTableColumn = append(cachedTableColumn, tableColumnEntry) // Cache individual column mappings as well - columnCacheKey := fmt.Sprintf("%s.%s.%s", my.Schema, my.Table, columnName) + columnCacheKey := scopedColumnCacheKey(db, my.Schema, my.Table, columnName) cacheMutex.Lock() columnDataTypeGlobalCache[columnCacheKey] = dataType cacheMutex.Unlock() diff --git a/MySQL/my_scheme_table_column.go b/MySQL/my_scheme_table_column.go index 494e83f3cb1ccb09f5442bb38f7f12efa63408c5..f853e5be9acb0b008f12d80cef993969ef5bc66e 100644 --- a/MySQL/my_scheme_table_column.go +++ b/MySQL/my_scheme_table_column.go @@ -34,9 +34,6 @@ type QueryTable struct { var ( DBType = "MySQL" - vlog string - err error - strsql string // Global caching for expensive INFORMATION_SCHEMA queries // These caches are shared across all QueryTable instances @@ -131,15 +128,18 @@ MySQL 获取对应的库表信息,排除'information_Schema','performance_Sche */ func (my *QueryTable) DatabaseNameList(db *sql.DB, logThreadSeq int64) (map[string]int, error) { var ( - A = make(map[string]int) - Event = "Q_Schema_Table_List" + A = make(map[string]int) + Event = "Q_Schema_Table_List" + query string + logMsg string + err error ) excludeSchema := fmt.Sprintf("'information_Schema','performance_Schema','sys','mysql'") - strsql = fmt.Sprintf("SELECT TABLE_SCHEMA AS databaseName, TABLE_NAME AS tableName FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_SCHEMA NOT IN (%s);", excludeSchema) - vlog = fmt.Sprintf("(%d) [%s] Start to query the metadata of the %s database and obtain library and table information. SQL: {%s}", logThreadSeq, Event, DBType, strsql) - global.Wlog.Debug(vlog) + query = fmt.Sprintf("SELECT TABLE_SCHEMA AS databaseName, TABLE_NAME AS tableName FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_SCHEMA NOT IN (%s);", excludeSchema) + logMsg = fmt.Sprintf("(%d) [%s] Start to query the metadata of the %s database and obtain library and table information. SQL: {%s}", logThreadSeq, Event, DBType, query) + global.Wlog.Debug(logMsg) dispos := dataDispos.DBdataDispos{DBType: DBType, LogThreadSeq: logThreadSeq, Event: Event, DB: db} - if dispos.SqlRows, err = dispos.DBSQLforExec(strsql); err != nil { + if dispos.SqlRows, err = dispos.DBSQLforExec(query); err != nil { return nil, err } tableData, err := dispos.DataRowsAndColumnSliceDispos([]map[string]interface{}{}) @@ -156,8 +156,8 @@ func (my *QueryTable) DatabaseNameList(db *sql.DB, logThreadSeq int64) (map[stri ga = fmt.Sprintf("%v/*schema&table*/%v", gd, gt) A[ga]++ } - vlog = fmt.Sprintf("(%d) [%s] Complete the library and table information query of the %s database.", logThreadSeq, Event, DBType) - global.Wlog.Debug(vlog) + logMsg = fmt.Sprintf("(%d) [%s] Complete the library and table information query of the %s database.", logThreadSeq, Event, DBType) + global.Wlog.Debug(logMsg) defer dispos.SqlRows.Close() return A, nil } @@ -167,23 +167,24 @@ MySQL 通过查询表的元数据信息获取列名 */ func (my *QueryTable) TableColumnName(db *sql.DB, logThreadSeq int64) ([]map[string]interface{}, error) { var ( - Event = "Q_table_columns" + Event = "Q_table_columns" + query string + logMsg string + err error ) - vlog = fmt.Sprintf("(%d) [%s] Start querying the metadata information of table %s.%s in the %s database and get all the column names", logThreadSeq, Event, my.Schema, my.Table, DBType) - global.Wlog.Debug(vlog) - strsql = fmt.Sprintf("SELECT COLUMN_NAME AS columnName, COLUMN_TYPE AS columnType, IS_NULLABLE AS isNull, CHARACTER_SET_NAME AS charset, COLLATION_NAME AS collationName, COLUMN_COMMENT AS columnComment, COLUMN_DEFAULT AS columnDefault, EXTRA AS extra FROM INFORMATION_SCHEMA.COLUMNS WHERE TABLE_SCHEMA='%s' AND TABLE_NAME='%s' ORDER BY ORDINAL_POSITION", my.Schema, my.Table) + logMsg = fmt.Sprintf("(%d) [%s] Start querying the metadata information of table %s.%s in the %s database and get all the column names", logThreadSeq, Event, my.Schema, my.Table, DBType) + global.Wlog.Debug(logMsg) + query = fmt.Sprintf("SELECT COLUMN_NAME AS columnName, COLUMN_TYPE AS columnType, IS_NULLABLE AS isNull, CHARACTER_SET_NAME AS charset, COLLATION_NAME AS collationName, COLUMN_COMMENT AS columnComment, COLUMN_DEFAULT AS columnDefault, EXTRA AS extra FROM INFORMATION_SCHEMA.COLUMNS WHERE TABLE_SCHEMA='%s' AND TABLE_NAME='%s' ORDER BY ORDINAL_POSITION", my.Schema, my.Table) dispos := dataDispos.DBdataDispos{DBType: DBType, LogThreadSeq: logThreadSeq, Event: Event, DB: db} - if dispos.SqlRows, err = dispos.DBSQLforExec(strsql); err != nil { - if err != nil { - return nil, err - } + if dispos.SqlRows, err = dispos.DBSQLforExec(query); err != nil { + return nil, err } tableData, err := dispos.DataRowsAndColumnSliceDispos([]map[string]interface{}{}) if err != nil { return nil, err } - vlog = fmt.Sprintf("(%d) [%s] Complete the acquisition of all column names in the following table %s.%s of the %s database.", logThreadSeq, Event, my.Schema, my.Table, DBType) - global.Wlog.Debug(vlog) + logMsg = fmt.Sprintf("(%d) [%s] Complete the acquisition of all column names in the following table %s.%s of the %s database.", logThreadSeq, Event, my.Schema, my.Table, DBType) + global.Wlog.Debug(logMsg) defer dispos.SqlRows.Close() return tableData, err } @@ -193,13 +194,16 @@ MySQL 获取表的注释信息 */ func (my *QueryTable) TableComment(db *sql.DB, logThreadSeq int64) (string, error) { var ( - Event = "Q_Table_Comment" + Event = "Q_Table_Comment" + query string + logMsg string + err error ) - vlog = fmt.Sprintf("(%d) [%s] Start to query the comment of table %s.%s in the %s database", logThreadSeq, Event, my.Schema, my.Table, DBType) - global.Wlog.Debug(vlog) - strsql = fmt.Sprintf("SELECT TABLE_COMMENT AS tableComment FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_SCHEMA='%s' AND TABLE_NAME='%s';", my.Schema, my.Table) + logMsg = fmt.Sprintf("(%d) [%s] Start to query the comment of table %s.%s in the %s database", logThreadSeq, Event, my.Schema, my.Table, DBType) + global.Wlog.Debug(logMsg) + query = fmt.Sprintf("SELECT TABLE_COMMENT AS tableComment FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_SCHEMA='%s' AND TABLE_NAME='%s';", my.Schema, my.Table) dispos := dataDispos.DBdataDispos{DBType: DBType, LogThreadSeq: logThreadSeq, Event: Event, DB: db} - if dispos.SqlRows, err = dispos.DBSQLforExec(strsql); err != nil { + if dispos.SqlRows, err = dispos.DBSQLforExec(query); err != nil { return "", err } tableData, err := dispos.DataRowsAndColumnSliceDispos([]map[string]interface{}{}) @@ -212,8 +216,8 @@ func (my *QueryTable) TableComment(db *sql.DB, logThreadSeq int64) (string, erro comment = fmt.Sprintf("%s", tableData[0]["tableComment"]) } - vlog = fmt.Sprintf("(%d) [%s] Complete the comment query of table %s.%s in the %s database: %s", logThreadSeq, Event, my.Schema, my.Table, DBType, comment) - global.Wlog.Debug(vlog) + logMsg = fmt.Sprintf("(%d) [%s] Complete the comment query of table %s.%s in the %s database: %s", logThreadSeq, Event, my.Schema, my.Table, DBType, comment) + global.Wlog.Debug(logMsg) defer dispos.SqlRows.Close() return comment, nil } @@ -226,11 +230,12 @@ func (my *QueryTable) DatabaseVersion(db *sql.DB, logThreadSeq int64) (string, e version string rows *sql.Rows Event = "Q_M_Versions" + query string + logMsg string + err error ) - // Use database connection address as cache key - // This ensures different connections to different MySQL instances get their own cached version - cacheKey := fmt.Sprintf("%p", db) + cacheKey := getDBScopeKey(db) // Try to get cached version first cacheMutex.RLock() @@ -243,13 +248,12 @@ func (my *QueryTable) DatabaseVersion(db *sql.DB, logThreadSeq int64) (string, e cacheMutex.RUnlock() // Cache miss, execute the query - vlog = fmt.Sprintf("(%d) [%s] Start querying the version information of the %s database", logThreadSeq, Event, DBType) - strsql = fmt.Sprintf("SELECT VERSION() AS VERSION") + logMsg = fmt.Sprintf("(%d) [%s] Start querying the version information of the %s database", logThreadSeq, Event, DBType) + global.Wlog.Debug(logMsg) + query = fmt.Sprintf("SELECT VERSION() AS VERSION") dispos := dataDispos.DBdataDispos{DBType: DBType, LogThreadSeq: logThreadSeq, Event: Event, DB: db} - if rows, err = dispos.DBSQLforExec(strsql); err != nil { - if err != nil { - return "", err - } + if rows, err = dispos.DBSQLforExec(query); err != nil { + return "", err } defer rows.Close() @@ -289,6 +293,9 @@ func (my *QueryTable) GlobalAccessPri(db *sql.DB, logThreadSeq int64) (bool, err currentUser string rows *sql.Rows Event = "Q_Table_Global_Access_Pri" + query string + logMsg string + err error ) //要确定MySQL的版本,5.7和8.0 if version, err = my.DatabaseVersion(db, logThreadSeq); err != nil { @@ -303,16 +310,16 @@ func (my *QueryTable) GlobalAccessPri(db *sql.DB, logThreadSeq int64) (bool, err //globalPri["FLUSH_TABLES"] = 0 globalPri["REPLICATION CLIENT"] = 0 - vlog = fmt.Sprintf("(%d) [%s] The permissions that the current %s DB needs to check is message {%v}, to check it...", logThreadSeq, Event, DBType, globalPri) - global.Wlog.Debug(vlog) + logMsg = fmt.Sprintf("(%d) [%s] The permissions that the current %s DB needs to check is message {%v}, to check it...", logThreadSeq, Event, DBType, globalPri) + global.Wlog.Debug(logMsg) var globalPriS []string for k, _ := range globalPri { globalPriS = append(globalPriS, k) } //获取当前匹配的用户 - strsql = fmt.Sprintf("SELECT CURRENT_USER() AS user;") + query = fmt.Sprintf("SELECT CURRENT_USER() AS user;") dispos := dataDispos.DBdataDispos{DBType: DBType, LogThreadSeq: logThreadSeq, Event: Event, DB: db} - if rows, err = dispos.DBSQLforExec(strsql); err != nil { + if rows, err = dispos.DBSQLforExec(query); err != nil { return false, err } dispos.SqlRows = rows @@ -322,14 +329,14 @@ func (my *QueryTable) GlobalAccessPri(db *sql.DB, logThreadSeq int64) (bool, err } currentUser = fmt.Sprintf("'%s'", strings.ReplaceAll(fmt.Sprintf("%s", CC[0]["user"]), "@", "'@'")) - vlog = fmt.Sprintf("(%d) [%s] The user account corresponding to the currently connected %s DB user is message {%s}", logThreadSeq, Event, DBType, currentUser) - global.Wlog.Debug(vlog) + logMsg = fmt.Sprintf("(%d) [%s] The user account corresponding to the currently connected %s DB user is message {%s}", logThreadSeq, Event, DBType, currentUser) + global.Wlog.Debug(logMsg) //查找全局权限 类似于grant all privileges on *.* 或 grant select on *.* - vlog = fmt.Sprintf("(%d) [%s] Query the current %s DB global dynamic grants permission, to query it...", logThreadSeq, Event, DBType) - global.Wlog.Debug(vlog) - strsql = fmt.Sprintf("SELECT PRIVILEGE_TYPE AS privileges FROM INFORMATION_SCHEMA.USER_PRIVILEGES WHERE PRIVILEGE_TYPE IN('%s') AND GRANTEE=\"%s\";", strings.Join(globalPriS, "','"), currentUser) - if dispos.SqlRows, err = dispos.DBSQLforExec(strsql); err != nil { + logMsg = fmt.Sprintf("(%d) [%s] Query the current %s DB global dynamic grants permission, to query it...", logThreadSeq, Event, DBType) + global.Wlog.Debug(logMsg) + query = fmt.Sprintf("SELECT PRIVILEGE_TYPE AS privileges FROM INFORMATION_SCHEMA.USER_PRIVILEGES WHERE PRIVILEGE_TYPE IN('%s') AND GRANTEE=\"%s\";", strings.Join(globalPriS, "','"), currentUser) + if dispos.SqlRows, err = dispos.DBSQLforExec(query); err != nil { return false, err } globalDynamic, err := dispos.DataRowsAndColumnSliceDispos([]map[string]interface{}{}) @@ -343,18 +350,18 @@ func (my *QueryTable) GlobalAccessPri(db *sql.DB, logThreadSeq int64) (bool, err } } if len(globalPri) == 0 { - vlog = fmt.Sprintf("(%d) [%s] The current global access user with permission to connect to %s DB is normal and can be verified normally...", logThreadSeq, Event, DBType) - global.Wlog.Debug(vlog) + logMsg = fmt.Sprintf("(%d) [%s] The current global access user with permission to connect to %s DB is normal and can be verified normally...", logThreadSeq, Event, DBType) + global.Wlog.Debug(logMsg) return true, nil } if _, ok := globalPri["SESSION_VARIABLES_ADMIN"]; ok && strings.HasPrefix(version, "8.") { - vlog = fmt.Sprintf("(%d) [%s] The current user connecting to %s DB lacks \"session_variables_admin\" permission, and the check table is empty", logThreadSeq, Event, DBType) - global.Wlog.Error(vlog) + logMsg = fmt.Sprintf("(%d) [%s] The current user connecting to %s DB lacks \"session_variables_admin\" permission, and the check table is empty", logThreadSeq, Event, DBType) + global.Wlog.Error(logMsg) return false, nil } if _, ok := globalPri["REPLICATION CLIENT"]; ok { - vlog = fmt.Sprintf("(%d) [%s] The current user connecting to %s DB lacks \"REPLICATION CLIENT\" permission, and the check table is empty", logThreadSeq, Event, DBType) - global.Wlog.Error(vlog) + logMsg = fmt.Sprintf("(%d) [%s] The current user connecting to %s DB lacks \"REPLICATION CLIENT\" permission, and the check table is empty", logThreadSeq, Event, DBType) + global.Wlog.Error(logMsg) return false, nil } //if _, ok := globalPri["FLUSH_TABLES"]; ok { @@ -377,6 +384,9 @@ func (my *QueryTable) TableAccessPriCheck(db *sql.DB, checkTableList []string, d PT, abPT = make(map[string]int), make(map[string]int) Event = "Q_Table_Access_Pri" globalPriS []string + query string + logMsg string + err error ) //针对要校验的库做去重(库级别的) @@ -389,8 +399,8 @@ func (my *QueryTable) TableAccessPriCheck(db *sql.DB, checkTableList []string, d for k, _ := range globalPri { globalPriS = append(globalPriS, k) } - vlog = fmt.Sprintf("(%d) [%s] The permissions that the current %s DB needs to check is message {%v},check table list is {%v}. to check it...", logThreadSeq, Event, DBType, globalPri, newCheckTableList) - global.Wlog.Debug(vlog) + logMsg = fmt.Sprintf("(%d) [%s] The permissions that the current %s DB needs to check is message {%v},check table list is {%v}. to check it...", logThreadSeq, Event, DBType, globalPri, newCheckTableList) + global.Wlog.Debug(logMsg) //校验库.表由切片改为map for _, AA := range checkTableList { @@ -409,9 +419,9 @@ func (my *QueryTable) TableAccessPriCheck(db *sql.DB, checkTableList []string, d } } //获取当前匹配的用户 - strsql = fmt.Sprintf("SELECT CURRENT_USER() AS user;") + query = fmt.Sprintf("SELECT CURRENT_USER() AS user;") dispos := dataDispos.DBdataDispos{DBType: DBType, LogThreadSeq: logThreadSeq, Event: Event, DB: db} - if dispos.SqlRows, err = dispos.DBSQLforExec(strsql); err != nil { + if dispos.SqlRows, err = dispos.DBSQLforExec(query); err != nil { return nil, err } CC, err := dispos.DataRowsAndColumnSliceDispos([]map[string]interface{}{}) @@ -420,10 +430,10 @@ func (my *QueryTable) TableAccessPriCheck(db *sql.DB, checkTableList []string, d } currentUser = fmt.Sprintf("'%s'", strings.ReplaceAll(fmt.Sprintf("%s", CC[0]["user"]), "@", "'@'")) //查找全局权限 类似于grant all privileges on *.* 或 grant select on *.* - vlog = fmt.Sprintf("(%d) [%s] Query the current %s DB global dynamic grants permission, to query it...", logThreadSeq, Event, DBType) - global.Wlog.Debug(vlog) - strsql = fmt.Sprintf("SELECT PRIVILEGE_TYPE AS privileges FROM INFORMATION_SCHEMA.USER_PRIVILEGES WHERE PRIVILEGE_TYPE IN('%s') AND GRANTEE=\"%s\";", strings.Join(globalPriS, "','"), currentUser) - if dispos.SqlRows, err = dispos.DBSQLforExec(strsql); err != nil { + logMsg = fmt.Sprintf("(%d) [%s] Query the current %s DB global dynamic grants permission, to query it...", logThreadSeq, Event, DBType) + global.Wlog.Debug(logMsg) + query = fmt.Sprintf("SELECT PRIVILEGE_TYPE AS privileges FROM INFORMATION_SCHEMA.USER_PRIVILEGES WHERE PRIVILEGE_TYPE IN('%s') AND GRANTEE=\"%s\";", strings.Join(globalPriS, "','"), currentUser) + if dispos.SqlRows, err = dispos.DBSQLforExec(query); err != nil { return nil, err } globalDynamic, err := dispos.DataRowsAndColumnSliceDispos([]map[string]interface{}{}) @@ -437,20 +447,20 @@ func (my *QueryTable) TableAccessPriCheck(db *sql.DB, checkTableList []string, d } } if len(globalPri) == 0 { - vlog = fmt.Sprintf("(%d) [%s] The %s DB table information that meets the permissions and needs to be verified is {%v}...", logThreadSeq, Event, DBType, newCheckTableList) - global.Wlog.Debug(vlog) + logMsg = fmt.Sprintf("(%d) [%s] The %s DB table information that meets the permissions and needs to be verified is {%v}...", logThreadSeq, Event, DBType, newCheckTableList) + global.Wlog.Debug(logMsg) return newCheckTableList, nil } //查询当前库的权限 //类似于grant all privileges on pcms.* 或 grant select on pcms.* - vlog = fmt.Sprintf("(%d) [%s] Query the current %s DB global dynamic schema permission, to query it...", logThreadSeq, Event, DBType) - global.Wlog.Debug(vlog) + logMsg = fmt.Sprintf("(%d) [%s] Query the current %s DB global dynamic schema permission, to query it...", logThreadSeq, Event, DBType) + global.Wlog.Debug(logMsg) for AC, _ := range A { var cc []string var intseq int - strsql = fmt.Sprintf("SELECT TABLE_SCHEMA AS databaseName, PRIVILEGE_TYPE AS privileges FROM INFORMATION_SCHEMA.SCHEMA_PRIVILEGES WHERE PRIVILEGE_TYPE IN('%s') AND TABLE_SCHEMA='%s' AND GRANTEE=\"%s\";", strings.Join(globalPriS, "','"), AC, currentUser) - if dispos.SqlRows, err = dispos.DBSQLforExec(strsql); err != nil { + query = fmt.Sprintf("SELECT TABLE_SCHEMA AS databaseName, PRIVILEGE_TYPE AS privileges FROM INFORMATION_SCHEMA.SCHEMA_PRIVILEGES WHERE PRIVILEGE_TYPE IN('%s') AND TABLE_SCHEMA='%s' AND GRANTEE=\"%s\";", strings.Join(globalPriS, "','"), AC, currentUser) + if dispos.SqlRows, err = dispos.DBSQLforExec(query); err != nil { return nil, err } schemaPri, err := dispos.DataRowsAndColumnSliceDispos([]map[string]interface{}{}) @@ -473,16 +483,16 @@ func (my *QueryTable) TableAccessPriCheck(db *sql.DB, checkTableList []string, d } } if len(A) == 0 { - vlog = fmt.Sprintf("(%d) [%s] The %s DB table information that meets the permissions and needs to be verified is {%v}...", logThreadSeq, Event, DBType, newCheckTableList) - global.Wlog.Debug(vlog) + logMsg = fmt.Sprintf("(%d) [%s] The %s DB table information that meets the permissions and needs to be verified is {%v}...", logThreadSeq, Event, DBType, newCheckTableList) + global.Wlog.Debug(logMsg) return newCheckTableList, nil } //vlog = fmt.Sprintf("(%d) MySQL DB library level permissions are not satisfied with {%v}", logThreadSeq, A) //global.Wlog.Debug(vlog) //查询当前表的权限 //类似于grant all privileges on pcms.a 或 grant select on pcms.a - vlog = fmt.Sprintf("(%d) [%s] Query the current %s DB global dynamic table permission, to query it...", logThreadSeq, Event, DBType) - global.Wlog.Debug(vlog) + logMsg = fmt.Sprintf("(%d) [%s] Query the current %s DB global dynamic table permission, to query it...", logThreadSeq, Event, DBType) + global.Wlog.Debug(logMsg) //遍历没有schema pri权限的剩余库 var DM = make(map[string]int) for _, D := range checkTableList { @@ -493,8 +503,8 @@ func (my *QueryTable) TableAccessPriCheck(db *sql.DB, checkTableList []string, d } for B, _ := range A { //按照每个库,查询table pri权限 - strsql = fmt.Sprintf("SELECT TABLE_NAME AS tableName, PRIVILEGE_TYPE AS privileges FROM INFORMATION_SCHEMA.TABLE_PRIVILEGES WHERE PRIVILEGE_TYPE IN('%s') AND TABLE_SCHEMA='%s' AND GRANTEE=\"%s\";", strings.Join(globalPriS, "','"), B, currentUser) - if dispos.SqlRows, err = dispos.DBSQLforExec(strsql); err != nil { + query = fmt.Sprintf("SELECT TABLE_NAME AS tableName, PRIVILEGE_TYPE AS privileges FROM INFORMATION_SCHEMA.TABLE_PRIVILEGES WHERE PRIVILEGE_TYPE IN('%s') AND TABLE_SCHEMA='%s' AND GRANTEE=\"%s\";", strings.Join(globalPriS, "','"), B, currentUser) + if dispos.SqlRows, err = dispos.DBSQLforExec(query); err != nil { return nil, err } tablePri, err := dispos.DataRowsAndColumnSliceDispos([]map[string]interface{}{}) @@ -535,8 +545,8 @@ func (my *QueryTable) TableAccessPriCheck(db *sql.DB, checkTableList []string, d } } } - vlog = fmt.Sprintf("(%d) [%s] The %s DB table information that needs to be verified to meet the permissions is {%v}, and the information that is not satisfied is {%v}...", logThreadSeq, Event, DBType, PT, abPT) - global.Wlog.Debug(vlog) + logMsg = fmt.Sprintf("(%d) [%s] The %s DB table information that needs to be verified to meet the permissions is {%v}, and the information that is not satisfied is {%v}...", logThreadSeq, Event, DBType, PT, abPT) + global.Wlog.Debug(logMsg) return PT, nil } @@ -545,29 +555,30 @@ MySQL 获取校验表的列信息,包含列名,列序号,列类型 */ func (my *QueryTable) TableAllColumn(db *sql.DB, logThreadSeq int64) ([]map[string]interface{}, error) { var ( - //sqlStr string - //rows *sql.Rows - Event = "Q_Table_Column_Metadata" + Event = "Q_Table_Column_Metadata" + err error + query string + logMsg string + cacheKey string ) - // Generate cache key in format: schema.table - cacheKey := fmt.Sprintf("%s.%s", my.Schema, my.Table) + cacheKey = scopedTableCacheKey(db, my.Schema, my.Table, "tableAllColumn") // Check if result is already in global cache cacheMutex.RLock() if cachedTableAllColumn, ok := tableAllColumnGlobalCache[cacheKey]; ok { cacheMutex.RUnlock() - vlog := fmt.Sprintf("(%d) [%s] Using cached TableAllColumn information for table %s.%s", logThreadSeq, Event, my.Schema, my.Table) - global.Wlog.Debug(vlog) + logMsg = fmt.Sprintf("(%d) [%s] Using cached TableAllColumn information for table %s.%s", logThreadSeq, Event, my.Schema, my.Table) + global.Wlog.Debug(logMsg) return cachedTableAllColumn, nil } cacheMutex.RUnlock() - vlog = fmt.Sprintf("(%d) [%s] Start to query the metadata of all the columns of table %s.%s in the %s database", logThreadSeq, Event, my.Schema, my.Table, DBType) - global.Wlog.Debug(vlog) - strsql = fmt.Sprintf("SELECT COLUMN_NAME AS columnName, COLUMN_TYPE AS dataType, ORDINAL_POSITION AS columnSeq, IS_NULLABLE AS isNull, COLUMN_COMMENT AS columnComment FROM INFORMATION_SCHEMA.COLUMNS WHERE TABLE_SCHEMA='%s' AND TABLE_NAME='%s' ORDER BY ORDINAL_POSITION;", my.Schema, my.Table) + logMsg = fmt.Sprintf("(%d) [%s] Start to query the metadata of all the columns of table %s.%s in the %s database", logThreadSeq, Event, my.Schema, my.Table, DBType) + global.Wlog.Debug(logMsg) + query = fmt.Sprintf("SELECT COLUMN_NAME AS columnName, COLUMN_TYPE AS dataType, ORDINAL_POSITION AS columnSeq, IS_NULLABLE AS isNull, COLUMN_COMMENT AS columnComment FROM INFORMATION_SCHEMA.COLUMNS WHERE TABLE_SCHEMA='%s' AND TABLE_NAME='%s' ORDER BY ORDINAL_POSITION;", my.Schema, my.Table) dispos := dataDispos.DBdataDispos{DBType: DBType, LogThreadSeq: logThreadSeq, Event: Event, DB: db} - if dispos.SqlRows, err = dispos.DBSQLforExec(strsql); err != nil { + if dispos.SqlRows, err = dispos.DBSQLforExec(query); err != nil { return nil, err } tableData, err := dispos.DataRowsAndColumnSliceDispos([]map[string]interface{}{}) @@ -580,8 +591,8 @@ func (my *QueryTable) TableAllColumn(db *sql.DB, logThreadSeq int64) ([]map[stri tableAllColumnGlobalCache[cacheKey] = tableData cacheMutex.Unlock() - vlog = fmt.Sprintf("(%d) [%s] Complete the metadata query of all columns in table %s.%s in the %s database. Cached results for future use.", logThreadSeq, Event, my.Schema, my.Table, DBType) - global.Wlog.Debug(vlog) + logMsg = fmt.Sprintf("(%d) [%s] Complete the metadata query of all columns in table %s.%s in the %s database. Cached results for future use.", logThreadSeq, Event, my.Schema, my.Table, DBType) + global.Wlog.Debug(logMsg) defer dispos.SqlRows.Close() return tableData, err } @@ -672,14 +683,15 @@ func (my *QueryTable) TableIndexChoice(queryData []map[string]interface{}, logTh PriIndexCol, uniIndexCol, mulIndexCol []string indexName string Event = "Q_Table_Index_Choice" + logMsg string ) if len(queryData) == 0 { return nil } //索引列处理,联合索引进行列合并 //去除主键索引列、唯一索引列、普通索引列的所有列明 - vlog = fmt.Sprintf("(%d) [%s] Start to select the appropriate index column in the following table %s.%s of the %s database.", logThreadSeq, Event, my.Schema, my.Table, DBType) - global.Wlog.Debug(vlog) + logMsg = fmt.Sprintf("(%d) [%s] Start to select the appropriate index column in the following table %s.%s of the %s database.", logThreadSeq, Event, my.Schema, my.Table, DBType) + global.Wlog.Debug(logMsg) for _, v := range queryData { if v["nonUnique"].(string) == "0" { //处理主键索引 @@ -711,15 +723,15 @@ func (my *QueryTable) TableIndexChoice(queryData []map[string]interface{}, logTh //global.Wlog.Debug(vlog) //处理主键索引列 //判断是否存在主键索引,每个表的索引只有一个 - vlog = fmt.Sprintf("(%d) MySQL DB primary key index starts to choose the best.", logThreadSeq) - global.Wlog.Debug(vlog) + logMsg = fmt.Sprintf("(%d) MySQL DB primary key index starts to choose the best.", logThreadSeq) + global.Wlog.Debug(logMsg) if len(PriIndexCol) == 1 { //单列主键索引 indexChoice["pri_single"] = PriIndexCol } else if len(PriIndexCol) > 1 { //联合主键索引 indexChoice["pri_multiseriate"] = PriIndexCol } - vlog = fmt.Sprintf("(%d) MySQL DB unique key index starts to choose the best.", logThreadSeq) - global.Wlog.Debug(vlog) + logMsg = fmt.Sprintf("(%d) MySQL DB unique key index starts to choose the best.", logThreadSeq) + global.Wlog.Debug(logMsg) g := my.keyChoiceDispos(nultiseriateIndexColumnMap, "uni") for k, v := range g { if len(v) > 0 { @@ -732,8 +744,8 @@ func (my *QueryTable) TableIndexChoice(queryData []map[string]interface{}, logTh indexChoice[k] = v } } - vlog = fmt.Sprintf("(%d) [%s] Complete the selection of the appropriate index column in the following table %s.%s of the %s database.", logThreadSeq, Event, my.Schema, my.Table, DBType) - global.Wlog.Debug(vlog) + logMsg = fmt.Sprintf("(%d) [%s] Complete the selection of the appropriate index column in the following table %s.%s of the %s database.", logThreadSeq, Event, my.Schema, my.Table, DBType) + global.Wlog.Debug(logMsg) return indexChoice } @@ -742,14 +754,17 @@ MySQL 查询触发器信息 */ func (my *QueryTable) Trigger(db *sql.DB, logThreadSeq int64) (map[string]string, error) { var ( - tmpb = make(map[string]string) - Event = "Q_Trigger" + tmpb = make(map[string]string) + Event = "Q_Trigger" + query string + logMsg string + err error ) - vlog = fmt.Sprintf("(%d) [%s] Start to query the trigger information under the %s database.", logThreadSeq, Event, DBType) - global.Wlog.Debug(vlog) - strsql = fmt.Sprintf("SELECT TRIGGER_NAME AS triggerName, EVENT_OBJECT_TABLE AS tableName FROM INFORMATION_SCHEMA.TRIGGERS WHERE TRIGGER_SCHEMA IN('%s');", my.Schema) + logMsg = fmt.Sprintf("(%d) [%s] Start to query the trigger information under the %s database.", logThreadSeq, Event, DBType) + global.Wlog.Debug(logMsg) + query = fmt.Sprintf("SELECT TRIGGER_NAME AS triggerName, EVENT_OBJECT_TABLE AS tableName FROM INFORMATION_SCHEMA.TRIGGERS WHERE TRIGGER_SCHEMA IN('%s');", my.Schema) dispos := dataDispos.DBdataDispos{DBType: DBType, LogThreadSeq: logThreadSeq, Event: Event, DB: db} - if dispos.SqlRows, err = dispos.DBSQLforExec(strsql); err != nil { + if dispos.SqlRows, err = dispos.DBSQLforExec(query); err != nil { return nil, err } triggerName, err := dispos.DataRowsAndColumnSliceDispos([]map[string]interface{}{}) @@ -757,13 +772,13 @@ func (my *QueryTable) Trigger(db *sql.DB, logThreadSeq int64) (map[string]string return nil, err } for _, v := range triggerName { - strsql = fmt.Sprintf("SHOW CREATE TRIGGER %s.%s", my.Schema, v["triggerName"]) - if dispos.SqlRows, err = dispos.DBSQLforExec(strsql); err != nil { + query = fmt.Sprintf("SHOW CREATE TRIGGER %s.%s", my.Schema, v["triggerName"]) + if dispos.SqlRows, err = dispos.DBSQLforExec(query); err != nil { return nil, err } createTrigger, err1 := dispos.DataRowsAndColumnSliceDispos([]map[string]interface{}{}) if err1 != nil { - return nil, err + return nil, err1 } for _, b := range createTrigger { //获取trigger name @@ -785,11 +800,11 @@ func (my *QueryTable) Trigger(db *sql.DB, logThreadSeq int64) (map[string]string } tmpb[triggerNa] = fmt.Sprintf("%s %s %s", triggerAction, triggerOn, triggerTRX) } - vlog = fmt.Sprintf("(%d) MySQL db query databases %s Trigger data completion...", logThreadSeq, my.Schema) - global.Wlog.Debug(vlog) + logMsg = fmt.Sprintf("(%d) MySQL db query databases %s Trigger data completion...", logThreadSeq, my.Schema) + global.Wlog.Debug(logMsg) } - vlog = fmt.Sprintf("(%d) [%s] Complete the trigger information query under the %s database.", logThreadSeq, Event, DBType) - global.Wlog.Debug(vlog) + logMsg = fmt.Sprintf("(%d) [%s] Complete the trigger information query under the %s database.", logThreadSeq, Event, DBType) + global.Wlog.Debug(logMsg) defer dispos.SqlRows.Close() return tmpb, nil } @@ -805,14 +820,17 @@ func (my *QueryTable) Routine(db *sql.DB, logThreadSeq int64) (map[string]string routines = make(map[string]string) // name -> body types = make(map[string]string) // name -> "PROCEDURE"/"FUNCTION" Event = "Q_Routine" + query string + logMsg string + err error ) - vlog = fmt.Sprintf("(%d) [%s] Start to query PROCEDURE and FUNCTION information under the %s database.", logThreadSeq, Event, DBType) - global.Wlog.Debug(vlog) + logMsg = fmt.Sprintf("(%d) [%s] Start to query PROCEDURE and FUNCTION information under the %s database.", logThreadSeq, Event, DBType) + global.Wlog.Debug(logMsg) // 1) 查询参数:同时取 PROCEDURE 与 FUNCTION - strsql = fmt.Sprintf("SELECT SPECIFIC_SCHEMA, SPECIFIC_NAME, ROUTINE_TYPE, ORDINAL_POSITION, PARAMETER_MODE, PARAMETER_NAME, DTD_IDENTIFIER FROM INFORMATION_SCHEMA.PARAMETERS WHERE SPECIFIC_SCHEMA IN('%s') AND ROUTINE_TYPE IN('PROCEDURE','FUNCTION') ORDER BY SPECIFIC_NAME, ORDINAL_POSITION;", my.Schema) + query = fmt.Sprintf("SELECT SPECIFIC_SCHEMA, SPECIFIC_NAME, ROUTINE_TYPE, ORDINAL_POSITION, PARAMETER_MODE, PARAMETER_NAME, DTD_IDENTIFIER FROM INFORMATION_SCHEMA.PARAMETERS WHERE SPECIFIC_SCHEMA IN('%s') AND ROUTINE_TYPE IN('PROCEDURE','FUNCTION') ORDER BY SPECIFIC_NAME, ORDINAL_POSITION;", my.Schema) dispos := dataDispos.DBdataDispos{DBType: DBType, LogThreadSeq: logThreadSeq, Event: Event, DB: db} - if dispos.SqlRows, err = dispos.DBSQLforExec(strsql); err != nil { + if dispos.SqlRows, err = dispos.DBSQLforExec(query); err != nil { return nil, nil, err } inoutAll, err := dispos.DataRowsAndColumnSliceDispos([]map[string]interface{}{}) @@ -833,8 +851,8 @@ func (my *QueryTable) Routine(db *sql.DB, logThreadSeq int64) (map[string]string tmpaFunc := procP(inoutFunc, "Func") // 2) 从 ROUTINES 取定义与属性,并带出 ROUTINE_TYPE - strsql = fmt.Sprintf("SELECT ROUTINE_NAME, ROUTINE_DEFINITION, DEFINER, SQL_MODE, CHARACTER_SET_CLIENT, COLLATION_CONNECTION, DATABASE_COLLATION, ROUTINE_TYPE FROM INFORMATION_SCHEMA.ROUTINES WHERE ROUTINE_SCHEMA='%s' AND ROUTINE_TYPE IN('PROCEDURE','FUNCTION');", my.Schema) - if dispos.SqlRows, err = dispos.DBSQLforExec(strsql); err != nil { + query = fmt.Sprintf("SELECT ROUTINE_NAME, ROUTINE_DEFINITION, DEFINER, SQL_MODE, CHARACTER_SET_CLIENT, COLLATION_CONNECTION, DATABASE_COLLATION, ROUTINE_TYPE FROM INFORMATION_SCHEMA.ROUTINES WHERE ROUTINE_SCHEMA='%s' AND ROUTINE_TYPE IN('PROCEDURE','FUNCTION');", my.Schema) + if dispos.SqlRows, err = dispos.DBSQLforExec(query); err != nil { return nil, nil, err } createAll, err := dispos.DataRowsAndColumnSliceDispos([]map[string]interface{}{}) @@ -866,8 +884,8 @@ func (my *QueryTable) Routine(db *sql.DB, logThreadSeq int64) (map[string]string types[k] = "FUNCTION" } - vlog = fmt.Sprintf("(%d) [%s] Complete the PROCEDURE and FUNCTION information query under the %s database.", logThreadSeq, Event, DBType) - global.Wlog.Debug(vlog) + logMsg = fmt.Sprintf("(%d) [%s] Complete the PROCEDURE and FUNCTION information query under the %s database.", logThreadSeq, Event, DBType) + global.Wlog.Debug(logMsg) return routines, types, nil } @@ -918,17 +936,20 @@ MySQL 外键校验 */ func (my *QueryTable) Foreign(db *sql.DB, logThreadSeq int64) (map[string]string, error) { var ( - tmpb = make(map[string]string) - Event = "Q_Foreign" + tmpb = make(map[string]string) + Event = "Q_Foreign" + query string + logMsg string + err error ) - vlog = fmt.Sprintf("(%d) [%s] Start to query the Foreign information under the %s database.", logThreadSeq, Event, DBType) - global.Wlog.Debug(vlog) + logMsg = fmt.Sprintf("(%d) [%s] Start to query the Foreign information under the %s database.", logThreadSeq, Event, DBType) + global.Wlog.Debug(logMsg) // 使用INFORMATION_SCHEMA获取完整的外键约束信息 // 这个查询会获取外键名称、列名、引用的表和列信息 - strsql = fmt.Sprintf(` - SELECT - rc.CONSTRAINT_NAME, + query = fmt.Sprintf(` + SELECT + rc.CONSTRAINT_NAME, kcu.COLUMN_NAME, rc.CONSTRAINT_SCHEMA AS REFERENCED_TABLE_SCHEMA, rc.REFERENCED_TABLE_NAME, @@ -952,19 +973,19 @@ func (my *QueryTable) Foreign(db *sql.DB, logThreadSeq int64) (map[string]string AND rc.TABLE_NAME = '%s' ORDER BY rc.CONSTRAINT_NAME, kcu.ORDINAL_POSITION - `, my.Schema, my.Table) + `, my.Schema, my.Table) dispos := dataDispos.DBdataDispos{DBType: DBType, LogThreadSeq: logThreadSeq, Event: Event, DB: db} - if dispos.SqlRows, err = dispos.DBSQLforExec(strsql); err != nil { - vlog = fmt.Sprintf("(%d) [%s] Error executing foreign key query: %v", logThreadSeq, Event, err) - global.Wlog.Error(vlog) + if dispos.SqlRows, err = dispos.DBSQLforExec(query); err != nil { + logMsg = fmt.Sprintf("(%d) [%s] Error executing foreign key query: %v", logThreadSeq, Event, err) + global.Wlog.Error(logMsg) return nil, err } foreignKeys, err := dispos.DataRowsAndColumnSliceDispos([]map[string]interface{}{}) if err != nil { - vlog = fmt.Sprintf("(%d) [%s] Error processing foreign key results: %v", logThreadSeq, Event, err) - global.Wlog.Error(vlog) + logMsg = fmt.Sprintf("(%d) [%s] Error processing foreign key results: %v", logThreadSeq, Event, err) + global.Wlog.Error(logMsg) return nil, err } defer dispos.SqlRows.Close() @@ -1018,12 +1039,12 @@ func (my *QueryTable) Foreign(db *sql.DB, logThreadSeq int64) (map[string]string tableKey := fmt.Sprintf("%s.%s", my.Schema, my.Table) tmpb[tableKey] = strings.ToUpper(ddl) - vlog = fmt.Sprintf("(%d) [%s] Found foreign key: %s", logThreadSeq, Event, ddl) - global.Wlog.Debug(vlog) + logMsg = fmt.Sprintf("(%d) [%s] Found foreign key: %s", logThreadSeq, Event, ddl) + global.Wlog.Debug(logMsg) } - vlog = fmt.Sprintf("(%d) [%s] Complete the Foreign information query under the %s database.", logThreadSeq, Event, DBType) - global.Wlog.Debug(vlog) + logMsg = fmt.Sprintf("(%d) [%s] Complete the Foreign information query under the %s database.", logThreadSeq, Event, DBType) + global.Wlog.Debug(logMsg) return tmpb, nil } @@ -1032,8 +1053,11 @@ func (my *QueryTable) Foreign(db *sql.DB, logThreadSeq int64) (map[string]string */ func (my *QueryTable) Partitions(db *sql.DB, logThreadSeq int64) (map[string]string, error) { var ( - tmpb = make(map[string]string) - Event = "Q_Partitions" + tmpb = make(map[string]string) + Event = "Q_Partitions" + err error + logMsg string + query string ) // 正确提取表名,避免表名中包含schema信息 @@ -1045,15 +1069,15 @@ func (my *QueryTable) Partitions(db *sql.DB, logThreadSeq int64) (map[string]str } } - vlog = fmt.Sprintf("(%d) [%s] Start to query the Partitions information for table %s.%s under the %s database.", logThreadSeq, Event, my.Schema, actualTableName, DBType) - global.Wlog.Debug(vlog) + logMsg = fmt.Sprintf("(%d) [%s] Start to query the Partitions information for table %s.%s under the %s database.", logThreadSeq, Event, my.Schema, actualTableName, DBType) + global.Wlog.Debug(logMsg) // 直接查询表的分区信息,包括分区名称和详细定义 - strsql = fmt.Sprintf("SELECT PARTITION_NAME, PARTITION_ORDINAL_POSITION, PARTITION_METHOD, PARTITION_EXPRESSION, PARTITION_DESCRIPTION, TABLE_ROWS FROM INFORMATION_SCHEMA.PARTITIONS WHERE TABLE_SCHEMA='%s' AND TABLE_NAME='%s' AND PARTITION_NAME<>'' ORDER BY PARTITION_ORDINAL_POSITION;", my.Schema, actualTableName) - vlog = fmt.Sprintf("(%d) [%s] Executing query on INFORMATION_SCHEMA.PARTITIONS: %s", logThreadSeq, Event, strsql) - global.Wlog.Debug(vlog) + query = fmt.Sprintf("SELECT PARTITION_NAME, PARTITION_ORDINAL_POSITION, PARTITION_METHOD, PARTITION_EXPRESSION, PARTITION_DESCRIPTION, TABLE_ROWS FROM INFORMATION_SCHEMA.PARTITIONS WHERE TABLE_SCHEMA='%s' AND TABLE_NAME='%s' AND PARTITION_NAME<>'' ORDER BY PARTITION_ORDINAL_POSITION;", my.Schema, actualTableName) + logMsg = fmt.Sprintf("(%d) [%s] Executing query on INFORMATION_SCHEMA.PARTITIONS: %s", logThreadSeq, Event, query) + global.Wlog.Debug(logMsg) dispos := dataDispos.DBdataDispos{DBType: DBType, LogThreadSeq: logThreadSeq, Event: Event, DB: db} - if dispos.SqlRows, err = dispos.DBSQLforExec(strsql); err != nil { + if dispos.SqlRows, err = dispos.DBSQLforExec(query); err != nil { return nil, err } partitionsInfo, err := dispos.DataRowsAndColumnSliceDispos([]map[string]interface{}{}) @@ -1063,8 +1087,8 @@ func (my *QueryTable) Partitions(db *sql.DB, logThreadSeq int64) (map[string]str // 如果有分区,获取表的创建语句以提取完整的分区定义 if len(partitionsInfo) > 0 { - strsql = fmt.Sprintf("SHOW CREATE TABLE %s.%s;", my.Schema, actualTableName) - if dispos.SqlRows, err = dispos.DBSQLforExec(strsql); err != nil { + query = fmt.Sprintf("SHOW CREATE TABLE %s.%s;", my.Schema, actualTableName) + if dispos.SqlRows, err = dispos.DBSQLforExec(query); err != nil { return nil, err } createTableInfo, err1 := dispos.DataRowsAndColumnSliceDispos([]map[string]interface{}{}) @@ -1108,8 +1132,8 @@ func (my *QueryTable) Partitions(db *sql.DB, logThreadSeq int64) (map[string]str fullPartitionDef = strings.ReplaceAll(fullPartitionDef, "`", "!") // 增加日志,记录完整的分区定义用于调试 - vlog = fmt.Sprintf("(%d) [%s] Extracted full partition definition for %s.%s: %s", logThreadSeq, Event, my.Schema, actualTableName, fullPartitionDef) - global.Wlog.Debug(vlog) + logMsg = fmt.Sprintf("(%d) [%s] Extracted full partition definition for %s.%s: %s", logThreadSeq, Event, my.Schema, actualTableName, fullPartitionDef) + global.Wlog.Debug(logMsg) // 使用表名作为键,存储完整的分区定义 tableKey := fmt.Sprintf("%s.%s", my.Schema, my.Table) @@ -1128,15 +1152,15 @@ func (my *QueryTable) Partitions(db *sql.DB, logThreadSeq int64) (map[string]str p["PARTITION_DESCRIPTION"], p["TABLE_ROWS"]) tmpb[partitionKey] = partitionDetails - vlog = fmt.Sprintf("(%d) [%s] Stored partition %s details: %s", logThreadSeq, Event, partitionKey, partitionDetails) - global.Wlog.Debug(vlog) + logMsg = fmt.Sprintf("(%d) [%s] Stored partition %s details: %s", logThreadSeq, Event, partitionKey, partitionDetails) + global.Wlog.Debug(logMsg) } } } defer dispos.SqlRows.Close() - vlog = fmt.Sprintf("(%d) [%s] Complete the Partitions information query for table %s.%s under the %s database. Found %d partitions.", logThreadSeq, Event, my.Schema, actualTableName, DBType, len(partitionsInfo)) - global.Wlog.Debug(vlog) + logMsg = fmt.Sprintf("(%d) [%s] Complete the Partitions information query for table %s.%s under the %s database. Found %d partitions.", logThreadSeq, Event, my.Schema, actualTableName, DBType, len(partitionsInfo)) + global.Wlog.Debug(logMsg) return tmpb, nil }