From 0addf9576bac276f8c1b7add6866dc00ec72185b Mon Sep 17 00:00:00 2001 From: zqq Date: Tue, 30 Dec 2025 14:49:47 +0800 Subject: [PATCH 1/2] save gid after expired cursor Signed-off-by: zqq --- .../distributeddb/common/include/db_common.h | 6 +- .../common/include/db_constant.h | 1 + .../common/src/db_common_client.cpp | 7 - .../interfaces/include/cloud/icloud_db.h | 2 +- .../relational/relational_sync_able_storage.h | 4 +- .../storage/include/cloud/cloud_meta_data.h | 4 + .../include/icloud_sync_storage_interface.h | 2 +- .../storage/include/storage_proxy.h | 6 +- .../storage/src/cloud/cloud_meta_data.cpp | 52 +++++- .../relational_sync_able_storage_extend.cpp | 15 +- .../relational/sqlite_relational_utils.cpp | 63 +++++++ .../relational/sqlite_relational_utils.h | 4 + ...qlite_single_relational_storage_engine.cpp | 33 ++++ .../sqlite_single_relational_storage_engine.h | 4 + ...single_ver_relational_storage_executor.cpp | 8 + ...e_single_ver_relational_storage_executor.h | 6 + ...ver_relational_storage_executor_extend.cpp | 11 +- ...ver_relational_storage_extend_executor.cpp | 21 +++ .../storage/src/storage_proxy.cpp | 24 ++- .../syncer/src/cloud/cloud_db_proxy.cpp | 7 +- .../syncer/src/cloud/cloud_db_proxy.h | 2 +- .../syncer/src/cloud/cloud_syncer.cpp | 31 ++-- .../syncer/src/cloud/cloud_syncer.h | 14 ++ .../src/cloud/cloud_syncer_extend_extend.cpp | 156 ++++++++++++++++++ .../distributeddb_rdb_complex_cloud_test.cpp | 32 ++++ .../common/syncer/cloud/virtual_cloud_db.cpp | 12 ++ .../common/syncer/cloud/virtual_cloud_db.h | 2 + 27 files changed, 480 insertions(+), 49 deletions(-) diff --git a/frameworks/libs/distributeddb/common/include/db_common.h b/frameworks/libs/distributeddb/common/include/db_common.h index 55de410a967..3d6b2a3ba5b 100644 --- a/frameworks/libs/distributeddb/common/include/db_common.h +++ b/frameworks/libs/distributeddb/common/include/db_common.h @@ -30,7 +30,6 @@ public: static int CreateDirectory(const std::string &directory); static void StringToVector(const std::string &src, std::vector &dst); - static std::vector ToVector(const std::string &src); static void VectorToString(const std::vector &src, std::string &dst); static inline std::string GetLogTableName(const std::string &tableName) @@ -38,6 +37,11 @@ public: return DBConstant::RELATIONAL_PREFIX + tableName + DBConstant::LOG_POSTFIX; } + static inline std::string GetTmpLogTableName(const std::string &tableName) + { + return DBConstant::RELATIONAL_PREFIX + tableName + DBConstant::LOG_POSTFIX + DBConstant::TMP_POSTFIX; + } + static inline const std::vector GetWaterTypeVec() { return {CloudWaterType::DELETE, CloudWaterType::UPDATE, CloudWaterType::INSERT}; diff --git a/frameworks/libs/distributeddb/common/include/db_constant.h b/frameworks/libs/distributeddb/common/include/db_constant.h index 6de2f46a98b..a0c15ad39f2 100644 --- a/frameworks/libs/distributeddb/common/include/db_constant.h +++ b/frameworks/libs/distributeddb/common/include/db_constant.h @@ -162,6 +162,7 @@ public: static constexpr size_t RELATIONAL_PREFIX_SIZE = 20; static constexpr const char *TIMESTAMP_ALIAS = "naturalbase_rdb_aux_timestamp"; static constexpr const char *LOG_POSTFIX = "_log"; + static constexpr const char *TMP_POSTFIX = "_tmp"; static constexpr const char *META_TABLE_POSTFIX = "metadata"; static constexpr const char *KNOWLEDGE_TABLE_TYPE = "knowledge"; diff --git a/frameworks/libs/distributeddb/common/src/db_common_client.cpp b/frameworks/libs/distributeddb/common/src/db_common_client.cpp index 723305e1cb7..fa103ec9fe8 100644 --- a/frameworks/libs/distributeddb/common/src/db_common_client.cpp +++ b/frameworks/libs/distributeddb/common/src/db_common_client.cpp @@ -33,13 +33,6 @@ void DBCommon::StringToVector(const std::string &src, std::vector &dst) dst.assign(src.begin(), src.end()); } -std::vector DBCommon::ToVector(const std::string &src) -{ - std::vector res; - StringToVector(src, res); - return res; -} - std::string DBCommon::StringMiddleMasking(const std::string &name) { if (name.length() <= HEAD_SIZE) { diff --git a/frameworks/libs/distributeddb/interfaces/include/cloud/icloud_db.h b/frameworks/libs/distributeddb/interfaces/include/cloud/icloud_db.h index e72fca098dd..4b1646dfcbb 100644 --- a/frameworks/libs/distributeddb/interfaces/include/cloud/icloud_db.h +++ b/frameworks/libs/distributeddb/interfaces/include/cloud/icloud_db.h @@ -52,7 +52,7 @@ public: { return this->prepareTraceId; } - virtual DBStatus QueryAllGID(const std::string &tableName, const VBucket &extend, std::vector &data) + virtual DBStatus QueryAllGid(const std::string &tableName, VBucket &extend, std::vector &data) { return NOT_SUPPORT; } diff --git a/frameworks/libs/distributeddb/interfaces/src/relational/relational_sync_able_storage.h b/frameworks/libs/distributeddb/interfaces/src/relational/relational_sync_able_storage.h index b9abe1cc63f..66906e96d60 100644 --- a/frameworks/libs/distributeddb/interfaces/src/relational/relational_sync_able_storage.h +++ b/frameworks/libs/distributeddb/interfaces/src/relational/relational_sync_able_storage.h @@ -282,8 +282,10 @@ public: int ResetGenLogTaskStatus(); + int PutCloudGid(const std::string &tableName, std::vector &data) override; + #ifdef USE_DISTRIBUTEDDB_CLOUD - int AgingCloudNoneExistRecord(const std::string &tableName) override; + int DeleteCloudNoneExistRecord(const std::string &tableName) override; #endif protected: int FillReferenceData(CloudSyncData &syncData); diff --git a/frameworks/libs/distributeddb/storage/include/cloud/cloud_meta_data.h b/frameworks/libs/distributeddb/storage/include/cloud/cloud_meta_data.h index 5005e4e9720..437e610527f 100644 --- a/frameworks/libs/distributeddb/storage/include/cloud/cloud_meta_data.h +++ b/frameworks/libs/distributeddb/storage/include/cloud/cloud_meta_data.h @@ -49,6 +49,8 @@ public: int GetCloudGidCursor(const std::string &tableName, std::string &cursor) const; int PutCloudGidCursor(const std::string &tableName, const std::string &cursor) const; + int GetPotentialCursor(const std::string &tableName, std::string &cursor) const; + int PutPotentialCursor(const std::string &tableName, std::string &cursor) const; int CleanCloudInfo(const std::string &tableName) const; private: @@ -63,6 +65,8 @@ private: typedef struct CloudInfoValue { uint64_t version = 0u; std::string gidCloudMark; + // when query all gid finished, use it as query watermark + std::string potentialCloudMark; } CloudInfoValue; int ReadMarkFromMeta(const TableName &tableName); diff --git a/frameworks/libs/distributeddb/storage/include/icloud_sync_storage_interface.h b/frameworks/libs/distributeddb/storage/include/icloud_sync_storage_interface.h index 3e8b71b868d..b839d2803d4 100644 --- a/frameworks/libs/distributeddb/storage/include/icloud_sync_storage_interface.h +++ b/frameworks/libs/distributeddb/storage/include/icloud_sync_storage_interface.h @@ -335,7 +335,7 @@ public: return E_OK; } - virtual int AgingCloudNoneExistRecord([[gnu::unused]] const std::string &tableName) + virtual int DeleteCloudNoneExistRecord([[gnu::unused]] const std::string &tableName) { return E_OK; } diff --git a/frameworks/libs/distributeddb/storage/include/storage_proxy.h b/frameworks/libs/distributeddb/storage/include/storage_proxy.h index 10e34428385..466113a6229 100644 --- a/frameworks/libs/distributeddb/storage/include/storage_proxy.h +++ b/frameworks/libs/distributeddb/storage/include/storage_proxy.h @@ -206,9 +206,13 @@ public: int PutCloudGidCursor(const std::string &tableName, const std::string &cursor); + int GetPotentialCursor(const std::string &tableName, std::string &cursor); + + int PutPotentialCursor(const std::string &tableName, std::string &cursor); + int CleanCloudInfo(const std::string &tableName); - int AgingCloudNoneExistRecord(const std::string &tableName); + int DeleteCloudNoneExistRecord(const std::string &tableName); protected: void Init(); private: diff --git a/frameworks/libs/distributeddb/storage/src/cloud/cloud_meta_data.cpp b/frameworks/libs/distributeddb/storage/src/cloud/cloud_meta_data.cpp index 2cc2b9cacee..6d1b1342bc2 100644 --- a/frameworks/libs/distributeddb/storage/src/cloud/cloud_meta_data.cpp +++ b/frameworks/libs/distributeddb/storage/src/cloud/cloud_meta_data.cpp @@ -280,6 +280,7 @@ void CloudMetaData::CleanWaterMarkInMemory(const TableName &tableName) int CloudMetaData::GetCloudGidCursor(const std::string &tableName, std::string &cursor) const { + std::lock_guard lock(cloudMetaMutex_); if (store_ == nullptr) { return -E_INVALID_DB; } @@ -289,11 +290,13 @@ int CloudMetaData::GetCloudGidCursor(const std::string &tableName, std::string & return errCode; } cursor = info.gidCloudMark; + LOGI("[Meta] Get table[%s] gid cursor[%s]", DBCommon::StringMiddleMasking(tableName).c_str(), cursor.c_str()); return E_OK; } int CloudMetaData::PutCloudGidCursor(const std::string &tableName, const std::string &cursor) const { + std::lock_guard lock(cloudMetaMutex_); Value blobMetaVal; CloudInfoValue info; auto errCode = GetCloudInfo(tableName, info); @@ -308,14 +311,54 @@ int CloudMetaData::PutCloudGidCursor(const std::string &tableName, const std::st if (store_ == nullptr) { return -E_INVALID_DB; } + LOGI("[Meta] Put table[%s] gid cursor[%s]", DBCommon::StringMiddleMasking(tableName).c_str(), cursor.c_str()); + return store_->PutMetaData(GetCloudInfoKey(tableName), blobMetaVal); +} + +int CloudMetaData::GetPotentialCursor(const std::string &tableName, std::string &cursor) const +{ + std::lock_guard lock(cloudMetaMutex_); + if (store_ == nullptr) { + return -E_INVALID_DB; + } + CloudInfoValue info; + auto errCode = GetCloudInfo(tableName, info); + if (errCode != E_OK) { + return errCode; + } + cursor = info.potentialCloudMark; + LOGI("[Meta] Get table[%s] potential cursor[%s]", DBCommon::StringMiddleMasking(tableName).c_str(), cursor.c_str()); + return E_OK; +} + +int CloudMetaData::PutPotentialCursor(const std::string &tableName, std::string &cursor) const +{ + std::lock_guard lock(cloudMetaMutex_); + Value blobMetaVal; + CloudInfoValue info; + auto errCode = GetCloudInfo(tableName, info); + if (errCode != E_OK) { + return errCode; + } + info.potentialCloudMark = cursor; + errCode = SerializeCloudInfo(info, blobMetaVal); + if (errCode != E_OK) { + return errCode; + } + if (store_ == nullptr) { + return -E_INVALID_DB; + } + LOGI("[Meta] Put table[%s] potential cursor[%s]", DBCommon::StringMiddleMasking(tableName).c_str(), cursor.c_str()); return store_->PutMetaData(GetCloudInfoKey(tableName), blobMetaVal); } int CloudMetaData::CleanCloudInfo(const std::string &tableName) const { + std::lock_guard lock(cloudMetaMutex_); if (store_ == nullptr) { return -E_INVALID_DB; } + LOGI("[Meta] Clean table[%s] cloudInfo", DBCommon::StringMiddleMasking(tableName).c_str()); return store_->DeleteMetaData({GetCloudInfoKey(tableName)}); } @@ -343,6 +386,7 @@ int CloudMetaData::DeserializeCloudInfo(Value &blobMark, CloudInfoValue &info) info.version = 0; parcel.ReadUInt64(info.version); parcel.ReadString(info.gidCloudMark); + parcel.ReadString(info.potentialCloudMark); if (parcel.IsError()) { LOGE("[CloudMetaData] Parse error cloud info version[%" PRIu32 "]", info.version); return -E_PARSE_FAIL; @@ -357,6 +401,7 @@ int CloudMetaData::SerializeCloudInfo(const CloudInfoValue &info, Value &blobMar Parcel parcel(blobMark.data(), blobMark.size()); parcel.WriteUInt64(info.version); parcel.WriteString(info.gidCloudMark); + parcel.WriteString(info.potentialCloudMark); if (parcel.IsError()) { LOGE("[CloudMetaData] Parcel error while serializing cloud info."); return -E_PARSE_FAIL; @@ -366,11 +411,14 @@ int CloudMetaData::SerializeCloudInfo(const CloudInfoValue &info, Value &blobMar uint64_t CloudMetaData::GetParcelCurrentLength(const CloudInfoValue &info) { - return Parcel::GetUInt64Len() + Parcel::GetStringLen(info.gidCloudMark); + return Parcel::GetUInt64Len() + Parcel::GetStringLen(info.gidCloudMark) + + Parcel::GetStringLen(info.potentialCloudMark); } Key CloudMetaData::GetCloudInfoKey(const std::string &tableName) { - return DBCommon::ToVector(CloudDbConstant::CLOUD_INFO_META_PREFIX + tableName); + Key key; + DBCommon::StringToVector(CloudDbConstant::CLOUD_INFO_META_PREFIX + tableName, key); + return key; } } // namespace DistributedDB \ No newline at end of file diff --git a/frameworks/libs/distributeddb/storage/src/relational/relational_sync_able_storage_extend.cpp b/frameworks/libs/distributeddb/storage/src/relational/relational_sync_able_storage_extend.cpp index 04f102e8939..5a338d072e0 100644 --- a/frameworks/libs/distributeddb/storage/src/relational/relational_sync_able_storage_extend.cpp +++ b/frameworks/libs/distributeddb/storage/src/relational/relational_sync_able_storage_extend.cpp @@ -724,8 +724,21 @@ int RelationalSyncAbleStorage::ResetGenLogTaskStatus() return E_OK; } +int RelationalSyncAbleStorage::PutCloudGid(const std::string &tableName, std::vector &data) +{ +#ifdef USE_DISTRIBUTEDDB_CLOUD + if (storageEngine_ == nullptr) { + LOGE("[RelationalSyncAbleStorage] Storage is null when put cloud gid"); + return -E_INVALID_DB; + } + return storageEngine_->PutCloudGid(tableName, data); +#else + return -E_NOT_SUPPORT; +#endif +} + #ifdef USE_DISTRIBUTEDDB_CLOUD -int RelationalSyncAbleStorage::AgingCloudNoneExistRecord(const std::string &tableName) +int RelationalSyncAbleStorage::DeleteCloudNoneExistRecord(const std::string &tableName) { if (storageEngine_ == nullptr) { LOGE("[AgingCloudNoneExistRecord] Storage is null"); diff --git a/frameworks/libs/distributeddb/storage/src/sqlite/relational/sqlite_relational_utils.cpp b/frameworks/libs/distributeddb/storage/src/sqlite/relational/sqlite_relational_utils.cpp index daf97467198..1bec1a18bc6 100644 --- a/frameworks/libs/distributeddb/storage/src/sqlite/relational/sqlite_relational_utils.cpp +++ b/frameworks/libs/distributeddb/storage/src/sqlite/relational/sqlite_relational_utils.cpp @@ -1257,6 +1257,69 @@ void SQLiteRelationalUtils::FillSyncInfo(const CloudSyncOption &option, const Sy info.asyncDownloadAssets = option.asyncDownloadAssets; } +int SQLiteRelationalUtils::PutCloudGid(sqlite3 *db, const std::string &tableName, std::vector &data) +{ + // create tmp table if table not exists + std::string sql = "CREATE TABLE IF NOT EXISTS " + DBCommon::GetTmpLogTableName(tableName) + + "(ID integer primary key autoincrement, cloud_gid TEXT UNIQUE ON CONFLICT IGNORE)"; + int errCode = SQLiteUtils::ExecuteRawSQL(db, sql); + if (errCode != E_OK) { + LOGE("[RDBUtils] Create gid table failed[%d]", errCode); + return errCode; + } + // insert all gid + sql = "INSERT INTO " + DBCommon::GetTmpLogTableName(tableName) + "(cloud_gid) VALUES(?)"; + sqlite3_stmt *stmt = nullptr; + errCode = SQLiteUtils::GetStatement(db, sql, stmt); + if (stmt == nullptr) { + LOGE("[RDBUtils] Get insert gid stmt failed[%d]", errCode); + return errCode; + } + ResFinalizer finalizer([stmt]() { + sqlite3_stmt *releaseStmt = stmt; + int ret = E_OK; + SQLiteUtils::ResetStatement(releaseStmt, true, ret); + if (ret != E_OK) { + LOGW("[RDBUtils] Reset cloud gid stmt failed[%d]", ret); + } + }); + return PutCloudGidInner(stmt, data); +} + +int SQLiteRelationalUtils::PutCloudGidInner(sqlite3_stmt *stmt, std::vector &data) +{ + int errCode = E_OK; + for (const auto &item : data) { + auto iter = item.find(CloudDbConstant::GID_FIELD); + if (iter == item.end()) { + LOGW("[RDBUtils] Cloud gid info not contain gid field"); + continue; + } + auto gid = std::get_if(&iter->second); + if (gid == nullptr) { + LOGW("[RDBUtils] Cloud gid info not contain wrong type[%zu]", iter->second.index()); + continue; + } + errCode = SQLiteUtils::BindTextToStatement(stmt, 1, *gid); + if (errCode != E_OK) { + LOGE("[RDBUtils] Bind cloud gid failed[%d]", errCode); + return errCode; + } + errCode = SQLiteUtils::StepNext(stmt); + if (errCode != -E_FINISHED) { + LOGE("[RDBUtils] Step cloud gid failed[%d]", errCode); + return errCode; + } + errCode = E_OK; + SQLiteUtils::ResetStatement(stmt, false, errCode); + if (errCode != E_OK) { + LOGE("[RDBUtils] Reset cloud gid stmt failed[%d]", errCode); + return errCode; + } + } + return errCode; +} + int SQLiteRelationalUtils::GetOneBatchCloudNotExistRecord(const std::string &tableName, sqlite3 *db, std::vector &records, const std::string &dataPk) { diff --git a/frameworks/libs/distributeddb/storage/src/sqlite/relational/sqlite_relational_utils.h b/frameworks/libs/distributeddb/storage/src/sqlite/relational/sqlite_relational_utils.h index bf395b18d25..a2920903e45 100644 --- a/frameworks/libs/distributeddb/storage/src/sqlite/relational/sqlite_relational_utils.h +++ b/frameworks/libs/distributeddb/storage/src/sqlite/relational/sqlite_relational_utils.h @@ -151,6 +151,8 @@ public: static void FillSyncInfo(const CloudSyncOption &option, const SyncProcessCallback &onProcess, ICloudSyncer::CloudTaskInfo &info); + static int PutCloudGid(sqlite3 *db, const std::string &tableName, std::vector &data); + struct CloudNotExistRecord { int64_t logRowid = 0; int64_t dataRowid = 0; @@ -178,6 +180,8 @@ private: const std::vector &targetFields); static std::string GetQueryLocalDataSQL(const TableInfo &table, int64_t dataKey); + + static int PutCloudGidInner(sqlite3_stmt *stmt, std::vector &data); }; } // namespace DistributedDB #endif // SQLITE_RELATIONAL_UTILS_H diff --git a/frameworks/libs/distributeddb/storage/src/sqlite/relational/sqlite_single_relational_storage_engine.cpp b/frameworks/libs/distributeddb/storage/src/sqlite/relational/sqlite_single_relational_storage_engine.cpp index a847dc50346..9854ddf9585 100644 --- a/frameworks/libs/distributeddb/storage/src/sqlite/relational/sqlite_single_relational_storage_engine.cpp +++ b/frameworks/libs/distributeddb/storage/src/sqlite/relational/sqlite_single_relational_storage_engine.cpp @@ -1677,5 +1677,38 @@ std::pair SQLiteSingleRelationalStorageEngine::AnalyzeTable(cons }); return handle->AnalyzeTable(tableName); } + +#ifdef USE_DISTRIBUTEDDB_CLOUD +int SQLiteSingleRelationalStorageEngine::PutCloudGid(const std::string &tableName, std::vector &data) +{ + int errCode = E_OK; + auto *handle = static_cast(FindExecutor(true, OperatePerm::NORMAL_PERM, + errCode)); + if (handle == nullptr) { + return errCode; + } + ResFinalizer finalizer([this, handle]() { + auto releaseHandle = handle; + ReleaseExecutor(releaseHandle); + }); + errCode = handle->StartTransaction(TransactType::IMMEDIATE); + if (errCode != E_OK) { + return errCode; + } + errCode = handle->PutCloudGid(tableName, data); + if (errCode == E_OK) { + errCode = handle->Commit(); + if (errCode != E_OK) { + LOGE("[RDBEngine] Commit transaction failed[%d] when put cloud gid", errCode); + } + } else { + int ret = handle->Rollback(); + if (ret != E_OK) { + LOGW("[RDBEngine] Rollback transaction failed[%d] when put cloud gid", ret); + } + } + return errCode; +} +#endif } #endif \ No newline at end of file diff --git a/frameworks/libs/distributeddb/storage/src/sqlite/relational/sqlite_single_relational_storage_engine.h b/frameworks/libs/distributeddb/storage/src/sqlite/relational/sqlite_single_relational_storage_engine.h index fe6b1ceb4e2..c546e078f21 100644 --- a/frameworks/libs/distributeddb/storage/src/sqlite/relational/sqlite_single_relational_storage_engine.h +++ b/frameworks/libs/distributeddb/storage/src/sqlite/relational/sqlite_single_relational_storage_engine.h @@ -106,6 +106,10 @@ public: std::vector> &tasks); std::pair AnalyzeTable(const std::string &tableName); + +#ifdef USE_DISTRIBUTEDDB_CLOUD + int PutCloudGid(const std::string &tableName, std::vector &data); +#endif protected: StorageExecutor *NewSQLiteStorageExecutor(sqlite3 *dbHandle, bool isWrite, bool isMemDb) override; int Upgrade(sqlite3 *db) override; diff --git a/frameworks/libs/distributeddb/storage/src/sqlite/relational/sqlite_single_ver_relational_storage_executor.cpp b/frameworks/libs/distributeddb/storage/src/sqlite/relational/sqlite_single_ver_relational_storage_executor.cpp index 118c8952af0..7869b255d6e 100644 --- a/frameworks/libs/distributeddb/storage/src/sqlite/relational/sqlite_single_ver_relational_storage_executor.cpp +++ b/frameworks/libs/distributeddb/storage/src/sqlite/relational/sqlite_single_ver_relational_storage_executor.cpp @@ -2003,5 +2003,13 @@ std::pair SQLiteSingleVerRelationalStorageExecutor::AnalyzeTable { return SQLiteRelationalUtils::AnalyzeTable(dbHandle_, tableName); } + +#ifdef USE_DISTRIBUTEDDB_CLOUD +int SQLiteSingleVerRelationalStorageExecutor::PutCloudGid(const std::string &tableName, + std::vector &data) const +{ + return SQLiteRelationalUtils::PutCloudGid(dbHandle_, tableName, data); +} +#endif } // namespace DistributedDB #endif diff --git a/frameworks/libs/distributeddb/storage/src/sqlite/relational/sqlite_single_ver_relational_storage_executor.h b/frameworks/libs/distributeddb/storage/src/sqlite/relational/sqlite_single_ver_relational_storage_executor.h index 11a0c7f0680..6e70c4dacd9 100644 --- a/frameworks/libs/distributeddb/storage/src/sqlite/relational/sqlite_single_ver_relational_storage_executor.h +++ b/frameworks/libs/distributeddb/storage/src/sqlite/relational/sqlite_single_ver_relational_storage_executor.h @@ -284,6 +284,10 @@ public: int GetLocalDataByRowid(const TableInfo &table, const TableSchema &tableSchema, DataInfoWithLog &dataInfoWithLog); std::pair AnalyzeTable(const std::string &tableName) const; + +#ifdef USE_DISTRIBUTEDDB_CLOUD + int PutCloudGid(const std::string &tableName, std::vector &data) const; +#endif private: int UpdateHashKeyWithOutPk(DistributedTableMode mode, const TableInfo &tableInfo, TableSyncType syncType, const std::string &localIdentity); @@ -544,6 +548,8 @@ private: std::vector GetInsertFields(const VBucket &vBucket, const TableSchema &tableSchema); + int CleanTableTmpMsg(const std::vector &tableNameList); + static constexpr const char *CONSISTENT_FLAG = "0x20"; static constexpr const char *UPDATE_FLAG_CLOUD = "flag = 0"; static constexpr const char *UPDATE_FLAG_WAIT_COMPENSATED_SYNC = "flag = flag | 0x10"; diff --git a/frameworks/libs/distributeddb/storage/src/sqlite/relational/sqlite_single_ver_relational_storage_executor_extend.cpp b/frameworks/libs/distributeddb/storage/src/sqlite/relational/sqlite_single_ver_relational_storage_executor_extend.cpp index 93866f17c57..410f8fc9093 100644 --- a/frameworks/libs/distributeddb/storage/src/sqlite/relational/sqlite_single_ver_relational_storage_executor_extend.cpp +++ b/frameworks/libs/distributeddb/storage/src/sqlite/relational/sqlite_single_ver_relational_storage_executor_extend.cpp @@ -325,19 +325,14 @@ int SQLiteSingleVerRelationalStorageExecutor::DoCleanInner(ClearMode mode, } notifyTableList = tableNameList; } - for (const auto &tableName: tableNameList) { - errCode = CleanDownloadingFlag(tableName); - if (errCode != E_OK) { - LOGE("Fail to clean downloading flag, %d, tableName:%s, length:%zu", - errCode, DBCommon::StringMiddleMasking(tableName).c_str(), tableName.size()); - return errCode; - } + errCode = CleanTableTmpMsg(tableNameList); + if (errCode != E_OK) { + return errCode; } errCode = SetLogTriggerStatus(true); if (errCode != E_OK) { LOGE("Fail to set log trigger on when clean cloud data, %d", errCode); } - return errCode; } diff --git a/frameworks/libs/distributeddb/storage/src/sqlite/relational/sqlite_single_ver_relational_storage_extend_executor.cpp b/frameworks/libs/distributeddb/storage/src/sqlite/relational/sqlite_single_ver_relational_storage_extend_executor.cpp index 1b9d5293ffe..46fedf2242e 100644 --- a/frameworks/libs/distributeddb/storage/src/sqlite/relational/sqlite_single_ver_relational_storage_extend_executor.cpp +++ b/frameworks/libs/distributeddb/storage/src/sqlite/relational/sqlite_single_ver_relational_storage_extend_executor.cpp @@ -2034,5 +2034,26 @@ int SQLiteSingleVerRelationalStorageExecutor::GetLocalDataByRowid(const TableInf } return SQLiteRelationalUtils::GetLocalDataByRowid(dbHandle_, table, tableSchema, dataInfoWithLog); } + +int SQLiteSingleVerRelationalStorageExecutor::CleanTableTmpMsg(const std::vector &tableNameList) +{ + int errCode = E_OK; + for (const auto &tableName: tableNameList) { + errCode = CleanDownloadingFlag(tableName); + if (errCode != E_OK) { + LOGE("Fail to clean downloading flag, %d, tableName:%s, length:%zu", + errCode, DBCommon::StringMiddleMasking(tableName).c_str(), tableName.size()); + return errCode; + } + std::string sql = "DROP TABLE IF EXISTS " + DBCommon::GetTmpLogTableName(tableName); + errCode = SQLiteUtils::ExecuteRawSQL(dbHandle_, sql); + if (errCode != E_OK) { + LOGE("Fail to drop tmp table errCode[%d] tableName[%s]", + errCode, DBCommon::StringMiddleMaskingWithLen(tableName).c_str()); + return errCode; + } + } + return errCode; +} } // namespace DistributedDB #endif diff --git a/frameworks/libs/distributeddb/storage/src/storage_proxy.cpp b/frameworks/libs/distributeddb/storage/src/storage_proxy.cpp index 60957c6e81b..2d77e7a74b2 100644 --- a/frameworks/libs/distributeddb/storage/src/storage_proxy.cpp +++ b/frameworks/libs/distributeddb/storage/src/storage_proxy.cpp @@ -948,6 +948,26 @@ int StorageProxy::PutCloudGidCursor(const std::string &tableName, const std::str return cloudMetaData_->PutCloudGidCursor(tableName, cursor); } +int StorageProxy::GetPotentialCursor(const std::string &tableName, std::string &cursor) +{ + std::shared_lock readLock(storeMutex_); + if (cloudMetaData_ == nullptr) { + LOGE("[GetPotentialCursor] meta is null"); + return -E_INVALID_DB; + } + return cloudMetaData_->GetPotentialCursor(tableName, cursor); +} + +int StorageProxy::PutPotentialCursor(const std::string &tableName, std::string &cursor) +{ + std::shared_lock readLock(storeMutex_); + if (cloudMetaData_ == nullptr) { + LOGE("[PutCloudGidCursor] meta is null"); + return -E_INVALID_DB; + } + return cloudMetaData_->PutPotentialCursor(tableName, cursor); +} + int StorageProxy::CleanCloudInfo(const std::string &tableName) { std::shared_lock readLock(storeMutex_); @@ -958,13 +978,13 @@ int StorageProxy::CleanCloudInfo(const std::string &tableName) return cloudMetaData_->CleanCloudInfo(tableName); } -int StorageProxy::AgingCloudNoneExistRecord(const std::string &tableName) +int StorageProxy::DeleteCloudNoneExistRecord(const std::string &tableName) { std::shared_lock readLock(storeMutex_); if (store_ == nullptr) { LOGE("[AgingCloudNoneExistRecord] store is null"); return -E_INVALID_DB; } - return store_->AgingCloudNoneExistRecord(tableName); + return store_->DeleteCloudNoneExistRecord(tableName); } } \ No newline at end of file diff --git a/frameworks/libs/distributeddb/syncer/src/cloud/cloud_db_proxy.cpp b/frameworks/libs/distributeddb/syncer/src/cloud/cloud_db_proxy.cpp index 62982233671..23a0647b518 100644 --- a/frameworks/libs/distributeddb/syncer/src/cloud/cloud_db_proxy.cpp +++ b/frameworks/libs/distributeddb/syncer/src/cloud/cloud_db_proxy.cpp @@ -496,6 +496,7 @@ int CloudDBProxy::GetInnerErrorCode(DBStatus status) {CLOUD_ASSET_NOT_FOUND, -E_CLOUD_ASSET_NOT_FOUND}, {SKIP_WHEN_CLOUD_SPACE_INSUFFICIENT, -E_SKIP_WHEN_CLOUD_SPACE_INSUFFICIENT}, {EXPIRED_CURSOR, -E_EXPIRED_CURSOR}, + {QUERY_END, -E_QUERY_END}, }; auto iter = CLOUD_ERROR.find(status); if (iter != CLOUD_ERROR.end()) { @@ -851,14 +852,14 @@ std::weak_ptr CloudDBProxy::GetCloudConflictHandler() return std::weak_ptr(conflictHandler_); } -int CloudDBProxy::QueryAllGID(const std::string &tableName, const VBucket &extend, std::vector &data) +int CloudDBProxy::QueryAllGid(const std::string &tableName, VBucket &extend, std::vector &data) { std::shared_lock readLock(cloudMutex_); if (iCloudDb_ == nullptr) { return -E_CLOUD_ERROR; } - auto ret = iCloudDb_->QueryAllGID(tableName, extend, data); - if (ret != DBStatus::OK) { + auto ret = iCloudDb_->QueryAllGid(tableName, extend, data); + if (ret != DBStatus::OK && ret != DBStatus::QUERY_END) { LOGE("[CloudDBProxy] QueryAllGid failed[%d]", ret); } return GetInnerErrorCode(ret); diff --git a/frameworks/libs/distributeddb/syncer/src/cloud/cloud_db_proxy.h b/frameworks/libs/distributeddb/syncer/src/cloud/cloud_db_proxy.h index 55addf602ef..9cb2891d438 100644 --- a/frameworks/libs/distributeddb/syncer/src/cloud/cloud_db_proxy.h +++ b/frameworks/libs/distributeddb/syncer/src/cloud/cloud_db_proxy.h @@ -89,7 +89,7 @@ public: std::weak_ptr GetCloudConflictHandler(); - int QueryAllGID(const std::string &tableName, const VBucket &extend, std::vector &data); + int QueryAllGid(const std::string &tableName, VBucket &extend, std::vector &data); static int GetInnerErrorCode(DBStatus status); protected: diff --git a/frameworks/libs/distributeddb/syncer/src/cloud/cloud_syncer.cpp b/frameworks/libs/distributeddb/syncer/src/cloud/cloud_syncer.cpp index e1890fe3dc4..dbba6af79e8 100644 --- a/frameworks/libs/distributeddb/syncer/src/cloud/cloud_syncer.cpp +++ b/frameworks/libs/distributeddb/syncer/src/cloud/cloud_syncer.cpp @@ -1139,22 +1139,6 @@ int CloudSyncer::DoDownload(CloudSyncer::TaskId taskId, bool isFirstDownload) return errCode; } -int CloudSyncer::DoDownloadInner(CloudSyncer::TaskId taskId, SyncParam ¶m, bool isFirstDownload) -{ - // Query data by batch until reaching end and not more data need to be download - int ret = PreCheck(taskId, param.info.tableName); - if (ret != E_OK) { - return ret; - } - do { - ret = DownloadOneBatch(taskId, param, isFirstDownload); - if (ret != E_OK) { - return ret; - } - } while (!param.isLastBatch); - return E_OK; -} - void CloudSyncer::NotifyInEmptyDownload(CloudSyncer::TaskId taskId, InnerProcessInfo &info) { std::lock_guard autoLock(dataLock_); @@ -1654,7 +1638,12 @@ int CloudSyncer::CleanCloudData(ClearMode mode, const std::vector & LOGD("[CloudSyncer] Start clean cloud water mark. table index: %d.", index); int ret = storageProxy_->CleanWaterMark(tableName); if (ret != E_OK) { - LOGE("[CloudSyncer] failed to put cloud water mark after clean cloud data, %d.", ret); + LOGE("[CloudSyncer] failed to put cloud water mark after clean cloud data, %d.", ret); + return ret; + } + ret = storageProxy_->CleanCloudInfo(tableName); + if (ret != E_OK) { + LOGE("[CloudSyncer] failed to clean cloud info after clean cloud data, %d.", ret); return ret; } index++; @@ -2109,9 +2098,11 @@ int CloudSyncer::DownloadDataFromCloud(TaskId taskId, SyncParam ¶m, bool isF // Won't break here since downloadData may not be null param.isLastBatch = true; } else if (ret != E_OK) { - std::lock_guard autoLock(dataLock_); - param.info.tableStatus = ProcessStatus::FINISHED; - currentContext_.notifier->UpdateProcess(param.info); + if (ret != -E_EXPIRED_CURSOR) { + std::lock_guard autoLock(dataLock_); + param.info.tableStatus = ProcessStatus::FINISHED; + currentContext_.notifier->UpdateProcess(param.info); + } return ret; } diff --git a/frameworks/libs/distributeddb/syncer/src/cloud/cloud_syncer.h b/frameworks/libs/distributeddb/syncer/src/cloud/cloud_syncer.h index 93ddbb25980..c06cecca22d 100644 --- a/frameworks/libs/distributeddb/syncer/src/cloud/cloud_syncer.h +++ b/frameworks/libs/distributeddb/syncer/src/cloud/cloud_syncer.h @@ -570,6 +570,20 @@ protected: void SetCurrentTmpError(int errCode); + int DoUpdateExpiredCursor(TaskId taskId, const std::string &table, std::string &newCursor); + + int DoUpdatePotentialCursorIfNeed(const std::string &table); + + int DownloadOneBatchGID(TaskId taskId, SyncParam ¶m); + + int UpdateCloudMarkAndCleanExpiredCursor(SyncParam ¶m, std::string &newCursor); + + int DownloadGIDFromCloud(SyncParam ¶m); + + int SaveGIDRecord(SyncParam ¶m); + + int SaveGIDCursor(SyncParam ¶m); + mutable std::mutex dataLock_; TaskId lastTaskId_; std::multimap> taskQueue_; diff --git a/frameworks/libs/distributeddb/syncer/src/cloud/cloud_syncer_extend_extend.cpp b/frameworks/libs/distributeddb/syncer/src/cloud/cloud_syncer_extend_extend.cpp index 2738e50a12b..f6b1a9a8d08 100644 --- a/frameworks/libs/distributeddb/syncer/src/cloud/cloud_syncer_extend_extend.cpp +++ b/frameworks/libs/distributeddb/syncer/src/cloud/cloud_syncer_extend_extend.cpp @@ -32,6 +32,9 @@ #include "version.h" namespace DistributedDB { +namespace { + constexpr const int MAX_EXPIRED_CURSOR_COUNT = 1; +} int CloudSyncer::HandleDownloadResultForAsyncDownload(const DownloadItem &downloadItem, InnerProcessInfo &info, DownloadCommitList &commitList, uint32_t &successCount) { @@ -227,4 +230,157 @@ void CloudSyncer::SetCurrentTmpError(int errCode) cloudTaskInfos_[currentContext_.currentTaskId].errCode = errCode; cloudTaskInfos_[currentContext_.currentTaskId].tempErrCode = errCode; } + +int CloudSyncer::DoDownloadInner(TaskId taskId, SyncParam ¶m, bool isFirstDownload) +{ + // Query data by batch until reaching end and not more data need to be download + int ret = PreCheck(taskId, param.info.tableName); + if (ret != E_OK) { + return ret; + } + int expiredCursorCount = 0; + do { + ret = DownloadOneBatch(taskId, param, isFirstDownload); + if (ret == -E_EXPIRED_CURSOR) { + expiredCursorCount++; + if (expiredCursorCount > MAX_EXPIRED_CURSOR_COUNT) { + LOGE("[CloudSyncer] Table[%s] too much expired cursor count[%d]", + DBCommon::StringMiddleMasking(param.info.tableName).c_str(), expiredCursorCount); + return ret; + } + param.isLastBatch = false; + ret = DoUpdateExpiredCursor(taskId, param.info.tableName, param.cloudWaterMark); + } + if (ret != E_OK) { + return ret; + } + } while (!param.isLastBatch); + return E_OK; +} + +int CloudSyncer::DoUpdateExpiredCursor(TaskId taskId, const std::string &table, std::string &newCursor) +{ + LOGI("[CloudSyncer] Update expired cursor now, table[%s]", DBCommon::StringMiddleMasking(table).c_str()); + if (storageProxy_ == nullptr) { + LOGE("[CloudSyncer] storage is nullptr when update expired cursor"); + return -E_INTERNAL_ERROR; + } + SyncParam param; + param.tableName = table; + auto errCode = storageProxy_->GetCloudGidCursor(param.tableName, param.cloudWaterMark); + if (errCode != E_OK) { + return errCode; + } + errCode = DoUpdatePotentialCursorIfNeed(param.tableName); + if (errCode != E_OK) { + return errCode; + } + int retryCount = 0; + do { + int ret = DownloadOneBatchGID(taskId, param); + if (ret == -E_EXPIRED_CURSOR && retryCount < MAX_EXPIRED_CURSOR_COUNT) { + retryCount++; + param.cloudWaterMark = ""; + continue; + } + if (ret != E_OK) { + return ret; + } + } while (!param.isLastBatch); + errCode = storageProxy_->DeleteCloudNoneExistRecord(param.tableName); + if (errCode != E_OK) { + return errCode; + } + return UpdateCloudMarkAndCleanExpiredCursor(param, newCursor); +} + +int CloudSyncer::DoUpdatePotentialCursorIfNeed(const std::string &table) +{ + std::string potentialCursor; + auto errCode = storageProxy_->GetPotentialCursor(table, potentialCursor); + if (errCode != E_OK) { + return errCode; + } + if (!potentialCursor.empty()) { + LOGI("[CloudSyncer] Table[%s] already exist potential cursor", DBCommon::StringMiddleMasking(table).c_str()); + return E_OK; + } + std::tie(errCode, potentialCursor) = cloudDB_.GetEmptyCursor(table); + if (errCode != E_OK) { + return errCode; + } + return storageProxy_->PutPotentialCursor(table, potentialCursor); +} + +int CloudSyncer::DownloadOneBatchGID(TaskId taskId, SyncParam ¶m) +{ + int ret = CheckTaskIdValid(taskId); + if (ret != E_OK) { + return ret; + } + ret = DownloadGIDFromCloud(param); + if (ret != E_OK) { + return ret; + } + ret = SaveGIDRecord(param); + if (ret != E_OK) { + return ret; + } + return SaveGIDCursor(param); +} + +int CloudSyncer::UpdateCloudMarkAndCleanExpiredCursor(SyncParam ¶m, std::string &newCursor) +{ + int errCode = storageProxy_->GetPotentialCursor(param.tableName, newCursor); + if (errCode != E_OK) { + return errCode; + } + errCode = storageProxy_->CleanCloudInfo(param.tableName); + if (errCode != E_OK) { + return errCode; + } + return storageProxy_->SetCloudWaterMark(param.tableName, newCursor); +} + +int CloudSyncer::DownloadGIDFromCloud(SyncParam ¶m) +{ + VBucket extend; + extend[CloudDbConstant::CURSOR_FIELD] = param.cloudWaterMark; + int errCode = cloudDB_.QueryAllGid(param.tableName, extend, param.downloadData.data); + if (errCode == -E_QUERY_END) { + errCode = E_OK; + param.isLastBatch = true; + } + if (errCode != E_OK) { + LOGE("[CloudSyncer] Query cloud gid failed[%d]", errCode); + } else if (!param.downloadData.data.empty()) { + const auto &record = param.downloadData.data[param.downloadData.data.size() - 1u]; + auto iter = record.find(CloudDbConstant::CURSOR_FIELD); + if (iter == record.end()) { + LOGE("[CloudSyncer] Cloud gid record no exist cursor"); + return -E_CLOUD_ERROR; + } + auto cursor = std::get_if(&iter->second); + if (cursor == nullptr) { + LOGE("[CloudSyncer] Cloud gid record cursor is no str, type[%zu]", iter->second.index()); + return -E_CLOUD_ERROR; + } + if (cursor->size() > static_cast(INT32_MAX)) { + LOGE("[CloudSyncer] Cloud gid record cursor len over max limit, size[%zu]", cursor->size()); + return -E_CLOUD_ERROR; + } + param.cloudWaterMark = *cursor; + } + return errCode; +} + +int CloudSyncer::SaveGIDRecord(SyncParam ¶m) +{ + return storageProxy_->PutCloudGid(param.tableName, param.downloadData.data); +} + +int CloudSyncer::SaveGIDCursor(SyncParam ¶m) +{ + return storageProxy_->PutCloudGidCursor(param.tableName, param.cloudWaterMark); +} } // namespace DistributedDB \ No newline at end of file diff --git a/frameworks/libs/distributeddb/test/unittest/common/store_test/rdb/distributeddb_rdb_complex_cloud_test.cpp b/frameworks/libs/distributeddb/test/unittest/common/store_test/rdb/distributeddb_rdb_complex_cloud_test.cpp index 6c49fb32697..a8bae4650df 100644 --- a/frameworks/libs/distributeddb/test/unittest/common/store_test/rdb/distributeddb_rdb_complex_cloud_test.cpp +++ b/frameworks/libs/distributeddb/test/unittest/common/store_test/rdb/distributeddb_rdb_complex_cloud_test.cpp @@ -147,5 +147,37 @@ HWTEST_F(DistributedDBRDBComplexCloudTest, RemoveData001, TestSize.Level0) ASSERT_NE(delegate, nullptr); EXPECT_EQ(delegate->RemoveDeviceData("", ClearMode::FLAG_ONLY), DBStatus::OK); } + +/** + * @tc.name: ExpireCursor001 + * @tc.desc: Test sync with expire cursor. + * @tc.type: FUNC + * @tc.author: zqq + */ +HWTEST_F(DistributedDBRDBComplexCloudTest, ExpireCursor001, TestSize.Level0) +{ + /** + * @tc.steps:step1. store1 insert one data + * @tc.expected: step1. insert success. + */ + auto ret = ExecuteSQL("INSERT INTO CLOUD_SYNC_TABLE_A VALUES(1, 1, 'text1', 'text2', 'uuid1')", info1_); + ASSERT_EQ(ret, E_OK); + /** + * @tc.steps:step2. store1 push and store2 pull + * @tc.expected: step2. sync success and stringCol2 is null because store1 don't sync stringCol2. + */ + Query query = Query::Select().FromTable({CLOUD_SYNC_TABLE_A}); + EXPECT_NO_FATAL_FAILURE(CloudBlockSync(info1_, query, SyncMode::SYNC_MODE_CLOUD_MERGE, OK, OK)); + EXPECT_NO_FATAL_FAILURE(CloudBlockSync(info2_, query, SyncMode::SYNC_MODE_CLOUD_MERGE, OK, OK)); + auto cloudDB = GetVirtualCloudDb(); + ASSERT_NE(cloudDB, nullptr); + std::atomic count = 0; + cloudDB->ForkAfterQueryResult([&count](VBucket &, std::vector &) { + count++; + return count == 1 ? DBStatus::EXPIRED_CURSOR : DBStatus::QUERY_END; + }); + EXPECT_NO_FATAL_FAILURE(CloudBlockSync(info2_, query, SyncMode::SYNC_MODE_CLOUD_MERGE, OK, OK)); + cloudDB->ForkAfterQueryResult(nullptr); +} } #endif \ No newline at end of file diff --git a/frameworks/libs/distributeddb/test/unittest/common/syncer/cloud/virtual_cloud_db.cpp b/frameworks/libs/distributeddb/test/unittest/common/syncer/cloud/virtual_cloud_db.cpp index 1e19a22148d..c60a674ca2b 100644 --- a/frameworks/libs/distributeddb/test/unittest/common/syncer/cloud/virtual_cloud_db.cpp +++ b/frameworks/libs/distributeddb/test/unittest/common/syncer/cloud/virtual_cloud_db.cpp @@ -723,4 +723,16 @@ void VirtualCloudDb::SetUploadRecordStatus(DBStatus status) { uploadRecordStatus_ = status; } + +DBStatus VirtualCloudDb::QueryAllGid(const std::string &tableName, VBucket &extend, std::vector &data) +{ + VBucket copyExtend = extend; + std::string cursor = std::get(copyExtend[g_cursorField]); + bool isIncreCursor = (cursor.substr(0, increPrefix_.size()) == increPrefix_); + LOGD("extend size: %zu type: %zu expect: %zu, cursor: %s", extend.size(), copyExtend[g_cursorField].index(), + TYPE_INDEX, cursor.c_str()); + cursor = cursor.empty() ? "0" : cursor; + GetCloudData(cursor, isIncreCursor, cloudData_[tableName], data, copyExtend); + return (data.empty() || data.size() < static_cast(queryLimit_)) ? QUERY_END : OK; +} } \ No newline at end of file diff --git a/frameworks/libs/distributeddb/test/unittest/common/syncer/cloud/virtual_cloud_db.h b/frameworks/libs/distributeddb/test/unittest/common/syncer/cloud/virtual_cloud_db.h index bdede955fe3..ac1f09f8096 100644 --- a/frameworks/libs/distributeddb/test/unittest/common/syncer/cloud/virtual_cloud_db.h +++ b/frameworks/libs/distributeddb/test/unittest/common/syncer/cloud/virtual_cloud_db.h @@ -53,6 +53,8 @@ public: DBStatus Close() override; + DBStatus QueryAllGid(const std::string &tableName, VBucket &extend, std::vector &data) override; + void SetCloudError(bool cloudError); void SetBlockTime(int32_t blockTime); -- Gitee From 67740d51e9a91ddcee6201ef0e369c8510230ec1 Mon Sep 17 00:00:00 2001 From: zqq Date: Tue, 6 Jan 2026 19:00:08 +0800 Subject: [PATCH 2/2] fix issue Signed-off-by: zqq --- .../storage/src/cloud/cloud_meta_data.cpp | 4 ++-- .../relational_sync_able_storage_extend.cpp | 9 ++++---- .../relational/sqlite_relational_utils.cpp | 23 ++++++++----------- .../storage/src/storage_proxy.cpp | 2 +- .../src/cloud/cloud_syncer_extend_extend.cpp | 8 +++++++ 5 files changed, 26 insertions(+), 20 deletions(-) diff --git a/frameworks/libs/distributeddb/storage/src/cloud/cloud_meta_data.cpp b/frameworks/libs/distributeddb/storage/src/cloud/cloud_meta_data.cpp index 6d1b1342bc2..b2b23906b13 100644 --- a/frameworks/libs/distributeddb/storage/src/cloud/cloud_meta_data.cpp +++ b/frameworks/libs/distributeddb/storage/src/cloud/cloud_meta_data.cpp @@ -296,9 +296,9 @@ int CloudMetaData::GetCloudGidCursor(const std::string &tableName, std::string & int CloudMetaData::PutCloudGidCursor(const std::string &tableName, const std::string &cursor) const { - std::lock_guard lock(cloudMetaMutex_); Value blobMetaVal; CloudInfoValue info; + std::lock_guard lock(cloudMetaMutex_); auto errCode = GetCloudInfo(tableName, info); if (errCode != E_OK) { return errCode; @@ -333,9 +333,9 @@ int CloudMetaData::GetPotentialCursor(const std::string &tableName, std::string int CloudMetaData::PutPotentialCursor(const std::string &tableName, std::string &cursor) const { - std::lock_guard lock(cloudMetaMutex_); Value blobMetaVal; CloudInfoValue info; + std::lock_guard lock(cloudMetaMutex_); auto errCode = GetCloudInfo(tableName, info); if (errCode != E_OK) { return errCode; diff --git a/frameworks/libs/distributeddb/storage/src/relational/relational_sync_able_storage_extend.cpp b/frameworks/libs/distributeddb/storage/src/relational/relational_sync_able_storage_extend.cpp index 5a338d072e0..2d606239e00 100644 --- a/frameworks/libs/distributeddb/storage/src/relational/relational_sync_able_storage_extend.cpp +++ b/frameworks/libs/distributeddb/storage/src/relational/relational_sync_able_storage_extend.cpp @@ -741,7 +741,7 @@ int RelationalSyncAbleStorage::PutCloudGid(const std::string &tableName, std::ve int RelationalSyncAbleStorage::DeleteCloudNoneExistRecord(const std::string &tableName) { if (storageEngine_ == nullptr) { - LOGE("[AgingCloudNoneExistRecord] Storage is null"); + LOGE("[DeleteCloudNoneExistRecord] Storage is null"); return -E_INVALID_DB; } TableInfo tableInfo = storageEngine_->GetSchema().GetTable(tableName); @@ -759,22 +759,23 @@ int RelationalSyncAbleStorage::DeleteCloudNoneExistRecord(const std::string &tab changedData.field.push_back(dataPk); errCode = GetOneBatchCloudNoneExistRecord(tableName, dataPk, records); if (errCode != E_OK) { - LOGE("[AgingCloudNoneExistRecord] get one batch cloud none exist record failed.%d, tableName:%s", errCode, + LOGE("[DeleteCloudNoneExistRecord] get one batch cloud none exist record failed.%d, tableName:%s", errCode, DBCommon::StringMiddleMaskingWithLen(tableName).c_str()); return errCode; } + LOGW("[DeleteCloudNoneExistRecord] match count is %zu", records.size()); errCode = DeleteOneBatchCloudNoneExistRecord(tableName, changedData, records); if (errCode == -E_FINISHED) { break; } if (errCode != E_OK) { - LOGE("[AgingCloudNoneExistRecord] delete one batch cloud none exist record failed.%d, tableName:%s", + LOGE("[DeleteCloudNoneExistRecord] delete one batch cloud none exist record failed.%d, tableName:%s", errCode, DBCommon::StringMiddleMaskingWithLen(tableName).c_str()); return errCode; } TriggerObserverAction("CLOUD", std::move(changedData), true); if (loopTime >= UINT16_MAX) { - LOGW("[AgingCloudNoneExistRecord] there is too much data that not exist in the cloud, about to exit"); + LOGW("[DeleteCloudNoneExistRecord] there is too much data that not exist in the cloud, about to exit"); break; } std::this_thread::sleep_for(CloudDbConstant::LONG_TRANSACTION_INTERVAL); diff --git a/frameworks/libs/distributeddb/storage/src/sqlite/relational/sqlite_relational_utils.cpp b/frameworks/libs/distributeddb/storage/src/sqlite/relational/sqlite_relational_utils.cpp index 1bec1a18bc6..5d74c553989 100644 --- a/frameworks/libs/distributeddb/storage/src/sqlite/relational/sqlite_relational_utils.cpp +++ b/frameworks/libs/distributeddb/storage/src/sqlite/relational/sqlite_relational_utils.cpp @@ -1364,23 +1364,20 @@ int SQLiteRelationalUtils::DeleteOneRecord(const std::string &tableName, sqlite3 { if ((record.flag & static_cast(LogInfoFlag::FLAG_LOCAL)) != 0) { std::string sql = "UPDATE " + DBCommon::GetLogTableName(tableName) + - " SET cloud_gid = '' AND version = '' where _rowid_ = " + std::to_string(record.logRowid) + ";"; + " SET cloud_gid = '', version = '' where _rowid_ = " + std::to_string(record.logRowid) + ";"; return SQLiteUtils::ExecuteRawSQL(db, sql); } - if ((record.flag & static_cast(LogInfoFlag::FLAG_CLOUD_WRITE)) != 0) { - std::string sql = "DELETE FROM " + tableName + " WHERE _rowid_ = " + std::to_string(record.dataRowid) + ";"; - int errCode = SQLiteUtils::ExecuteRawSQL(db, sql); - if (errCode != E_OK) { - return errCode; - } - int32_t newFlag = isLogicDelete ? ((record.flag & (0x20 | 0x1 | 0x8)) & (~0x4000)) : - ((record.flag & (0x20 | 0x1)) & (~0x4000)); - sql = "UPDATE " + DBCommon::GetLogTableName(tableName) + " SET cloud_gid = '' AND version = '' AND flag = " + - std::to_string(newFlag) + " where _rowid_ = " + std::to_string(record.logRowid) + ";"; - return SQLiteUtils::ExecuteRawSQL(db, sql); + std::string sql = "DELETE FROM " + tableName + " WHERE _rowid_ = " + std::to_string(record.dataRowid) + ";"; + int errCode = SQLiteUtils::ExecuteRawSQL(db, sql); + if (errCode != E_OK) { + return errCode; } - return E_OK; + int32_t newFlag = isLogicDelete ? ((record.flag & (0x20 | 0x1 | 0x8)) & (~0x4000)) : + ((record.flag & (0x20 | 0x1)) & (~0x4000)); + sql = "UPDATE " + DBCommon::GetLogTableName(tableName) + " SET cloud_gid = '', version = '', flag = " + + std::to_string(newFlag) + " where _rowid_ = " + std::to_string(record.logRowid) + ";"; + return SQLiteUtils::ExecuteRawSQL(db, sql); } int SQLiteRelationalUtils::DropTempTable(const std::string &tableName, sqlite3 *db) diff --git a/frameworks/libs/distributeddb/storage/src/storage_proxy.cpp b/frameworks/libs/distributeddb/storage/src/storage_proxy.cpp index 2d77e7a74b2..7ee829912c4 100644 --- a/frameworks/libs/distributeddb/storage/src/storage_proxy.cpp +++ b/frameworks/libs/distributeddb/storage/src/storage_proxy.cpp @@ -982,7 +982,7 @@ int StorageProxy::DeleteCloudNoneExistRecord(const std::string &tableName) { std::shared_lock readLock(storeMutex_); if (store_ == nullptr) { - LOGE("[AgingCloudNoneExistRecord] store is null"); + LOGE("[DeleteCloudNoneExistRecord] store is null"); return -E_INVALID_DB; } return store_->DeleteCloudNoneExistRecord(tableName); diff --git a/frameworks/libs/distributeddb/syncer/src/cloud/cloud_syncer_extend_extend.cpp b/frameworks/libs/distributeddb/syncer/src/cloud/cloud_syncer_extend_extend.cpp index f6b1a9a8d08..0c54788d90a 100644 --- a/frameworks/libs/distributeddb/syncer/src/cloud/cloud_syncer_extend_extend.cpp +++ b/frameworks/libs/distributeddb/syncer/src/cloud/cloud_syncer_extend_extend.cpp @@ -34,6 +34,8 @@ namespace DistributedDB { namespace { constexpr const int MAX_EXPIRED_CURSOR_COUNT = 1; + constexpr const uint64_t MAX_DOWNLOAD_LOOP_TIMES = 10000; + constexpr const uint64_t WARNING_DOWNLOAD_PERIOD = 100; } int CloudSyncer::HandleDownloadResultForAsyncDownload(const DownloadItem &downloadItem, InnerProcessInfo &info, DownloadCommitList &commitList, uint32_t &successCount) @@ -239,6 +241,7 @@ int CloudSyncer::DoDownloadInner(TaskId taskId, SyncParam ¶m, bool isFirstDo return ret; } int expiredCursorCount = 0; + uint64_t loopCount = 0; do { ret = DownloadOneBatch(taskId, param, isFirstDownload); if (ret == -E_EXPIRED_CURSOR) { @@ -254,6 +257,11 @@ int CloudSyncer::DoDownloadInner(TaskId taskId, SyncParam ¶m, bool isFirstDo if (ret != E_OK) { return ret; } + loopCount++; + if (loopCount > MAX_DOWNLOAD_LOOP_TIMES && (loopCount % WARNING_DOWNLOAD_PERIOD == 0)) { + LOGW("[CloudSyncer] Table[%s] download too much times, current[%" PRIu64 "]", + DBCommon::StringMiddleMasking(param.info.tableName).c_str(), loopCount); + } } while (!param.isLastBatch); return E_OK; } -- Gitee