From 48f0cda40ff9a83269bede20a474c1676a0d8edf Mon Sep 17 00:00:00 2001 From: zqq Date: Sun, 28 Dec 2025 11:33:47 +0800 Subject: [PATCH 1/3] support expire cursor Signed-off-by: zqq --- .../common/include/cloud/cloud_db_constant.h | 1 + .../distributeddb/common/include/db_common.h | 1 + .../distributeddb/common/include/db_errno.h | 1 + .../common/src/db_common_client.cpp | 7 ++ .../interfaces/include/cloud/icloud_db.h | 4 + .../interfaces/include/store_types.h | 1 + .../interfaces/src/kv_store_errno.cpp | 1 + .../storage/include/cloud/cloud_meta_data.h | 16 ++++ .../include/icloud_sync_storage_interface.h | 15 +++ .../storage/include/storage_proxy.h | 10 ++ .../storage/src/cloud/cloud_meta_data.cpp | 96 +++++++++++++++++++ .../storage/src/storage_proxy.cpp | 52 +++++++++- .../syncer/src/cloud/cloud_db_proxy.cpp | 60 ++++++------ .../syncer/src/cloud/cloud_db_proxy.h | 2 + 14 files changed, 239 insertions(+), 28 deletions(-) diff --git a/frameworks/libs/distributeddb/common/include/cloud/cloud_db_constant.h b/frameworks/libs/distributeddb/common/include/cloud/cloud_db_constant.h index a6d36c54c9..5d10c2c4fe 100644 --- a/frameworks/libs/distributeddb/common/include/cloud/cloud_db_constant.h +++ b/frameworks/libs/distributeddb/common/include/cloud/cloud_db_constant.h @@ -23,6 +23,7 @@ namespace DistributedDB { class CloudDbConstant { public: static constexpr const char *CLOUD_META_TABLE_PREFIX = "naturalbase_cloud_meta_"; + static constexpr const char *CLOUD_INFO_META_PREFIX = "naturalbase_cloud_info_meta_"; static constexpr const char *GID_FIELD = "#_gid"; static constexpr const char *CREATE_FIELD = "#_createTime"; static constexpr const char *MODIFY_FIELD = "#_modifyTime"; diff --git a/frameworks/libs/distributeddb/common/include/db_common.h b/frameworks/libs/distributeddb/common/include/db_common.h index 96a2db07ae..55de410a96 100644 --- a/frameworks/libs/distributeddb/common/include/db_common.h +++ b/frameworks/libs/distributeddb/common/include/db_common.h @@ -30,6 +30,7 @@ public: static int CreateDirectory(const std::string &directory); static void StringToVector(const std::string &src, std::vector &dst); + static std::vector ToVector(const std::string &src); static void VectorToString(const std::vector &src, std::string &dst); static inline std::string GetLogTableName(const std::string &tableName) diff --git a/frameworks/libs/distributeddb/common/include/db_errno.h b/frameworks/libs/distributeddb/common/include/db_errno.h index 3bc7b9caad..645adb6e04 100644 --- a/frameworks/libs/distributeddb/common/include/db_errno.h +++ b/frameworks/libs/distributeddb/common/include/db_errno.h @@ -196,6 +196,7 @@ constexpr const int E_NEED_CORRECT_TARGET_USER = (E_BASE + 209); constexpr const int E_CLOUD_ASSET_NOT_FOUND = (E_BASE + 210); // Cloud download asset return 404 error constexpr const int E_TASK_INTERRUPTED = (E_BASE + 211); // Task(cloud sync, generate log) interrupted constexpr const int E_SKIP_WHEN_CLOUD_SPACE_INSUFFICIENT = (E_BASE + 212); // Upload failed by cloud space insufficient +constexpr const int E_EXPIRED_CURSOR = (E_BASE + 213); // Cursor invalid in cloud } // namespace DistributedDB #endif // DISTRIBUTEDDB_ERRNO_H diff --git a/frameworks/libs/distributeddb/common/src/db_common_client.cpp b/frameworks/libs/distributeddb/common/src/db_common_client.cpp index fa103ec9fe..723305e1cb 100644 --- a/frameworks/libs/distributeddb/common/src/db_common_client.cpp +++ b/frameworks/libs/distributeddb/common/src/db_common_client.cpp @@ -33,6 +33,13 @@ void DBCommon::StringToVector(const std::string &src, std::vector &dst) dst.assign(src.begin(), src.end()); } +std::vector DBCommon::ToVector(const std::string &src) +{ + std::vector res; + StringToVector(src, res); + return res; +} + std::string DBCommon::StringMiddleMasking(const std::string &name) { if (name.length() <= HEAD_SIZE) { diff --git a/frameworks/libs/distributeddb/interfaces/include/cloud/icloud_db.h b/frameworks/libs/distributeddb/interfaces/include/cloud/icloud_db.h index 79e24a0c66..e72fca098d 100644 --- a/frameworks/libs/distributeddb/interfaces/include/cloud/icloud_db.h +++ b/frameworks/libs/distributeddb/interfaces/include/cloud/icloud_db.h @@ -52,6 +52,10 @@ public: { return this->prepareTraceId; } + virtual DBStatus QueryAllGID(const std::string &tableName, const VBucket &extend, std::vector &data) + { + return NOT_SUPPORT; + } private: std::string prepareTraceId; }; diff --git a/frameworks/libs/distributeddb/interfaces/include/store_types.h b/frameworks/libs/distributeddb/interfaces/include/store_types.h index e74ae3b0c2..d29c61f73c 100644 --- a/frameworks/libs/distributeddb/interfaces/include/store_types.h +++ b/frameworks/libs/distributeddb/interfaces/include/store_types.h @@ -99,6 +99,7 @@ enum DBStatus { CLOUD_ASSET_NOT_FOUND, // The cloud download asset return 404 error TASK_INTERRUPTED, // Task(cloud sync) interrupted SKIP_WHEN_CLOUD_SPACE_INSUFFICIENT, // Whitelist for contact, skip when cloud space insufficient + EXPIRED_CURSOR, // Cursor is out of date in cloud BUTT_STATUS = 27394048 // end of status }; diff --git a/frameworks/libs/distributeddb/interfaces/src/kv_store_errno.cpp b/frameworks/libs/distributeddb/interfaces/src/kv_store_errno.cpp index cbe2ccdd20..e529539914 100644 --- a/frameworks/libs/distributeddb/interfaces/src/kv_store_errno.cpp +++ b/frameworks/libs/distributeddb/interfaces/src/kv_store_errno.cpp @@ -90,6 +90,7 @@ namespace { { -E_CLOUD_ASSET_NOT_FOUND, CLOUD_ASSET_NOT_FOUND }, { -E_TASK_INTERRUPTED, TASK_INTERRUPTED }, { -E_SKIP_WHEN_CLOUD_SPACE_INSUFFICIENT, SKIP_WHEN_CLOUD_SPACE_INSUFFICIENT }, + { -E_EXPIRED_CURSOR, EXPIRED_CURSOR }, }; } diff --git a/frameworks/libs/distributeddb/storage/include/cloud/cloud_meta_data.h b/frameworks/libs/distributeddb/storage/include/cloud/cloud_meta_data.h index b0081a8aa9..5005e4e972 100644 --- a/frameworks/libs/distributeddb/storage/include/cloud/cloud_meta_data.h +++ b/frameworks/libs/distributeddb/storage/include/cloud/cloud_meta_data.h @@ -47,6 +47,10 @@ public: void CleanWaterMarkInMemory(const TableName &tableName); + int GetCloudGidCursor(const std::string &tableName, std::string &cursor) const; + int PutCloudGidCursor(const std::string &tableName, const std::string &cursor) const; + int CleanCloudInfo(const std::string &tableName) const; + private: typedef struct CloudMetaValue { Timestamp localMark = 0u; @@ -56,6 +60,11 @@ private: std::string cloudMark; } CloudMetaValue; + typedef struct CloudInfoValue { + uint64_t version = 0u; + std::string gidCloudMark; + } CloudInfoValue; + int ReadMarkFromMeta(const TableName &tableName); int WriteMarkToMeta(const TableName &tableName, Timestamp localmark, std::string &cloudMark); int WriteTypeMarkToMeta(const TableName &tableName, CloudMetaValue &cloudMetaValue); @@ -63,6 +72,13 @@ private: int DeserializeMark(Value &blobMark, CloudMetaValue &cloudMetaValue); uint64_t GetParcelCurrentLength(CloudMetaValue &cloudMetaValue); + int GetCloudInfo(const std::string &tableName, CloudInfoValue &info) const ; + static int DeserializeCloudInfo(Value &blobMark, CloudInfoValue &info); + static int SerializeCloudInfo(const CloudInfoValue &info, Value &blobMark); + + static uint64_t GetParcelCurrentLength(const CloudInfoValue &info); + static Key GetCloudInfoKey(const std::string &tableName); + mutable std::mutex cloudMetaMutex_; std::unordered_map cloudMetaVals_; ICloudSyncStorageInterface *store_ = nullptr; diff --git a/frameworks/libs/distributeddb/storage/include/icloud_sync_storage_interface.h b/frameworks/libs/distributeddb/storage/include/icloud_sync_storage_interface.h index 10e24a43d8..3e8b71b868 100644 --- a/frameworks/libs/distributeddb/storage/include/icloud_sync_storage_interface.h +++ b/frameworks/libs/distributeddb/storage/include/icloud_sync_storage_interface.h @@ -329,6 +329,21 @@ public: } virtual int WaitAsyncGenLogTaskFinished(const std::vector &tables) = 0; + + virtual int PutCloudGid([[gnu::unused]] const std::string &tableName, [[gnu::unused]] std::vector &data) + { + return E_OK; + } + + virtual int AgingCloudNoneExistRecord([[gnu::unused]] const std::string &tableName) + { + return E_OK; + } + + virtual int DeleteMetaData([[gnu::unused]] const std::vector &keys) + { + return E_OK; + } }; } diff --git a/frameworks/libs/distributeddb/storage/include/storage_proxy.h b/frameworks/libs/distributeddb/storage/include/storage_proxy.h index 3614c9eb25..10e3442838 100644 --- a/frameworks/libs/distributeddb/storage/include/storage_proxy.h +++ b/frameworks/libs/distributeddb/storage/include/storage_proxy.h @@ -199,6 +199,16 @@ public: int WaitAsyncGenLogTaskFinished(const std::vector &tables) const; + // gid contain in data with key #_gid + int PutCloudGid(const std::string &tableName, std::vector &data); + + int GetCloudGidCursor(const std::string &tableName, std::string &cursor); + + int PutCloudGidCursor(const std::string &tableName, const std::string &cursor); + + int CleanCloudInfo(const std::string &tableName); + + int AgingCloudNoneExistRecord(const std::string &tableName); protected: void Init(); private: diff --git a/frameworks/libs/distributeddb/storage/src/cloud/cloud_meta_data.cpp b/frameworks/libs/distributeddb/storage/src/cloud/cloud_meta_data.cpp index b828c85dc8..2cc2b9cace 100644 --- a/frameworks/libs/distributeddb/storage/src/cloud/cloud_meta_data.cpp +++ b/frameworks/libs/distributeddb/storage/src/cloud/cloud_meta_data.cpp @@ -277,4 +277,100 @@ void CloudMetaData::CleanWaterMarkInMemory(const TableName &tableName) cloudMetaVals_[tableName] = {}; LOGD("[Meta] clean cloud water mark in memory"); } + +int CloudMetaData::GetCloudGidCursor(const std::string &tableName, std::string &cursor) const +{ + if (store_ == nullptr) { + return -E_INVALID_DB; + } + CloudInfoValue info; + auto errCode = GetCloudInfo(tableName, info); + if (errCode != E_OK) { + return errCode; + } + cursor = info.gidCloudMark; + return E_OK; +} + +int CloudMetaData::PutCloudGidCursor(const std::string &tableName, const std::string &cursor) const +{ + Value blobMetaVal; + CloudInfoValue info; + auto errCode = GetCloudInfo(tableName, info); + if (errCode != E_OK) { + return errCode; + } + info.gidCloudMark = cursor; + errCode = SerializeCloudInfo(info, blobMetaVal); + if (errCode != E_OK) { + return errCode; + } + if (store_ == nullptr) { + return -E_INVALID_DB; + } + return store_->PutMetaData(GetCloudInfoKey(tableName), blobMetaVal); +} + +int CloudMetaData::CleanCloudInfo(const std::string &tableName) const +{ + if (store_ == nullptr) { + return -E_INVALID_DB; + } + return store_->DeleteMetaData({GetCloudInfoKey(tableName)}); +} + +int CloudMetaData::GetCloudInfo(const std::string &tableName, CloudInfoValue &info) const +{ + Value blobMetaVal; + int ret = store_->GetMetaData(GetCloudInfoKey(tableName), blobMetaVal); + if (ret != -E_NOT_FOUND && ret != E_OK) { + return ret; + } + if (ret == -E_NOT_FOUND) { + return E_OK; + } + return DeserializeCloudInfo(blobMetaVal, info); +} + +int CloudMetaData::DeserializeCloudInfo(Value &blobMark, CloudInfoValue &info) +{ + if (blobMark.empty()) { + info.version = 0; + info.gidCloudMark = ""; + return E_OK; + } + Parcel parcel(blobMark.data(), blobMark.size()); + info.version = 0; + parcel.ReadUInt64(info.version); + parcel.ReadString(info.gidCloudMark); + if (parcel.IsError()) { + LOGE("[CloudMetaData] Parse error cloud info version[%" PRIu32 "]", info.version); + return -E_PARSE_FAIL; + } + return E_OK; +} + +int CloudMetaData::SerializeCloudInfo(const CloudInfoValue &info, Value &blobMark) +{ + uint64_t length = GetParcelCurrentLength(info); + blobMark.resize(length); + Parcel parcel(blobMark.data(), blobMark.size()); + parcel.WriteUInt64(info.version); + parcel.WriteString(info.gidCloudMark); + if (parcel.IsError()) { + LOGE("[CloudMetaData] Parcel error while serializing cloud info."); + return -E_PARSE_FAIL; + } + return E_OK; +} + +uint64_t CloudMetaData::GetParcelCurrentLength(const CloudInfoValue &info) +{ + return Parcel::GetUInt64Len() + Parcel::GetStringLen(info.gidCloudMark); +} + +Key CloudMetaData::GetCloudInfoKey(const std::string &tableName) +{ + return DBCommon::ToVector(CloudDbConstant::CLOUD_INFO_META_PREFIX + tableName); +} } // namespace DistributedDB \ No newline at end of file diff --git a/frameworks/libs/distributeddb/storage/src/storage_proxy.cpp b/frameworks/libs/distributeddb/storage/src/storage_proxy.cpp index 0c236ac517..60957c6e81 100644 --- a/frameworks/libs/distributeddb/storage/src/storage_proxy.cpp +++ b/frameworks/libs/distributeddb/storage/src/storage_proxy.cpp @@ -912,9 +912,59 @@ void StorageProxy::FilterDownloadRecordNoneSchemaField(const std::string &tableN int StorageProxy::WaitAsyncGenLogTaskFinished(const std::vector &tables) const { if (store_ == nullptr) { - LOGW("[WaitAsyncGenLogTaskFinished] store is null"); + LOGE("[WaitAsyncGenLogTaskFinished] store is null"); return -E_INVALID_DB; } return store_->WaitAsyncGenLogTaskFinished(tables); } + +int StorageProxy::PutCloudGid(const std::string &tableName, std::vector &data) +{ + std::shared_lock readLock(storeMutex_); + if (store_ == nullptr) { + LOGE("[PutCloudGid] store is null"); + return -E_INVALID_DB; + } + return store_->PutCloudGid(tableName, data); +} + +int StorageProxy::GetCloudGidCursor(const std::string &tableName, std::string &cursor) +{ + std::shared_lock readLock(storeMutex_); + if (cloudMetaData_ == nullptr) { + LOGE("[GetCloudGidCursor] meta is null"); + return -E_INVALID_DB; + } + return cloudMetaData_->GetCloudGidCursor(tableName, cursor); +} + +int StorageProxy::PutCloudGidCursor(const std::string &tableName, const std::string &cursor) +{ + std::shared_lock readLock(storeMutex_); + if (cloudMetaData_ == nullptr) { + LOGE("[PutCloudGidCursor] meta is null"); + return -E_INVALID_DB; + } + return cloudMetaData_->PutCloudGidCursor(tableName, cursor); +} + +int StorageProxy::CleanCloudInfo(const std::string &tableName) +{ + std::shared_lock readLock(storeMutex_); + if (cloudMetaData_ == nullptr) { + LOGE("[CleanCloudInfo] meta is null"); + return -E_INVALID_DB; + } + return cloudMetaData_->CleanCloudInfo(tableName); +} + +int StorageProxy::AgingCloudNoneExistRecord(const std::string &tableName) +{ + std::shared_lock readLock(storeMutex_); + if (store_ == nullptr) { + LOGE("[AgingCloudNoneExistRecord] store is null"); + return -E_INVALID_DB; + } + return store_->AgingCloudNoneExistRecord(tableName); +} } \ No newline at end of file diff --git a/frameworks/libs/distributeddb/syncer/src/cloud/cloud_db_proxy.cpp b/frameworks/libs/distributeddb/syncer/src/cloud/cloud_db_proxy.cpp index e616fe9b9c..6298223367 100644 --- a/frameworks/libs/distributeddb/syncer/src/cloud/cloud_db_proxy.cpp +++ b/frameworks/libs/distributeddb/syncer/src/cloud/cloud_db_proxy.cpp @@ -482,33 +482,26 @@ int CloudDBProxy::GetInnerErrorCode(DBStatus status) if (status < DB_ERROR || status >= BUTT_STATUS) { return static_cast(status); } - switch (status) { - case OK: - case LOCAL_ASSET_NOT_FOUND: - return E_OK; - case CLOUD_NETWORK_ERROR: - return -E_CLOUD_NETWORK_ERROR; - case CLOUD_SYNC_UNSET: - return -E_CLOUD_SYNC_UNSET; - case CLOUD_FULL_RECORDS: - return -E_CLOUD_FULL_RECORDS; - case CLOUD_LOCK_ERROR: - return -E_CLOUD_LOCK_ERROR; - case CLOUD_ASSET_SPACE_INSUFFICIENT: - return -E_CLOUD_ASSET_SPACE_INSUFFICIENT; - case CLOUD_VERSION_CONFLICT: - return -E_CLOUD_VERSION_CONFLICT; - case CLOUD_RECORD_EXIST_CONFLICT: - return -E_CLOUD_RECORD_EXIST_CONFLICT; - case CLOUD_DISABLED: - return -E_CLOUD_DISABLED; - case CLOUD_ASSET_NOT_FOUND: - return -E_CLOUD_ASSET_NOT_FOUND; - case SKIP_WHEN_CLOUD_SPACE_INSUFFICIENT: - return -E_SKIP_WHEN_CLOUD_SPACE_INSUFFICIENT; - default: - return -E_CLOUD_ERROR; - } + static const std::map CLOUD_ERROR = { + {OK, E_OK}, + {LOCAL_ASSET_NOT_FOUND, E_OK}, + {CLOUD_NETWORK_ERROR, -E_CLOUD_NETWORK_ERROR}, + {CLOUD_SYNC_UNSET, -E_CLOUD_SYNC_UNSET}, + {CLOUD_FULL_RECORDS, -E_CLOUD_FULL_RECORDS}, + {CLOUD_LOCK_ERROR, -E_CLOUD_LOCK_ERROR}, + {CLOUD_ASSET_SPACE_INSUFFICIENT, -E_CLOUD_ASSET_SPACE_INSUFFICIENT}, + {CLOUD_VERSION_CONFLICT, -E_CLOUD_VERSION_CONFLICT}, + {CLOUD_RECORD_EXIST_CONFLICT, -E_CLOUD_RECORD_EXIST_CONFLICT}, + {CLOUD_DISABLED, -E_CLOUD_DISABLED}, + {CLOUD_ASSET_NOT_FOUND, -E_CLOUD_ASSET_NOT_FOUND}, + {SKIP_WHEN_CLOUD_SPACE_INSUFFICIENT, -E_SKIP_WHEN_CLOUD_SPACE_INSUFFICIENT}, + {EXPIRED_CURSOR, -E_EXPIRED_CURSOR}, + }; + auto iter = CLOUD_ERROR.find(status); + if (iter != CLOUD_ERROR.end()) { + return iter->second; + } + return -E_CLOUD_ERROR; } DBStatus CloudDBProxy::QueryAction(const std::shared_ptr &context, @@ -857,4 +850,17 @@ std::weak_ptr CloudDBProxy::GetCloudConflictHandler() std::shared_lock writeLock(handlerMutex_); return std::weak_ptr(conflictHandler_); } + +int CloudDBProxy::QueryAllGID(const std::string &tableName, const VBucket &extend, std::vector &data) +{ + std::shared_lock readLock(cloudMutex_); + if (iCloudDb_ == nullptr) { + return -E_CLOUD_ERROR; + } + auto ret = iCloudDb_->QueryAllGID(tableName, extend, data); + if (ret != DBStatus::OK) { + LOGE("[CloudDBProxy] QueryAllGid failed[%d]", ret); + } + return GetInnerErrorCode(ret); +} } diff --git a/frameworks/libs/distributeddb/syncer/src/cloud/cloud_db_proxy.h b/frameworks/libs/distributeddb/syncer/src/cloud/cloud_db_proxy.h index 07a1bdfee8..55addf602e 100644 --- a/frameworks/libs/distributeddb/syncer/src/cloud/cloud_db_proxy.h +++ b/frameworks/libs/distributeddb/syncer/src/cloud/cloud_db_proxy.h @@ -89,6 +89,8 @@ public: std::weak_ptr GetCloudConflictHandler(); + int QueryAllGID(const std::string &tableName, const VBucket &extend, std::vector &data); + static int GetInnerErrorCode(DBStatus status); protected: class CloudActionContext { -- Gitee From b8bfb90dfdd545972fe358e1d076fded8cfe8c7b Mon Sep 17 00:00:00 2001 From: zqq Date: Tue, 30 Dec 2025 14:49:47 +0800 Subject: [PATCH 2/3] save gid after expired cursor Signed-off-by: zqq --- .../distributeddb/common/include/db_common.h | 5 + .../common/include/db_constant.h | 1 + .../interfaces/include/cloud/icloud_db.h | 2 +- .../relational/relational_sync_able_storage.h | 2 + .../relational_sync_able_storage_extend.cpp | 9 ++ .../relational/sqlite_relational_utils.cpp | 57 ++++++++ .../relational/sqlite_relational_utils.h | 2 + ...qlite_single_relational_storage_engine.cpp | 31 +++++ .../sqlite_single_relational_storage_engine.h | 2 + ...single_ver_relational_storage_executor.cpp | 5 + ...e_single_ver_relational_storage_executor.h | 2 + .../syncer/src/cloud/cloud_db_proxy.cpp | 5 +- .../syncer/src/cloud/cloud_db_proxy.h | 2 +- .../syncer/src/cloud/cloud_syncer.cpp | 24 +--- .../syncer/src/cloud/cloud_syncer.h | 12 ++ .../src/cloud/cloud_syncer_extend_extend.cpp | 127 ++++++++++++++++++ .../distributeddb_rdb_complex_cloud_test.cpp | 32 +++++ .../common/syncer/cloud/virtual_cloud_db.cpp | 12 ++ .../common/syncer/cloud/virtual_cloud_db.h | 2 + 19 files changed, 311 insertions(+), 23 deletions(-) diff --git a/frameworks/libs/distributeddb/common/include/db_common.h b/frameworks/libs/distributeddb/common/include/db_common.h index 55de410a96..d1c25e6be0 100644 --- a/frameworks/libs/distributeddb/common/include/db_common.h +++ b/frameworks/libs/distributeddb/common/include/db_common.h @@ -38,6 +38,11 @@ public: return DBConstant::RELATIONAL_PREFIX + tableName + DBConstant::LOG_POSTFIX; } + static inline std::string GetTmpLogTableName(const std::string &tableName) + { + return DBConstant::RELATIONAL_PREFIX + tableName + DBConstant::LOG_POSTFIX + DBConstant::TMP_POSTFIX; + } + static inline const std::vector GetWaterTypeVec() { return {CloudWaterType::DELETE, CloudWaterType::UPDATE, CloudWaterType::INSERT}; diff --git a/frameworks/libs/distributeddb/common/include/db_constant.h b/frameworks/libs/distributeddb/common/include/db_constant.h index 6de2f46a98..a0c15ad39f 100644 --- a/frameworks/libs/distributeddb/common/include/db_constant.h +++ b/frameworks/libs/distributeddb/common/include/db_constant.h @@ -162,6 +162,7 @@ public: static constexpr size_t RELATIONAL_PREFIX_SIZE = 20; static constexpr const char *TIMESTAMP_ALIAS = "naturalbase_rdb_aux_timestamp"; static constexpr const char *LOG_POSTFIX = "_log"; + static constexpr const char *TMP_POSTFIX = "_tmp"; static constexpr const char *META_TABLE_POSTFIX = "metadata"; static constexpr const char *KNOWLEDGE_TABLE_TYPE = "knowledge"; diff --git a/frameworks/libs/distributeddb/interfaces/include/cloud/icloud_db.h b/frameworks/libs/distributeddb/interfaces/include/cloud/icloud_db.h index e72fca098d..333e2911f1 100644 --- a/frameworks/libs/distributeddb/interfaces/include/cloud/icloud_db.h +++ b/frameworks/libs/distributeddb/interfaces/include/cloud/icloud_db.h @@ -52,7 +52,7 @@ public: { return this->prepareTraceId; } - virtual DBStatus QueryAllGID(const std::string &tableName, const VBucket &extend, std::vector &data) + virtual DBStatus QueryAllGID(const std::string &tableName, VBucket &extend, std::vector &data) { return NOT_SUPPORT; } diff --git a/frameworks/libs/distributeddb/interfaces/src/relational/relational_sync_able_storage.h b/frameworks/libs/distributeddb/interfaces/src/relational/relational_sync_able_storage.h index eba410cff8..f6e087911a 100644 --- a/frameworks/libs/distributeddb/interfaces/src/relational/relational_sync_able_storage.h +++ b/frameworks/libs/distributeddb/interfaces/src/relational/relational_sync_able_storage.h @@ -281,6 +281,8 @@ public: int WaitAsyncGenLogTaskFinished(const std::vector &tables) override; int ResetGenLogTaskStatus(); + + int PutCloudGid(const std::string &tableName, std::vector &data) override; protected: int FillReferenceData(CloudSyncData &syncData); diff --git a/frameworks/libs/distributeddb/storage/src/relational/relational_sync_able_storage_extend.cpp b/frameworks/libs/distributeddb/storage/src/relational/relational_sync_able_storage_extend.cpp index 82aff8809a..3774d821eb 100644 --- a/frameworks/libs/distributeddb/storage/src/relational/relational_sync_able_storage_extend.cpp +++ b/frameworks/libs/distributeddb/storage/src/relational/relational_sync_able_storage_extend.cpp @@ -723,5 +723,14 @@ int RelationalSyncAbleStorage::ResetGenLogTaskStatus() storageEngine_->ResetGenLogTaskStatus(); return E_OK; } + +int RelationalSyncAbleStorage::PutCloudGid(const std::string &tableName, std::vector &data) +{ + if (storageEngine_ == nullptr) { + LOGE("[RelationalSyncAbleStorage] Storage is null when put cloud gid"); + return -E_INVALID_DB; + } + return storageEngine_->PutCloudGid(tableName, data); +} } #endif diff --git a/frameworks/libs/distributeddb/storage/src/sqlite/relational/sqlite_relational_utils.cpp b/frameworks/libs/distributeddb/storage/src/sqlite/relational/sqlite_relational_utils.cpp index 0149f26f08..691cbcde92 100644 --- a/frameworks/libs/distributeddb/storage/src/sqlite/relational/sqlite_relational_utils.cpp +++ b/frameworks/libs/distributeddb/storage/src/sqlite/relational/sqlite_relational_utils.cpp @@ -1256,5 +1256,62 @@ void SQLiteRelationalUtils::FillSyncInfo(const CloudSyncOption &option, const Sy info.prepareTraceId = option.prepareTraceId; info.asyncDownloadAssets = option.asyncDownloadAssets; } + +int SQLiteRelationalUtils::PutCloudGid(sqlite3 *db, const std::string &tableName, std::vector &data) +{ + // create tmp table if table not exists + std::string sql = "CREATE TABLE IF NOT EXISTS " + DBCommon::GetTmpLogTableName(tableName) + + "(ID integer primary key autoincrement, cloud_gid TEXT UNIQUE ON CONFLICT IGNORE)"; + int errCode = SQLiteUtils::ExecuteRawSQL(db, sql); + if (errCode != E_OK) { + LOGE("[RDBUtils] Create gid table failed[%d]", errCode); + return errCode; + } + // insert all gid + sql = "INSERT INTO " + DBCommon::GetTmpLogTableName(tableName) + "(cloud_gid) VALUES(?)"; + sqlite3_stmt *stmt = nullptr; + errCode = SQLiteUtils::GetStatement(db, sql, stmt); + if (stmt == nullptr) { + LOGE("[RDBUtils] Get insert gid stmt failed[%d]", errCode); + return errCode; + } + ResFinalizer finalizer([stmt]() { + sqlite3_stmt *releaseStmt = stmt; + int ret = E_OK; + SQLiteUtils::ResetStatement(releaseStmt, true, ret); + if (ret != E_OK) { + LOGW("[RDBUtils] Reset cloud gid stmt failed[%d]", ret); + } + }); + for (const auto &item : data) { + auto iter = item.find(CloudDbConstant::GID_FIELD); + if (iter == item.end()) { + LOGW("[RDBUtils] Cloud gid info not contain gid field"); + continue; + } + auto gid = std::get_if(&iter->second); + if (gid == nullptr) { + LOGW("[RDBUtils] Cloud gid info not contain wrong type[%zu]", iter->second.index()); + continue; + } + errCode = SQLiteUtils::BindTextToStatement(stmt, 1, *gid); + if (errCode != E_OK) { + LOGE("[RDBUtils] Bind cloud gid failed[%d]", errCode); + return errCode; + } + errCode = SQLiteUtils::StepNext(stmt); + if (errCode != -E_FINISHED) { + LOGE("[RDBUtils] Step cloud gid failed[%d]", errCode); + return errCode; + } + errCode = E_OK; + SQLiteUtils::ResetStatement(stmt, false, errCode); + if (errCode != E_OK) { + LOGE("[RDBUtils] Reset cloud gid stmt failed[%d]", errCode); + return errCode; + } + } + return E_OK; +} #endif } // namespace DistributedDB \ No newline at end of file diff --git a/frameworks/libs/distributeddb/storage/src/sqlite/relational/sqlite_relational_utils.h b/frameworks/libs/distributeddb/storage/src/sqlite/relational/sqlite_relational_utils.h index 60f61115ea..72ef275cf1 100644 --- a/frameworks/libs/distributeddb/storage/src/sqlite/relational/sqlite_relational_utils.h +++ b/frameworks/libs/distributeddb/storage/src/sqlite/relational/sqlite_relational_utils.h @@ -150,6 +150,8 @@ public: #ifdef USE_DISTRIBUTEDDB_CLOUD static void FillSyncInfo(const CloudSyncOption &option, const SyncProcessCallback &onProcess, ICloudSyncer::CloudTaskInfo &info); + + static int PutCloudGid(sqlite3 *db, const std::string &tableName, std::vector &data); #endif private: static int BindExtendStatementByType(sqlite3_stmt *statement, int cid, Type &typeVal); diff --git a/frameworks/libs/distributeddb/storage/src/sqlite/relational/sqlite_single_relational_storage_engine.cpp b/frameworks/libs/distributeddb/storage/src/sqlite/relational/sqlite_single_relational_storage_engine.cpp index a847dc5034..0b636d0d8f 100644 --- a/frameworks/libs/distributeddb/storage/src/sqlite/relational/sqlite_single_relational_storage_engine.cpp +++ b/frameworks/libs/distributeddb/storage/src/sqlite/relational/sqlite_single_relational_storage_engine.cpp @@ -1677,5 +1677,36 @@ std::pair SQLiteSingleRelationalStorageEngine::AnalyzeTable(cons }); return handle->AnalyzeTable(tableName); } + +int SQLiteSingleRelationalStorageEngine::PutCloudGid(const std::string &tableName, std::vector &data) +{ + int errCode = E_OK; + auto *handle = static_cast(FindExecutor(true, OperatePerm::NORMAL_PERM, + errCode)); + if (handle == nullptr) { + return errCode; + } + ResFinalizer finalizer([this, handle]() { + auto releaseHandle = handle; + ReleaseExecutor(releaseHandle); + }); + errCode = handle->StartTransaction(TransactType::IMMEDIATE); + if (errCode != E_OK) { + return errCode; + } + errCode = handle->PutCloudGid(tableName, data); + if (errCode == E_OK) { + errCode = handle->Commit(); + if (errCode != E_OK) { + LOGE("[RDBEngine] Commit transaction failed[%d] when put cloud gid", errCode); + } + } else { + int ret = handle->Rollback(); + if (ret != E_OK) { + LOGW("[RDBEngine] Rollback transaction failed[%d] when put cloud gid", ret); + } + } + return errCode; +} } #endif \ No newline at end of file diff --git a/frameworks/libs/distributeddb/storage/src/sqlite/relational/sqlite_single_relational_storage_engine.h b/frameworks/libs/distributeddb/storage/src/sqlite/relational/sqlite_single_relational_storage_engine.h index fe6b1ceb4e..0b029635a3 100644 --- a/frameworks/libs/distributeddb/storage/src/sqlite/relational/sqlite_single_relational_storage_engine.h +++ b/frameworks/libs/distributeddb/storage/src/sqlite/relational/sqlite_single_relational_storage_engine.h @@ -106,6 +106,8 @@ public: std::vector> &tasks); std::pair AnalyzeTable(const std::string &tableName); + + int PutCloudGid(const std::string &tableName, std::vector &data); protected: StorageExecutor *NewSQLiteStorageExecutor(sqlite3 *dbHandle, bool isWrite, bool isMemDb) override; int Upgrade(sqlite3 *db) override; diff --git a/frameworks/libs/distributeddb/storage/src/sqlite/relational/sqlite_single_ver_relational_storage_executor.cpp b/frameworks/libs/distributeddb/storage/src/sqlite/relational/sqlite_single_ver_relational_storage_executor.cpp index f03b734b41..514eed6aca 100644 --- a/frameworks/libs/distributeddb/storage/src/sqlite/relational/sqlite_single_ver_relational_storage_executor.cpp +++ b/frameworks/libs/distributeddb/storage/src/sqlite/relational/sqlite_single_ver_relational_storage_executor.cpp @@ -2003,5 +2003,10 @@ std::pair SQLiteSingleVerRelationalStorageExecutor::AnalyzeTable { return SQLiteRelationalUtils::AnalyzeTable(dbHandle_, tableName); } + +int SQLiteSingleVerRelationalStorageExecutor::PutCloudGid(const std::string &tableName, std::vector &data) +{ + return SQLiteRelationalUtils::PutCloudGid(dbHandle_, tableName, data); +} } // namespace DistributedDB #endif diff --git a/frameworks/libs/distributeddb/storage/src/sqlite/relational/sqlite_single_ver_relational_storage_executor.h b/frameworks/libs/distributeddb/storage/src/sqlite/relational/sqlite_single_ver_relational_storage_executor.h index 11a0c7f068..fce159f9dd 100644 --- a/frameworks/libs/distributeddb/storage/src/sqlite/relational/sqlite_single_ver_relational_storage_executor.h +++ b/frameworks/libs/distributeddb/storage/src/sqlite/relational/sqlite_single_ver_relational_storage_executor.h @@ -284,6 +284,8 @@ public: int GetLocalDataByRowid(const TableInfo &table, const TableSchema &tableSchema, DataInfoWithLog &dataInfoWithLog); std::pair AnalyzeTable(const std::string &tableName) const; + + int PutCloudGid(const std::string &tableName, std::vector &data); private: int UpdateHashKeyWithOutPk(DistributedTableMode mode, const TableInfo &tableInfo, TableSyncType syncType, const std::string &localIdentity); diff --git a/frameworks/libs/distributeddb/syncer/src/cloud/cloud_db_proxy.cpp b/frameworks/libs/distributeddb/syncer/src/cloud/cloud_db_proxy.cpp index 6298223367..f37bcdd726 100644 --- a/frameworks/libs/distributeddb/syncer/src/cloud/cloud_db_proxy.cpp +++ b/frameworks/libs/distributeddb/syncer/src/cloud/cloud_db_proxy.cpp @@ -496,6 +496,7 @@ int CloudDBProxy::GetInnerErrorCode(DBStatus status) {CLOUD_ASSET_NOT_FOUND, -E_CLOUD_ASSET_NOT_FOUND}, {SKIP_WHEN_CLOUD_SPACE_INSUFFICIENT, -E_SKIP_WHEN_CLOUD_SPACE_INSUFFICIENT}, {EXPIRED_CURSOR, -E_EXPIRED_CURSOR}, + {QUERY_END, -E_QUERY_END}, }; auto iter = CLOUD_ERROR.find(status); if (iter != CLOUD_ERROR.end()) { @@ -851,14 +852,14 @@ std::weak_ptr CloudDBProxy::GetCloudConflictHandler() return std::weak_ptr(conflictHandler_); } -int CloudDBProxy::QueryAllGID(const std::string &tableName, const VBucket &extend, std::vector &data) +int CloudDBProxy::QueryAllGID(const std::string &tableName, VBucket &extend, std::vector &data) { std::shared_lock readLock(cloudMutex_); if (iCloudDb_ == nullptr) { return -E_CLOUD_ERROR; } auto ret = iCloudDb_->QueryAllGID(tableName, extend, data); - if (ret != DBStatus::OK) { + if (ret != DBStatus::OK && ret != DBStatus::QUERY_END) { LOGE("[CloudDBProxy] QueryAllGid failed[%d]", ret); } return GetInnerErrorCode(ret); diff --git a/frameworks/libs/distributeddb/syncer/src/cloud/cloud_db_proxy.h b/frameworks/libs/distributeddb/syncer/src/cloud/cloud_db_proxy.h index 55addf602e..4fc0c6bf6d 100644 --- a/frameworks/libs/distributeddb/syncer/src/cloud/cloud_db_proxy.h +++ b/frameworks/libs/distributeddb/syncer/src/cloud/cloud_db_proxy.h @@ -89,7 +89,7 @@ public: std::weak_ptr GetCloudConflictHandler(); - int QueryAllGID(const std::string &tableName, const VBucket &extend, std::vector &data); + int QueryAllGID(const std::string &tableName, VBucket &extend, std::vector &data); static int GetInnerErrorCode(DBStatus status); protected: diff --git a/frameworks/libs/distributeddb/syncer/src/cloud/cloud_syncer.cpp b/frameworks/libs/distributeddb/syncer/src/cloud/cloud_syncer.cpp index e1890fe3dc..03ab7e7668 100644 --- a/frameworks/libs/distributeddb/syncer/src/cloud/cloud_syncer.cpp +++ b/frameworks/libs/distributeddb/syncer/src/cloud/cloud_syncer.cpp @@ -1139,22 +1139,6 @@ int CloudSyncer::DoDownload(CloudSyncer::TaskId taskId, bool isFirstDownload) return errCode; } -int CloudSyncer::DoDownloadInner(CloudSyncer::TaskId taskId, SyncParam ¶m, bool isFirstDownload) -{ - // Query data by batch until reaching end and not more data need to be download - int ret = PreCheck(taskId, param.info.tableName); - if (ret != E_OK) { - return ret; - } - do { - ret = DownloadOneBatch(taskId, param, isFirstDownload); - if (ret != E_OK) { - return ret; - } - } while (!param.isLastBatch); - return E_OK; -} - void CloudSyncer::NotifyInEmptyDownload(CloudSyncer::TaskId taskId, InnerProcessInfo &info) { std::lock_guard autoLock(dataLock_); @@ -2109,9 +2093,11 @@ int CloudSyncer::DownloadDataFromCloud(TaskId taskId, SyncParam ¶m, bool isF // Won't break here since downloadData may not be null param.isLastBatch = true; } else if (ret != E_OK) { - std::lock_guard autoLock(dataLock_); - param.info.tableStatus = ProcessStatus::FINISHED; - currentContext_.notifier->UpdateProcess(param.info); + if (ret != -E_EXPIRED_CURSOR) { + std::lock_guard autoLock(dataLock_); + param.info.tableStatus = ProcessStatus::FINISHED; + currentContext_.notifier->UpdateProcess(param.info); + } return ret; } diff --git a/frameworks/libs/distributeddb/syncer/src/cloud/cloud_syncer.h b/frameworks/libs/distributeddb/syncer/src/cloud/cloud_syncer.h index 93ddbb2598..190dbf3c60 100644 --- a/frameworks/libs/distributeddb/syncer/src/cloud/cloud_syncer.h +++ b/frameworks/libs/distributeddb/syncer/src/cloud/cloud_syncer.h @@ -570,6 +570,18 @@ protected: void SetCurrentTmpError(int errCode); + int DoUpdateExpiredCursor(TaskId taskId, const std::string &table); + + int DownloadOneBatchGID(TaskId taskId, SyncParam ¶m); + + int CleanExpiredCursor(SyncParam ¶m); + + int DownloadGIDFromCloud(SyncParam ¶m); + + int SaveGIDRecord(SyncParam ¶m); + + int SaveGIDCursor(SyncParam ¶m); + mutable std::mutex dataLock_; TaskId lastTaskId_; std::multimap> taskQueue_; diff --git a/frameworks/libs/distributeddb/syncer/src/cloud/cloud_syncer_extend_extend.cpp b/frameworks/libs/distributeddb/syncer/src/cloud/cloud_syncer_extend_extend.cpp index 2738e50a12..539645c709 100644 --- a/frameworks/libs/distributeddb/syncer/src/cloud/cloud_syncer_extend_extend.cpp +++ b/frameworks/libs/distributeddb/syncer/src/cloud/cloud_syncer_extend_extend.cpp @@ -32,6 +32,9 @@ #include "version.h" namespace DistributedDB { +namespace { + constexpr const int MAX_EXPIRED_CURSOR_COUNT = 1; +} int CloudSyncer::HandleDownloadResultForAsyncDownload(const DownloadItem &downloadItem, InnerProcessInfo &info, DownloadCommitList &commitList, uint32_t &successCount) { @@ -227,4 +230,128 @@ void CloudSyncer::SetCurrentTmpError(int errCode) cloudTaskInfos_[currentContext_.currentTaskId].errCode = errCode; cloudTaskInfos_[currentContext_.currentTaskId].tempErrCode = errCode; } + +int CloudSyncer::DoDownloadInner(TaskId taskId, SyncParam ¶m, bool isFirstDownload) +{ + // Query data by batch until reaching end and not more data need to be download + int ret = PreCheck(taskId, param.info.tableName); + if (ret != E_OK) { + return ret; + } + int expiredCursorCount = 0; + do { + ret = DownloadOneBatch(taskId, param, isFirstDownload); + if (ret == -E_EXPIRED_CURSOR) { + expiredCursorCount++; + if (expiredCursorCount > MAX_EXPIRED_CURSOR_COUNT) { + LOGE("[CloudSyncer] Table[%s] too much expired cursor count[%d]", + DBCommon::StringMiddleMasking(param.info.tableName).c_str(), expiredCursorCount); + return ret; + } + param.isLastBatch = false; + ret = DoUpdateExpiredCursor(taskId, param.info.tableName); + } + if (ret != E_OK) { + return ret; + } + } while (!param.isLastBatch); + return E_OK; +} + +int CloudSyncer::DoUpdateExpiredCursor(TaskId taskId, const std::string &table) +{ + LOGI("[CloudSyncer] Update expired cursor now, table[%s]", DBCommon::StringMiddleMasking(table).c_str()); + if (storageProxy_ == nullptr) { + LOGE("[CloudSyncer] storage is nullptr when update expired cursor"); + return -E_INTERNAL_ERROR; + } + SyncParam param; + param.tableName = table; + auto errCode = storageProxy_->GetCloudGidCursor(param.tableName, param.cloudWaterMark); + if (errCode != E_OK) { + return errCode; + } + int retryCount = 0; + do { + int ret = DownloadOneBatchGID(taskId, param); + if (ret == -E_EXPIRED_CURSOR && retryCount < MAX_EXPIRED_CURSOR_COUNT) { + retryCount++; + param.cloudWaterMark = ""; + continue; + } + if (ret != E_OK) { + return ret; + } + } while (!param.isLastBatch); + errCode = storageProxy_->AgingCloudNoneExistRecord(param.tableName); + if (errCode != E_OK) { + return errCode; + } + errCode = CleanExpiredCursor(param); + return errCode; +} + +int CloudSyncer::DownloadOneBatchGID(TaskId taskId, SyncParam ¶m) +{ + int ret = CheckTaskIdValid(taskId); + if (ret != E_OK) { + return ret; + } + ret = DownloadGIDFromCloud(param); + if (ret != E_OK) { + return ret; + } + ret = SaveGIDRecord(param); + if (ret != E_OK) { + return ret; + } + return SaveGIDCursor(param); +} + +int CloudSyncer::CleanExpiredCursor(SyncParam ¶m) +{ + int errCode = storageProxy_->CleanCloudInfo(param.tableName); + if (errCode != E_OK) { + return errCode; + } + return storageProxy_->CleanWaterMark(param.tableName); +} + +int CloudSyncer::DownloadGIDFromCloud(SyncParam ¶m) +{ + VBucket extend; + extend[CloudDbConstant::CURSOR_FIELD] = param.cloudWaterMark; + int errCode = cloudDB_.QueryAllGID(param.tableName, extend, param.downloadData.data); + if (errCode == -E_QUERY_END) { + errCode = E_OK; + param.isLastBatch = true; + } + if (errCode != E_OK) { + LOGE("[CloudSyncer] Query cloud gid failed[%d]", errCode); + } else if(!param.downloadData.data.empty()) { + const auto &record = param.downloadData.data[param.downloadData.data.size() - 1u]; + auto iter = record.find(CloudDbConstant::CURSOR_FIELD); + if (iter == record.end()) { + LOGE("[CloudSyncer] Cloud gid record no exist cursor"); + return -E_CLOUD_ERROR; + } + auto cursor = std::get_if(&iter->second); + if (cursor == nullptr) { + LOGE("[CloudSyncer] Cloud gid record cursor is no str, type[%zu]", iter->second.index()); + return -E_CLOUD_ERROR; + } + param.cloudWaterMark = *cursor; + } + return errCode; +} + +int CloudSyncer::SaveGIDRecord(SyncParam ¶m) +{ + return storageProxy_->PutCloudGid(param.tableName, param.downloadData.data); +} + +int CloudSyncer::SaveGIDCursor(SyncParam ¶m) +{ + return storageProxy_->PutCloudGidCursor(param.tableName, param.cloudWaterMark); +} } // namespace DistributedDB \ No newline at end of file diff --git a/frameworks/libs/distributeddb/test/unittest/common/store_test/rdb/distributeddb_rdb_complex_cloud_test.cpp b/frameworks/libs/distributeddb/test/unittest/common/store_test/rdb/distributeddb_rdb_complex_cloud_test.cpp index 6c49fb3269..a8bae4650d 100644 --- a/frameworks/libs/distributeddb/test/unittest/common/store_test/rdb/distributeddb_rdb_complex_cloud_test.cpp +++ b/frameworks/libs/distributeddb/test/unittest/common/store_test/rdb/distributeddb_rdb_complex_cloud_test.cpp @@ -147,5 +147,37 @@ HWTEST_F(DistributedDBRDBComplexCloudTest, RemoveData001, TestSize.Level0) ASSERT_NE(delegate, nullptr); EXPECT_EQ(delegate->RemoveDeviceData("", ClearMode::FLAG_ONLY), DBStatus::OK); } + +/** + * @tc.name: ExpireCursor001 + * @tc.desc: Test sync with expire cursor. + * @tc.type: FUNC + * @tc.author: zqq + */ +HWTEST_F(DistributedDBRDBComplexCloudTest, ExpireCursor001, TestSize.Level0) +{ + /** + * @tc.steps:step1. store1 insert one data + * @tc.expected: step1. insert success. + */ + auto ret = ExecuteSQL("INSERT INTO CLOUD_SYNC_TABLE_A VALUES(1, 1, 'text1', 'text2', 'uuid1')", info1_); + ASSERT_EQ(ret, E_OK); + /** + * @tc.steps:step2. store1 push and store2 pull + * @tc.expected: step2. sync success and stringCol2 is null because store1 don't sync stringCol2. + */ + Query query = Query::Select().FromTable({CLOUD_SYNC_TABLE_A}); + EXPECT_NO_FATAL_FAILURE(CloudBlockSync(info1_, query, SyncMode::SYNC_MODE_CLOUD_MERGE, OK, OK)); + EXPECT_NO_FATAL_FAILURE(CloudBlockSync(info2_, query, SyncMode::SYNC_MODE_CLOUD_MERGE, OK, OK)); + auto cloudDB = GetVirtualCloudDb(); + ASSERT_NE(cloudDB, nullptr); + std::atomic count = 0; + cloudDB->ForkAfterQueryResult([&count](VBucket &, std::vector &) { + count++; + return count == 1 ? DBStatus::EXPIRED_CURSOR : DBStatus::QUERY_END; + }); + EXPECT_NO_FATAL_FAILURE(CloudBlockSync(info2_, query, SyncMode::SYNC_MODE_CLOUD_MERGE, OK, OK)); + cloudDB->ForkAfterQueryResult(nullptr); +} } #endif \ No newline at end of file diff --git a/frameworks/libs/distributeddb/test/unittest/common/syncer/cloud/virtual_cloud_db.cpp b/frameworks/libs/distributeddb/test/unittest/common/syncer/cloud/virtual_cloud_db.cpp index 1e19a22148..1fb8d75c0c 100644 --- a/frameworks/libs/distributeddb/test/unittest/common/syncer/cloud/virtual_cloud_db.cpp +++ b/frameworks/libs/distributeddb/test/unittest/common/syncer/cloud/virtual_cloud_db.cpp @@ -723,4 +723,16 @@ void VirtualCloudDb::SetUploadRecordStatus(DBStatus status) { uploadRecordStatus_ = status; } + +DBStatus VirtualCloudDb::QueryAllGID(const std::string &tableName, VBucket &extend, std::vector &data) +{ + VBucket copyExtend = extend; + std::string cursor = std::get(copyExtend[g_cursorField]); + bool isIncreCursor = (cursor.substr(0, increPrefix_.size()) == increPrefix_); + LOGD("extend size: %zu type: %zu expect: %zu, cursor: %s", extend.size(), copyExtend[g_cursorField].index(), + TYPE_INDEX, cursor.c_str()); + cursor = cursor.empty() ? "0" : cursor; + GetCloudData(cursor, isIncreCursor, cloudData_[tableName], data, copyExtend); + return (data.empty() || data.size() < static_cast(queryLimit_)) ? QUERY_END : OK; +} } \ No newline at end of file diff --git a/frameworks/libs/distributeddb/test/unittest/common/syncer/cloud/virtual_cloud_db.h b/frameworks/libs/distributeddb/test/unittest/common/syncer/cloud/virtual_cloud_db.h index bdede955fe..821ffab322 100644 --- a/frameworks/libs/distributeddb/test/unittest/common/syncer/cloud/virtual_cloud_db.h +++ b/frameworks/libs/distributeddb/test/unittest/common/syncer/cloud/virtual_cloud_db.h @@ -53,6 +53,8 @@ public: DBStatus Close() override; + DBStatus QueryAllGID(const std::string &tableName, VBucket &extend, std::vector &data) override; + void SetCloudError(bool cloudError); void SetBlockTime(int32_t blockTime); -- Gitee From 242215a3424ce5a61b1b5b975235338431c3615a Mon Sep 17 00:00:00 2001 From: liao-yonghuang Date: Tue, 30 Dec 2025 15:50:09 +0800 Subject: [PATCH 3/3] =?UTF-8?q?=E3=80=90RDB=E3=80=91=E3=80=90=E7=AB=AF?= =?UTF-8?q?=E4=BA=91=E3=80=91=E3=80=90cursor=E5=A4=B1=E6=95=88=E3=80=91?= =?UTF-8?q?=E6=9B=B4=E6=96=B0=E4=BA=91=E4=B8=8A=E4=B8=8D=E5=AD=98=E5=9C=A8?= =?UTF-8?q?=E7=9A=84=E6=9C=AC=E5=9C=B0=E6=95=B0=E6=8D=AE?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: liao-yonghuang --- .../common/include/cloud/cloud_db_constant.h | 2 +- .../relational/relational_sync_able_storage.h | 10 ++ .../relational_sync_able_storage_extend.cpp | 123 ++++++++++++++++++ .../relational/sqlite_relational_utils.cpp | 63 +++++++++ .../relational/sqlite_relational_utils.h | 13 ++ ...qlite_single_relational_storage_engine.cpp | 2 +- 6 files changed, 211 insertions(+), 2 deletions(-) diff --git a/frameworks/libs/distributeddb/common/include/cloud/cloud_db_constant.h b/frameworks/libs/distributeddb/common/include/cloud/cloud_db_constant.h index 5d10c2c4fe..1c45df0d4f 100644 --- a/frameworks/libs/distributeddb/common/include/cloud/cloud_db_constant.h +++ b/frameworks/libs/distributeddb/common/include/cloud/cloud_db_constant.h @@ -105,7 +105,7 @@ public: static constexpr std::chrono::milliseconds DFX_TIME_THRESHOLD = std::chrono::milliseconds(1000); - static constexpr std::chrono::milliseconds ASYNC_GEN_LOG_INTERVAL = std::chrono::milliseconds(20); + static constexpr std::chrono::milliseconds LONG_TRANSACTION_INTERVAL = std::chrono::milliseconds(50); static constexpr std::chrono::milliseconds LONG_TIME_TRANSACTION = std::chrono::milliseconds(1000); // change local flag to :keep async download asset[0x1000] diff --git a/frameworks/libs/distributeddb/interfaces/src/relational/relational_sync_able_storage.h b/frameworks/libs/distributeddb/interfaces/src/relational/relational_sync_able_storage.h index f6e087911a..3c0ec305cc 100644 --- a/frameworks/libs/distributeddb/interfaces/src/relational/relational_sync_able_storage.h +++ b/frameworks/libs/distributeddb/interfaces/src/relational/relational_sync_able_storage.h @@ -283,6 +283,8 @@ public: int ResetGenLogTaskStatus(); int PutCloudGid(const std::string &tableName, std::vector &data) override; + + int AgingCloudNoneExistRecord(const std::string &tableName); protected: int FillReferenceData(CloudSyncData &syncData); @@ -361,6 +363,14 @@ private: int StartTransactionForAsyncDownload(TransactType type); +#ifdef USE_DISTRIBUTEDDB_CLOUD + int GetOneBatchCloudNoneExistRecord(const std::string &tableName, const std::string &dataPk, + std::vector &records); + + int AgingOneBatchCloudNoneExistRecord(const std::string &tableName, ChangedData &changedData, + const std::vector &records); +#endif + // data std::shared_ptr storageEngine_ = nullptr; std::function onSchemaChanged_; diff --git a/frameworks/libs/distributeddb/storage/src/relational/relational_sync_able_storage_extend.cpp b/frameworks/libs/distributeddb/storage/src/relational/relational_sync_able_storage_extend.cpp index 3774d821eb..85f4e34287 100644 --- a/frameworks/libs/distributeddb/storage/src/relational/relational_sync_able_storage_extend.cpp +++ b/frameworks/libs/distributeddb/storage/src/relational/relational_sync_able_storage_extend.cpp @@ -732,5 +732,128 @@ int RelationalSyncAbleStorage::PutCloudGid(const std::string &tableName, std::ve } return storageEngine_->PutCloudGid(tableName, data); } + +#ifdef USE_DISTRIBUTEDDB_CLOUD +int RelationalSyncAbleStorage::AgingCloudNoneExistRecord(const std::string &tableName) +{ + if (storageEngine_ == nullptr) { + LOGE("[AgingCloudNoneExistRecord] Storage is null"); + return -E_INVALID_DB; + } + TableInfo tableInfo = storageEngine_->GetSchema().GetTable(tableName); + bool isPkUseRowid = tableInfo.IsNoPkTable() || tableInfo.IsMultiPkTable(); + std::string dataPk = isPkUseRowid ? "_rowid_" : tableInfo.GetPrimaryKey().at(0); + + int errCode = E_OK; + do { + std::vector records; + ChangedData changedData; + changedData.type = ChangedDataType::DATA; + changedData.tableName = tableName; + changedData.field.push_back(dataPk); + errCode = GetOneBatchCloudNoneExistRecord(tableName, dataPk, records); + if (errCode == -E_FINISHED) { + break; + } + if (errCode != E_OK) { + LOGE("[AgingCloudNoneExistRecord] get one batch cloud none exist record failed.%d, tableName:%s", errCode, + DBCommon::StringMiddleMaskingWithLen(tableName).c_str()); + return errCode; + } + errCode = AgingOneBatchCloudNoneExistRecord(tableName, changedData, records); + if (errCode != E_OK) { + LOGE("[AgingCloudNoneExistRecord] aging one batch cloud none exist record failed.%d, tableName:%s", + errCode, DBCommon::StringMiddleMaskingWithLen(tableName).c_str()); + return errCode; + } + TriggerObserverAction("CLOUD", std::move(changedData), true); + std::this_thread::sleep_for(CloudDbConstant::LONG_TRANSACTION_INTERVAL); + } while (true); + return E_OK; +} + +int RelationalSyncAbleStorage::GetOneBatchCloudNoneExistRecord(const std::string &tableName, const std::string &dataPk, + std::vector &records) +{ + int errCode = E_OK; + auto *readHandle = static_cast( + storageEngine_->FindExecutor(false, OperatePerm::NORMAL_PERM, errCode)); + if (readHandle == nullptr) { + return errCode; + } + errCode = readHandle->StartTransaction(TransactType::DEFERRED); + if (errCode != E_OK) { + ReleaseHandle(readHandle); + return errCode; + } + sqlite3 *db = nullptr; + errCode = readHandle->GetDbHandle(db); + if (errCode != E_OK) { + readHandle->Rollback(); + ReleaseHandle(readHandle); + return errCode; + } + errCode = SQLiteRelationalUtils::GetOneBatchAgingCloudRecord(tableName, db, records, dataPk); + if (errCode != E_OK) { + LOGE("[AgingCloudNoneExistRecord] get aging cloud none exist record failed.%d, tableName:%s", errCode, + DBCommon::StringMiddleMaskingWithLen(tableName).c_str()); + readHandle->Rollback(); + ReleaseHandle(readHandle); + return errCode; + } + readHandle->Commit(); + ReleaseHandle(readHandle); + if (records.empty()) { + return -E_FINISHED; + } + return E_OK; +} + +int RelationalSyncAbleStorage::AgingOneBatchCloudNoneExistRecord(const std::string &tableName, + ChangedData &changedData, const std::vector &records) +{ + auto start = std::chrono::steady_clock::now(); + int errCode = E_OK; + auto *writeHandle = static_cast( + storageEngine_->FindExecutor(true, OperatePerm::NORMAL_PERM, errCode)); + if (writeHandle == nullptr) { + return errCode; + } + errCode = writeHandle->StartTransaction(TransactType::IMMEDIATE); + if (errCode != E_OK) { + ReleaseHandle(writeHandle); + return errCode; + } + sqlite3 *db = nullptr; + errCode = writeHandle->GetDbHandle(db); + if (errCode != E_OK) { + writeHandle->Rollback(); + ReleaseHandle(writeHandle); + return errCode; + } + std::vector dataVec; + for (auto &record : records) { + errCode = SQLiteRelationalUtils::AgingOneRecord(tableName, db, record, logicDelete_); + if (errCode != E_OK) { + LOGE("[AgingOneBatchCloudNoneExistRecord] aging cloud none exist record failed.%d, tableName:%s", errCode, + DBCommon::StringMiddleMaskingWithLen(tableName).c_str()); + writeHandle->Rollback(); + ReleaseHandle(writeHandle); + return errCode; + } + dataVec.emplace_back(record.pkValue); + auto duration = std::chrono::steady_clock::now() - start; + if (duration > CloudDbConstant::LONG_TIME_TRANSACTION) { + LOGI("[AgingOneBatchCloudNoneExistRecord] aging one batch cloud none exist record cost %" PRId64 "ms.", + duration.count()); + break; + } + } + writeHandle->Commit(); + ReleaseHandle(writeHandle); + changedData.primaryData[ChangeType::OP_DELETE].push_back(dataVec); + return E_OK; +} +#endif } #endif diff --git a/frameworks/libs/distributeddb/storage/src/sqlite/relational/sqlite_relational_utils.cpp b/frameworks/libs/distributeddb/storage/src/sqlite/relational/sqlite_relational_utils.cpp index 691cbcde92..4e0601b114 100644 --- a/frameworks/libs/distributeddb/storage/src/sqlite/relational/sqlite_relational_utils.cpp +++ b/frameworks/libs/distributeddb/storage/src/sqlite/relational/sqlite_relational_utils.cpp @@ -1313,5 +1313,68 @@ int SQLiteRelationalUtils::PutCloudGid(sqlite3 *db, const std::string &tableName } return E_OK; } + +int SQLiteRelationalUtils::GetOneBatchAgingCloudRecord(const std::string &tableName, sqlite3 *db, + std::vector &records, const std::string &dataPk) +{ + std::string sql = "SELECT a._rowid_, a.data_key, b." + dataPk + ", a.flag FROM " + + DBCommon::GetLogTableName(tableName) + " AS a LEFT JOIN " + tableName + " AS b ON (a.data_key = b._rowid_) " + "WHERE a.cloud_gid IS NOT '' AND a.cloud_gid NOT IN (SELECT cloud_gid FROM natural_rdb_aux_" + tableName + + "_log_tmp) limit 100;"; + sqlite3_stmt *stmt = nullptr; + int errCode = SQLiteUtils::GetStatement(db, sql, stmt); + if (errCode != E_OK) { + LOGE("[RDBUtils][GetOneBatchAgingCloudRecord] Get statement failed, %d.", errCode); + return errCode; + } + + do { + errCode = SQLiteUtils::StepWithRetry(stmt); + if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_ROW)) { + AgingCloudRecord record; + record.logRowid = sqlite3_column_int64(stmt, 0); // col 0 is _rowid_ of log table + record.dataRowid = sqlite3_column_int64(stmt, 1); // col 1 is _rowid_ of data table + errCode = GetTypeValByStatement(stmt, 2, record.pkValue); // col 2 is pk of data table + if (errCode != E_OK) { + LOGE("[RDBUtils][GetOneBatchAgingCloudRecord] Get pk value failed, errCode = %d.", errCode); + break; + } + record.flag = sqlite3_column_int(stmt, 3); // col 3 is flag + records.push_back(record); + } else if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) { + errCode = E_OK; + break; + } else { + LOGE("[RDBUtils][GetOneBatchAgingCloudRecord] Step failed, errCode = %d.", errCode); + break; + } + } while (true); + + return SQLiteUtils::ProcessStatementErrCode(stmt, true, errCode); +} + +int SQLiteRelationalUtils::AgingOneRecord(const std::string &tableName, sqlite3 *db, const AgingCloudRecord &record, + bool isLogicDelete) +{ + if ((record.flag & static_cast(LogInfoFlag::FLAG_LOCAL)) != 0) { + std::string sql = "UPDATE natural_rdb_aux_" + tableName + + "_log SET cloud_gid = '' AND version = '' where _rowid_ = " + std::to_string(record.logRowid) + ";"; + return SQLiteUtils::ExecuteRawSQL(db, sql); + } + + if ((record.flag & static_cast(LogInfoFlag::FLAG_CLOUD_WRITE)) != 0) { + std::string sql = "DELETE FROM " + tableName + " WHERE _rowid_ = " + std::to_string(record.dataRowid) + ";"; + int errCode = SQLiteUtils::ExecuteRawSQL(db, sql); + if (errCode != E_OK) { + return errCode; + } + int32_t newFlag = isLogicDelete ? ((record.flag & (0x20 | 0x1 | 0x8)) & (~0x4000)) : + ((record.flag & (0x20 | 0x1)) & (~0x4000)); + sql = "UPDATE natural_rdb_aux_" + tableName + "_log SET cloud_gid = '' AND version = '' AND flag = " + + std::to_string(newFlag) + " where _rowid_ = " + std::to_string(record.logRowid) + ";"; + return SQLiteUtils::ExecuteRawSQL(db, sql); + } + return E_OK; +} #endif } // namespace DistributedDB \ No newline at end of file diff --git a/frameworks/libs/distributeddb/storage/src/sqlite/relational/sqlite_relational_utils.h b/frameworks/libs/distributeddb/storage/src/sqlite/relational/sqlite_relational_utils.h index 72ef275cf1..bcc34b6d24 100644 --- a/frameworks/libs/distributeddb/storage/src/sqlite/relational/sqlite_relational_utils.h +++ b/frameworks/libs/distributeddb/storage/src/sqlite/relational/sqlite_relational_utils.h @@ -152,6 +152,19 @@ public: ICloudSyncer::CloudTaskInfo &info); static int PutCloudGid(sqlite3 *db, const std::string &tableName, std::vector &data); + + struct AgingCloudRecord { + int64_t logRowid = 0; + int64_t dataRowid = 0; + int32_t flag = 0; + Type pkValue; + }; + + static int GetOneBatchAgingCloudRecord(const std::string &tableName, sqlite3 *db, + std::vector &records, const std::string &dataPk); + + static int AgingOneRecord(const std::string &tableName, sqlite3 *db, const AgingCloudRecord &record, + bool isLogicDelete); #endif private: static int BindExtendStatementByType(sqlite3_stmt *statement, int cid, Type &typeVal); diff --git a/frameworks/libs/distributeddb/storage/src/sqlite/relational/sqlite_single_relational_storage_engine.cpp b/frameworks/libs/distributeddb/storage/src/sqlite/relational/sqlite_single_relational_storage_engine.cpp index 0b636d0d8f..b2d93a407f 100644 --- a/frameworks/libs/distributeddb/storage/src/sqlite/relational/sqlite_single_relational_storage_engine.cpp +++ b/frameworks/libs/distributeddb/storage/src/sqlite/relational/sqlite_single_relational_storage_engine.cpp @@ -1221,7 +1221,7 @@ int SQLiteSingleRelationalStorageEngine::GeneLogInfoForExistedDataInBatch(const LOGI("[GeneLogInfoForExistedDataInBatch] Generate 10 batch log finished"); batchNum = 0; } - std::this_thread::sleep_for(CloudDbConstant::ASYNC_GEN_LOG_INTERVAL); + std::this_thread::sleep_for(CloudDbConstant::LONG_TRANSACTION_INTERVAL); } while (changedCount != 0); return E_OK; } -- Gitee