diff --git a/frameworks/libs/distributeddb/common/include/cloud/cloud_db_constant.h b/frameworks/libs/distributeddb/common/include/cloud/cloud_db_constant.h index a6d36c54c9d5082c2b24828f7288331b993beeff..1c45df0d4fcba67b89c05f66ff78aa5793c63f90 100644 --- a/frameworks/libs/distributeddb/common/include/cloud/cloud_db_constant.h +++ b/frameworks/libs/distributeddb/common/include/cloud/cloud_db_constant.h @@ -23,6 +23,7 @@ namespace DistributedDB { class CloudDbConstant { public: static constexpr const char *CLOUD_META_TABLE_PREFIX = "naturalbase_cloud_meta_"; + static constexpr const char *CLOUD_INFO_META_PREFIX = "naturalbase_cloud_info_meta_"; static constexpr const char *GID_FIELD = "#_gid"; static constexpr const char *CREATE_FIELD = "#_createTime"; static constexpr const char *MODIFY_FIELD = "#_modifyTime"; @@ -104,7 +105,7 @@ public: static constexpr std::chrono::milliseconds DFX_TIME_THRESHOLD = std::chrono::milliseconds(1000); - static constexpr std::chrono::milliseconds ASYNC_GEN_LOG_INTERVAL = std::chrono::milliseconds(20); + static constexpr std::chrono::milliseconds LONG_TRANSACTION_INTERVAL = std::chrono::milliseconds(50); static constexpr std::chrono::milliseconds LONG_TIME_TRANSACTION = std::chrono::milliseconds(1000); // change local flag to :keep async download asset[0x1000] diff --git a/frameworks/libs/distributeddb/common/include/db_common.h b/frameworks/libs/distributeddb/common/include/db_common.h index 96a2db07ae4016e2a4def0d8ed22e1e172ce1996..d1c25e6be0fe04741ce79aaa74f5d3035a46d66f 100644 --- a/frameworks/libs/distributeddb/common/include/db_common.h +++ b/frameworks/libs/distributeddb/common/include/db_common.h @@ -30,6 +30,7 @@ public: static int CreateDirectory(const std::string &directory); static void StringToVector(const std::string &src, std::vector &dst); + static std::vector ToVector(const std::string &src); static void VectorToString(const std::vector &src, std::string &dst); static inline std::string GetLogTableName(const std::string &tableName) @@ -37,6 +38,11 @@ public: return DBConstant::RELATIONAL_PREFIX + tableName + DBConstant::LOG_POSTFIX; } + static inline std::string GetTmpLogTableName(const std::string &tableName) + { + return DBConstant::RELATIONAL_PREFIX + tableName + DBConstant::LOG_POSTFIX + DBConstant::TMP_POSTFIX; + } + static inline const std::vector GetWaterTypeVec() { return {CloudWaterType::DELETE, CloudWaterType::UPDATE, CloudWaterType::INSERT}; diff --git a/frameworks/libs/distributeddb/common/include/db_constant.h b/frameworks/libs/distributeddb/common/include/db_constant.h index 6de2f46a98be953a274967fbb7d59a34afb65039..a0c15ad39f2f3e6fa627757820cefeb70fda8664 100644 --- a/frameworks/libs/distributeddb/common/include/db_constant.h +++ b/frameworks/libs/distributeddb/common/include/db_constant.h @@ -162,6 +162,7 @@ public: static constexpr size_t RELATIONAL_PREFIX_SIZE = 20; static constexpr const char *TIMESTAMP_ALIAS = "naturalbase_rdb_aux_timestamp"; static constexpr const char *LOG_POSTFIX = "_log"; + static constexpr const char *TMP_POSTFIX = "_tmp"; static constexpr const char *META_TABLE_POSTFIX = "metadata"; static constexpr const char *KNOWLEDGE_TABLE_TYPE = "knowledge"; diff --git a/frameworks/libs/distributeddb/common/include/db_errno.h b/frameworks/libs/distributeddb/common/include/db_errno.h index 3bc7b9caade51f8f5063220728539aaacf2a63b6..645adb6e04ee2adfab72b30c1896f00ee0c5e9e0 100644 --- a/frameworks/libs/distributeddb/common/include/db_errno.h +++ b/frameworks/libs/distributeddb/common/include/db_errno.h @@ -196,6 +196,7 @@ constexpr const int E_NEED_CORRECT_TARGET_USER = (E_BASE + 209); constexpr const int E_CLOUD_ASSET_NOT_FOUND = (E_BASE + 210); // Cloud download asset return 404 error constexpr const int E_TASK_INTERRUPTED = (E_BASE + 211); // Task(cloud sync, generate log) interrupted constexpr const int E_SKIP_WHEN_CLOUD_SPACE_INSUFFICIENT = (E_BASE + 212); // Upload failed by cloud space insufficient +constexpr const int E_EXPIRED_CURSOR = (E_BASE + 213); // Cursor invalid in cloud } // namespace DistributedDB #endif // DISTRIBUTEDDB_ERRNO_H diff --git a/frameworks/libs/distributeddb/common/src/db_common_client.cpp b/frameworks/libs/distributeddb/common/src/db_common_client.cpp index fa103ec9fe8efac192436a76f6e299184403367f..723305e1cb7cfc485230f4cd0350e6ed2d4eb84c 100644 --- a/frameworks/libs/distributeddb/common/src/db_common_client.cpp +++ b/frameworks/libs/distributeddb/common/src/db_common_client.cpp @@ -33,6 +33,13 @@ void DBCommon::StringToVector(const std::string &src, std::vector &dst) dst.assign(src.begin(), src.end()); } +std::vector DBCommon::ToVector(const std::string &src) +{ + std::vector res; + StringToVector(src, res); + return res; +} + std::string DBCommon::StringMiddleMasking(const std::string &name) { if (name.length() <= HEAD_SIZE) { diff --git a/frameworks/libs/distributeddb/interfaces/include/cloud/icloud_db.h b/frameworks/libs/distributeddb/interfaces/include/cloud/icloud_db.h index 79e24a0c66efd6bd49c372932e7a8d7ee6906713..333e2911f18247cc3f55569ec4acaf2a264ac6b5 100644 --- a/frameworks/libs/distributeddb/interfaces/include/cloud/icloud_db.h +++ b/frameworks/libs/distributeddb/interfaces/include/cloud/icloud_db.h @@ -52,6 +52,10 @@ public: { return this->prepareTraceId; } + virtual DBStatus QueryAllGID(const std::string &tableName, VBucket &extend, std::vector &data) + { + return NOT_SUPPORT; + } private: std::string prepareTraceId; }; diff --git a/frameworks/libs/distributeddb/interfaces/include/store_types.h b/frameworks/libs/distributeddb/interfaces/include/store_types.h index e74ae3b0c2f899cece65f50974bcea7df891e22e..d29c61f73c09d89e9c5aa7bf9d887bdaf6769366 100644 --- a/frameworks/libs/distributeddb/interfaces/include/store_types.h +++ b/frameworks/libs/distributeddb/interfaces/include/store_types.h @@ -99,6 +99,7 @@ enum DBStatus { CLOUD_ASSET_NOT_FOUND, // The cloud download asset return 404 error TASK_INTERRUPTED, // Task(cloud sync) interrupted SKIP_WHEN_CLOUD_SPACE_INSUFFICIENT, // Whitelist for contact, skip when cloud space insufficient + EXPIRED_CURSOR, // Cursor is out of date in cloud BUTT_STATUS = 27394048 // end of status }; diff --git a/frameworks/libs/distributeddb/interfaces/src/kv_store_errno.cpp b/frameworks/libs/distributeddb/interfaces/src/kv_store_errno.cpp index cbe2ccdd20581932b76626c8cafdc4d2ea2a4528..e52953991459d090e3589fdcbef2da69b88f4d41 100644 --- a/frameworks/libs/distributeddb/interfaces/src/kv_store_errno.cpp +++ b/frameworks/libs/distributeddb/interfaces/src/kv_store_errno.cpp @@ -90,6 +90,7 @@ namespace { { -E_CLOUD_ASSET_NOT_FOUND, CLOUD_ASSET_NOT_FOUND }, { -E_TASK_INTERRUPTED, TASK_INTERRUPTED }, { -E_SKIP_WHEN_CLOUD_SPACE_INSUFFICIENT, SKIP_WHEN_CLOUD_SPACE_INSUFFICIENT }, + { -E_EXPIRED_CURSOR, EXPIRED_CURSOR }, }; } diff --git a/frameworks/libs/distributeddb/interfaces/src/relational/relational_sync_able_storage.h b/frameworks/libs/distributeddb/interfaces/src/relational/relational_sync_able_storage.h index eba410cff8822d59fdcde678dbdb4d1d6b252d59..3c0ec305cc45abf95bcb36e2e22a2ddf1aa030cc 100644 --- a/frameworks/libs/distributeddb/interfaces/src/relational/relational_sync_able_storage.h +++ b/frameworks/libs/distributeddb/interfaces/src/relational/relational_sync_able_storage.h @@ -281,6 +281,10 @@ public: int WaitAsyncGenLogTaskFinished(const std::vector &tables) override; int ResetGenLogTaskStatus(); + + int PutCloudGid(const std::string &tableName, std::vector &data) override; + + int AgingCloudNoneExistRecord(const std::string &tableName); protected: int FillReferenceData(CloudSyncData &syncData); @@ -359,6 +363,14 @@ private: int StartTransactionForAsyncDownload(TransactType type); +#ifdef USE_DISTRIBUTEDDB_CLOUD + int GetOneBatchCloudNoneExistRecord(const std::string &tableName, const std::string &dataPk, + std::vector &records); + + int AgingOneBatchCloudNoneExistRecord(const std::string &tableName, ChangedData &changedData, + const std::vector &records); +#endif + // data std::shared_ptr storageEngine_ = nullptr; std::function onSchemaChanged_; diff --git a/frameworks/libs/distributeddb/storage/include/cloud/cloud_meta_data.h b/frameworks/libs/distributeddb/storage/include/cloud/cloud_meta_data.h index b0081a8aa9b4878b4876a62f4df970ea3e015e6b..5005e4e972025e1a6e30926de50891b525db0fe5 100644 --- a/frameworks/libs/distributeddb/storage/include/cloud/cloud_meta_data.h +++ b/frameworks/libs/distributeddb/storage/include/cloud/cloud_meta_data.h @@ -47,6 +47,10 @@ public: void CleanWaterMarkInMemory(const TableName &tableName); + int GetCloudGidCursor(const std::string &tableName, std::string &cursor) const; + int PutCloudGidCursor(const std::string &tableName, const std::string &cursor) const; + int CleanCloudInfo(const std::string &tableName) const; + private: typedef struct CloudMetaValue { Timestamp localMark = 0u; @@ -56,6 +60,11 @@ private: std::string cloudMark; } CloudMetaValue; + typedef struct CloudInfoValue { + uint64_t version = 0u; + std::string gidCloudMark; + } CloudInfoValue; + int ReadMarkFromMeta(const TableName &tableName); int WriteMarkToMeta(const TableName &tableName, Timestamp localmark, std::string &cloudMark); int WriteTypeMarkToMeta(const TableName &tableName, CloudMetaValue &cloudMetaValue); @@ -63,6 +72,13 @@ private: int DeserializeMark(Value &blobMark, CloudMetaValue &cloudMetaValue); uint64_t GetParcelCurrentLength(CloudMetaValue &cloudMetaValue); + int GetCloudInfo(const std::string &tableName, CloudInfoValue &info) const ; + static int DeserializeCloudInfo(Value &blobMark, CloudInfoValue &info); + static int SerializeCloudInfo(const CloudInfoValue &info, Value &blobMark); + + static uint64_t GetParcelCurrentLength(const CloudInfoValue &info); + static Key GetCloudInfoKey(const std::string &tableName); + mutable std::mutex cloudMetaMutex_; std::unordered_map cloudMetaVals_; ICloudSyncStorageInterface *store_ = nullptr; diff --git a/frameworks/libs/distributeddb/storage/include/icloud_sync_storage_interface.h b/frameworks/libs/distributeddb/storage/include/icloud_sync_storage_interface.h index 10e24a43d8cfdf4467e16558745909f5a6592e19..3e8b71b868dc6bde81f57a54d64724a876ec8964 100644 --- a/frameworks/libs/distributeddb/storage/include/icloud_sync_storage_interface.h +++ b/frameworks/libs/distributeddb/storage/include/icloud_sync_storage_interface.h @@ -329,6 +329,21 @@ public: } virtual int WaitAsyncGenLogTaskFinished(const std::vector &tables) = 0; + + virtual int PutCloudGid([[gnu::unused]] const std::string &tableName, [[gnu::unused]] std::vector &data) + { + return E_OK; + } + + virtual int AgingCloudNoneExistRecord([[gnu::unused]] const std::string &tableName) + { + return E_OK; + } + + virtual int DeleteMetaData([[gnu::unused]] const std::vector &keys) + { + return E_OK; + } }; } diff --git a/frameworks/libs/distributeddb/storage/include/storage_proxy.h b/frameworks/libs/distributeddb/storage/include/storage_proxy.h index 3614c9eb25e76b9882821d8a9c9b153468c7be41..10e3442838572d09daa8ed043ea279ea3fff0143 100644 --- a/frameworks/libs/distributeddb/storage/include/storage_proxy.h +++ b/frameworks/libs/distributeddb/storage/include/storage_proxy.h @@ -199,6 +199,16 @@ public: int WaitAsyncGenLogTaskFinished(const std::vector &tables) const; + // gid contain in data with key #_gid + int PutCloudGid(const std::string &tableName, std::vector &data); + + int GetCloudGidCursor(const std::string &tableName, std::string &cursor); + + int PutCloudGidCursor(const std::string &tableName, const std::string &cursor); + + int CleanCloudInfo(const std::string &tableName); + + int AgingCloudNoneExistRecord(const std::string &tableName); protected: void Init(); private: diff --git a/frameworks/libs/distributeddb/storage/src/cloud/cloud_meta_data.cpp b/frameworks/libs/distributeddb/storage/src/cloud/cloud_meta_data.cpp index b828c85dc89427ed270f54ac80b059699a75a323..2cc2b9cacee9ebb2bf564a46d7b2b4b1c1b41975 100644 --- a/frameworks/libs/distributeddb/storage/src/cloud/cloud_meta_data.cpp +++ b/frameworks/libs/distributeddb/storage/src/cloud/cloud_meta_data.cpp @@ -277,4 +277,100 @@ void CloudMetaData::CleanWaterMarkInMemory(const TableName &tableName) cloudMetaVals_[tableName] = {}; LOGD("[Meta] clean cloud water mark in memory"); } + +int CloudMetaData::GetCloudGidCursor(const std::string &tableName, std::string &cursor) const +{ + if (store_ == nullptr) { + return -E_INVALID_DB; + } + CloudInfoValue info; + auto errCode = GetCloudInfo(tableName, info); + if (errCode != E_OK) { + return errCode; + } + cursor = info.gidCloudMark; + return E_OK; +} + +int CloudMetaData::PutCloudGidCursor(const std::string &tableName, const std::string &cursor) const +{ + Value blobMetaVal; + CloudInfoValue info; + auto errCode = GetCloudInfo(tableName, info); + if (errCode != E_OK) { + return errCode; + } + info.gidCloudMark = cursor; + errCode = SerializeCloudInfo(info, blobMetaVal); + if (errCode != E_OK) { + return errCode; + } + if (store_ == nullptr) { + return -E_INVALID_DB; + } + return store_->PutMetaData(GetCloudInfoKey(tableName), blobMetaVal); +} + +int CloudMetaData::CleanCloudInfo(const std::string &tableName) const +{ + if (store_ == nullptr) { + return -E_INVALID_DB; + } + return store_->DeleteMetaData({GetCloudInfoKey(tableName)}); +} + +int CloudMetaData::GetCloudInfo(const std::string &tableName, CloudInfoValue &info) const +{ + Value blobMetaVal; + int ret = store_->GetMetaData(GetCloudInfoKey(tableName), blobMetaVal); + if (ret != -E_NOT_FOUND && ret != E_OK) { + return ret; + } + if (ret == -E_NOT_FOUND) { + return E_OK; + } + return DeserializeCloudInfo(blobMetaVal, info); +} + +int CloudMetaData::DeserializeCloudInfo(Value &blobMark, CloudInfoValue &info) +{ + if (blobMark.empty()) { + info.version = 0; + info.gidCloudMark = ""; + return E_OK; + } + Parcel parcel(blobMark.data(), blobMark.size()); + info.version = 0; + parcel.ReadUInt64(info.version); + parcel.ReadString(info.gidCloudMark); + if (parcel.IsError()) { + LOGE("[CloudMetaData] Parse error cloud info version[%" PRIu32 "]", info.version); + return -E_PARSE_FAIL; + } + return E_OK; +} + +int CloudMetaData::SerializeCloudInfo(const CloudInfoValue &info, Value &blobMark) +{ + uint64_t length = GetParcelCurrentLength(info); + blobMark.resize(length); + Parcel parcel(blobMark.data(), blobMark.size()); + parcel.WriteUInt64(info.version); + parcel.WriteString(info.gidCloudMark); + if (parcel.IsError()) { + LOGE("[CloudMetaData] Parcel error while serializing cloud info."); + return -E_PARSE_FAIL; + } + return E_OK; +} + +uint64_t CloudMetaData::GetParcelCurrentLength(const CloudInfoValue &info) +{ + return Parcel::GetUInt64Len() + Parcel::GetStringLen(info.gidCloudMark); +} + +Key CloudMetaData::GetCloudInfoKey(const std::string &tableName) +{ + return DBCommon::ToVector(CloudDbConstant::CLOUD_INFO_META_PREFIX + tableName); +} } // namespace DistributedDB \ No newline at end of file diff --git a/frameworks/libs/distributeddb/storage/src/relational/relational_sync_able_storage_extend.cpp b/frameworks/libs/distributeddb/storage/src/relational/relational_sync_able_storage_extend.cpp index 82aff8809af06c91c0b2820941bf0996760b30e2..85f4e3428785b3846a4ae277fa67e83de7b9f180 100644 --- a/frameworks/libs/distributeddb/storage/src/relational/relational_sync_able_storage_extend.cpp +++ b/frameworks/libs/distributeddb/storage/src/relational/relational_sync_able_storage_extend.cpp @@ -723,5 +723,137 @@ int RelationalSyncAbleStorage::ResetGenLogTaskStatus() storageEngine_->ResetGenLogTaskStatus(); return E_OK; } + +int RelationalSyncAbleStorage::PutCloudGid(const std::string &tableName, std::vector &data) +{ + if (storageEngine_ == nullptr) { + LOGE("[RelationalSyncAbleStorage] Storage is null when put cloud gid"); + return -E_INVALID_DB; + } + return storageEngine_->PutCloudGid(tableName, data); +} + +#ifdef USE_DISTRIBUTEDDB_CLOUD +int RelationalSyncAbleStorage::AgingCloudNoneExistRecord(const std::string &tableName) +{ + if (storageEngine_ == nullptr) { + LOGE("[AgingCloudNoneExistRecord] Storage is null"); + return -E_INVALID_DB; + } + TableInfo tableInfo = storageEngine_->GetSchema().GetTable(tableName); + bool isPkUseRowid = tableInfo.IsNoPkTable() || tableInfo.IsMultiPkTable(); + std::string dataPk = isPkUseRowid ? "_rowid_" : tableInfo.GetPrimaryKey().at(0); + + int errCode = E_OK; + do { + std::vector records; + ChangedData changedData; + changedData.type = ChangedDataType::DATA; + changedData.tableName = tableName; + changedData.field.push_back(dataPk); + errCode = GetOneBatchCloudNoneExistRecord(tableName, dataPk, records); + if (errCode == -E_FINISHED) { + break; + } + if (errCode != E_OK) { + LOGE("[AgingCloudNoneExistRecord] get one batch cloud none exist record failed.%d, tableName:%s", errCode, + DBCommon::StringMiddleMaskingWithLen(tableName).c_str()); + return errCode; + } + errCode = AgingOneBatchCloudNoneExistRecord(tableName, changedData, records); + if (errCode != E_OK) { + LOGE("[AgingCloudNoneExistRecord] aging one batch cloud none exist record failed.%d, tableName:%s", + errCode, DBCommon::StringMiddleMaskingWithLen(tableName).c_str()); + return errCode; + } + TriggerObserverAction("CLOUD", std::move(changedData), true); + std::this_thread::sleep_for(CloudDbConstant::LONG_TRANSACTION_INTERVAL); + } while (true); + return E_OK; +} + +int RelationalSyncAbleStorage::GetOneBatchCloudNoneExistRecord(const std::string &tableName, const std::string &dataPk, + std::vector &records) +{ + int errCode = E_OK; + auto *readHandle = static_cast( + storageEngine_->FindExecutor(false, OperatePerm::NORMAL_PERM, errCode)); + if (readHandle == nullptr) { + return errCode; + } + errCode = readHandle->StartTransaction(TransactType::DEFERRED); + if (errCode != E_OK) { + ReleaseHandle(readHandle); + return errCode; + } + sqlite3 *db = nullptr; + errCode = readHandle->GetDbHandle(db); + if (errCode != E_OK) { + readHandle->Rollback(); + ReleaseHandle(readHandle); + return errCode; + } + errCode = SQLiteRelationalUtils::GetOneBatchAgingCloudRecord(tableName, db, records, dataPk); + if (errCode != E_OK) { + LOGE("[AgingCloudNoneExistRecord] get aging cloud none exist record failed.%d, tableName:%s", errCode, + DBCommon::StringMiddleMaskingWithLen(tableName).c_str()); + readHandle->Rollback(); + ReleaseHandle(readHandle); + return errCode; + } + readHandle->Commit(); + ReleaseHandle(readHandle); + if (records.empty()) { + return -E_FINISHED; + } + return E_OK; +} + +int RelationalSyncAbleStorage::AgingOneBatchCloudNoneExistRecord(const std::string &tableName, + ChangedData &changedData, const std::vector &records) +{ + auto start = std::chrono::steady_clock::now(); + int errCode = E_OK; + auto *writeHandle = static_cast( + storageEngine_->FindExecutor(true, OperatePerm::NORMAL_PERM, errCode)); + if (writeHandle == nullptr) { + return errCode; + } + errCode = writeHandle->StartTransaction(TransactType::IMMEDIATE); + if (errCode != E_OK) { + ReleaseHandle(writeHandle); + return errCode; + } + sqlite3 *db = nullptr; + errCode = writeHandle->GetDbHandle(db); + if (errCode != E_OK) { + writeHandle->Rollback(); + ReleaseHandle(writeHandle); + return errCode; + } + std::vector dataVec; + for (auto &record : records) { + errCode = SQLiteRelationalUtils::AgingOneRecord(tableName, db, record, logicDelete_); + if (errCode != E_OK) { + LOGE("[AgingOneBatchCloudNoneExistRecord] aging cloud none exist record failed.%d, tableName:%s", errCode, + DBCommon::StringMiddleMaskingWithLen(tableName).c_str()); + writeHandle->Rollback(); + ReleaseHandle(writeHandle); + return errCode; + } + dataVec.emplace_back(record.pkValue); + auto duration = std::chrono::steady_clock::now() - start; + if (duration > CloudDbConstant::LONG_TIME_TRANSACTION) { + LOGI("[AgingOneBatchCloudNoneExistRecord] aging one batch cloud none exist record cost %" PRId64 "ms.", + duration.count()); + break; + } + } + writeHandle->Commit(); + ReleaseHandle(writeHandle); + changedData.primaryData[ChangeType::OP_DELETE].push_back(dataVec); + return E_OK; +} +#endif } #endif diff --git a/frameworks/libs/distributeddb/storage/src/sqlite/relational/sqlite_relational_utils.cpp b/frameworks/libs/distributeddb/storage/src/sqlite/relational/sqlite_relational_utils.cpp index 0149f26f08c3a2ec3e6e1eecb0864d12e8099049..4e0601b11403935bafbe9e3f7a55df30f0ca3419 100644 --- a/frameworks/libs/distributeddb/storage/src/sqlite/relational/sqlite_relational_utils.cpp +++ b/frameworks/libs/distributeddb/storage/src/sqlite/relational/sqlite_relational_utils.cpp @@ -1256,5 +1256,125 @@ void SQLiteRelationalUtils::FillSyncInfo(const CloudSyncOption &option, const Sy info.prepareTraceId = option.prepareTraceId; info.asyncDownloadAssets = option.asyncDownloadAssets; } + +int SQLiteRelationalUtils::PutCloudGid(sqlite3 *db, const std::string &tableName, std::vector &data) +{ + // create tmp table if table not exists + std::string sql = "CREATE TABLE IF NOT EXISTS " + DBCommon::GetTmpLogTableName(tableName) + + "(ID integer primary key autoincrement, cloud_gid TEXT UNIQUE ON CONFLICT IGNORE)"; + int errCode = SQLiteUtils::ExecuteRawSQL(db, sql); + if (errCode != E_OK) { + LOGE("[RDBUtils] Create gid table failed[%d]", errCode); + return errCode; + } + // insert all gid + sql = "INSERT INTO " + DBCommon::GetTmpLogTableName(tableName) + "(cloud_gid) VALUES(?)"; + sqlite3_stmt *stmt = nullptr; + errCode = SQLiteUtils::GetStatement(db, sql, stmt); + if (stmt == nullptr) { + LOGE("[RDBUtils] Get insert gid stmt failed[%d]", errCode); + return errCode; + } + ResFinalizer finalizer([stmt]() { + sqlite3_stmt *releaseStmt = stmt; + int ret = E_OK; + SQLiteUtils::ResetStatement(releaseStmt, true, ret); + if (ret != E_OK) { + LOGW("[RDBUtils] Reset cloud gid stmt failed[%d]", ret); + } + }); + for (const auto &item : data) { + auto iter = item.find(CloudDbConstant::GID_FIELD); + if (iter == item.end()) { + LOGW("[RDBUtils] Cloud gid info not contain gid field"); + continue; + } + auto gid = std::get_if(&iter->second); + if (gid == nullptr) { + LOGW("[RDBUtils] Cloud gid info not contain wrong type[%zu]", iter->second.index()); + continue; + } + errCode = SQLiteUtils::BindTextToStatement(stmt, 1, *gid); + if (errCode != E_OK) { + LOGE("[RDBUtils] Bind cloud gid failed[%d]", errCode); + return errCode; + } + errCode = SQLiteUtils::StepNext(stmt); + if (errCode != -E_FINISHED) { + LOGE("[RDBUtils] Step cloud gid failed[%d]", errCode); + return errCode; + } + errCode = E_OK; + SQLiteUtils::ResetStatement(stmt, false, errCode); + if (errCode != E_OK) { + LOGE("[RDBUtils] Reset cloud gid stmt failed[%d]", errCode); + return errCode; + } + } + return E_OK; +} + +int SQLiteRelationalUtils::GetOneBatchAgingCloudRecord(const std::string &tableName, sqlite3 *db, + std::vector &records, const std::string &dataPk) +{ + std::string sql = "SELECT a._rowid_, a.data_key, b." + dataPk + ", a.flag FROM " + + DBCommon::GetLogTableName(tableName) + " AS a LEFT JOIN " + tableName + " AS b ON (a.data_key = b._rowid_) " + "WHERE a.cloud_gid IS NOT '' AND a.cloud_gid NOT IN (SELECT cloud_gid FROM natural_rdb_aux_" + tableName + + "_log_tmp) limit 100;"; + sqlite3_stmt *stmt = nullptr; + int errCode = SQLiteUtils::GetStatement(db, sql, stmt); + if (errCode != E_OK) { + LOGE("[RDBUtils][GetOneBatchAgingCloudRecord] Get statement failed, %d.", errCode); + return errCode; + } + + do { + errCode = SQLiteUtils::StepWithRetry(stmt); + if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_ROW)) { + AgingCloudRecord record; + record.logRowid = sqlite3_column_int64(stmt, 0); // col 0 is _rowid_ of log table + record.dataRowid = sqlite3_column_int64(stmt, 1); // col 1 is _rowid_ of data table + errCode = GetTypeValByStatement(stmt, 2, record.pkValue); // col 2 is pk of data table + if (errCode != E_OK) { + LOGE("[RDBUtils][GetOneBatchAgingCloudRecord] Get pk value failed, errCode = %d.", errCode); + break; + } + record.flag = sqlite3_column_int(stmt, 3); // col 3 is flag + records.push_back(record); + } else if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) { + errCode = E_OK; + break; + } else { + LOGE("[RDBUtils][GetOneBatchAgingCloudRecord] Step failed, errCode = %d.", errCode); + break; + } + } while (true); + + return SQLiteUtils::ProcessStatementErrCode(stmt, true, errCode); +} + +int SQLiteRelationalUtils::AgingOneRecord(const std::string &tableName, sqlite3 *db, const AgingCloudRecord &record, + bool isLogicDelete) +{ + if ((record.flag & static_cast(LogInfoFlag::FLAG_LOCAL)) != 0) { + std::string sql = "UPDATE natural_rdb_aux_" + tableName + + "_log SET cloud_gid = '' AND version = '' where _rowid_ = " + std::to_string(record.logRowid) + ";"; + return SQLiteUtils::ExecuteRawSQL(db, sql); + } + + if ((record.flag & static_cast(LogInfoFlag::FLAG_CLOUD_WRITE)) != 0) { + std::string sql = "DELETE FROM " + tableName + " WHERE _rowid_ = " + std::to_string(record.dataRowid) + ";"; + int errCode = SQLiteUtils::ExecuteRawSQL(db, sql); + if (errCode != E_OK) { + return errCode; + } + int32_t newFlag = isLogicDelete ? ((record.flag & (0x20 | 0x1 | 0x8)) & (~0x4000)) : + ((record.flag & (0x20 | 0x1)) & (~0x4000)); + sql = "UPDATE natural_rdb_aux_" + tableName + "_log SET cloud_gid = '' AND version = '' AND flag = " + + std::to_string(newFlag) + " where _rowid_ = " + std::to_string(record.logRowid) + ";"; + return SQLiteUtils::ExecuteRawSQL(db, sql); + } + return E_OK; +} #endif } // namespace DistributedDB \ No newline at end of file diff --git a/frameworks/libs/distributeddb/storage/src/sqlite/relational/sqlite_relational_utils.h b/frameworks/libs/distributeddb/storage/src/sqlite/relational/sqlite_relational_utils.h index 60f61115ead3310d084ae7f6f12320c4e963f5bf..bcc34b6d2487b6e1d65af75880b1870be6729488 100644 --- a/frameworks/libs/distributeddb/storage/src/sqlite/relational/sqlite_relational_utils.h +++ b/frameworks/libs/distributeddb/storage/src/sqlite/relational/sqlite_relational_utils.h @@ -150,6 +150,21 @@ public: #ifdef USE_DISTRIBUTEDDB_CLOUD static void FillSyncInfo(const CloudSyncOption &option, const SyncProcessCallback &onProcess, ICloudSyncer::CloudTaskInfo &info); + + static int PutCloudGid(sqlite3 *db, const std::string &tableName, std::vector &data); + + struct AgingCloudRecord { + int64_t logRowid = 0; + int64_t dataRowid = 0; + int32_t flag = 0; + Type pkValue; + }; + + static int GetOneBatchAgingCloudRecord(const std::string &tableName, sqlite3 *db, + std::vector &records, const std::string &dataPk); + + static int AgingOneRecord(const std::string &tableName, sqlite3 *db, const AgingCloudRecord &record, + bool isLogicDelete); #endif private: static int BindExtendStatementByType(sqlite3_stmt *statement, int cid, Type &typeVal); diff --git a/frameworks/libs/distributeddb/storage/src/sqlite/relational/sqlite_single_relational_storage_engine.cpp b/frameworks/libs/distributeddb/storage/src/sqlite/relational/sqlite_single_relational_storage_engine.cpp index a847dc50346871e0656bc8a272e106e542fe10eb..b2d93a407f15af13379cefe7c00c711e6ac1e56e 100644 --- a/frameworks/libs/distributeddb/storage/src/sqlite/relational/sqlite_single_relational_storage_engine.cpp +++ b/frameworks/libs/distributeddb/storage/src/sqlite/relational/sqlite_single_relational_storage_engine.cpp @@ -1221,7 +1221,7 @@ int SQLiteSingleRelationalStorageEngine::GeneLogInfoForExistedDataInBatch(const LOGI("[GeneLogInfoForExistedDataInBatch] Generate 10 batch log finished"); batchNum = 0; } - std::this_thread::sleep_for(CloudDbConstant::ASYNC_GEN_LOG_INTERVAL); + std::this_thread::sleep_for(CloudDbConstant::LONG_TRANSACTION_INTERVAL); } while (changedCount != 0); return E_OK; } @@ -1677,5 +1677,36 @@ std::pair SQLiteSingleRelationalStorageEngine::AnalyzeTable(cons }); return handle->AnalyzeTable(tableName); } + +int SQLiteSingleRelationalStorageEngine::PutCloudGid(const std::string &tableName, std::vector &data) +{ + int errCode = E_OK; + auto *handle = static_cast(FindExecutor(true, OperatePerm::NORMAL_PERM, + errCode)); + if (handle == nullptr) { + return errCode; + } + ResFinalizer finalizer([this, handle]() { + auto releaseHandle = handle; + ReleaseExecutor(releaseHandle); + }); + errCode = handle->StartTransaction(TransactType::IMMEDIATE); + if (errCode != E_OK) { + return errCode; + } + errCode = handle->PutCloudGid(tableName, data); + if (errCode == E_OK) { + errCode = handle->Commit(); + if (errCode != E_OK) { + LOGE("[RDBEngine] Commit transaction failed[%d] when put cloud gid", errCode); + } + } else { + int ret = handle->Rollback(); + if (ret != E_OK) { + LOGW("[RDBEngine] Rollback transaction failed[%d] when put cloud gid", ret); + } + } + return errCode; +} } #endif \ No newline at end of file diff --git a/frameworks/libs/distributeddb/storage/src/sqlite/relational/sqlite_single_relational_storage_engine.h b/frameworks/libs/distributeddb/storage/src/sqlite/relational/sqlite_single_relational_storage_engine.h index fe6b1ceb4e2c0165582405beda1ad36a414fe00c..0b029635a362a9231161c7c74986f569d115b15f 100644 --- a/frameworks/libs/distributeddb/storage/src/sqlite/relational/sqlite_single_relational_storage_engine.h +++ b/frameworks/libs/distributeddb/storage/src/sqlite/relational/sqlite_single_relational_storage_engine.h @@ -106,6 +106,8 @@ public: std::vector> &tasks); std::pair AnalyzeTable(const std::string &tableName); + + int PutCloudGid(const std::string &tableName, std::vector &data); protected: StorageExecutor *NewSQLiteStorageExecutor(sqlite3 *dbHandle, bool isWrite, bool isMemDb) override; int Upgrade(sqlite3 *db) override; diff --git a/frameworks/libs/distributeddb/storage/src/sqlite/relational/sqlite_single_ver_relational_storage_executor.cpp b/frameworks/libs/distributeddb/storage/src/sqlite/relational/sqlite_single_ver_relational_storage_executor.cpp index f03b734b411766ec24f1108b249be446f071e2a7..514eed6aca40ebda2d9393d66700c5da7d87fd4b 100644 --- a/frameworks/libs/distributeddb/storage/src/sqlite/relational/sqlite_single_ver_relational_storage_executor.cpp +++ b/frameworks/libs/distributeddb/storage/src/sqlite/relational/sqlite_single_ver_relational_storage_executor.cpp @@ -2003,5 +2003,10 @@ std::pair SQLiteSingleVerRelationalStorageExecutor::AnalyzeTable { return SQLiteRelationalUtils::AnalyzeTable(dbHandle_, tableName); } + +int SQLiteSingleVerRelationalStorageExecutor::PutCloudGid(const std::string &tableName, std::vector &data) +{ + return SQLiteRelationalUtils::PutCloudGid(dbHandle_, tableName, data); +} } // namespace DistributedDB #endif diff --git a/frameworks/libs/distributeddb/storage/src/sqlite/relational/sqlite_single_ver_relational_storage_executor.h b/frameworks/libs/distributeddb/storage/src/sqlite/relational/sqlite_single_ver_relational_storage_executor.h index 11a0c7f068024e099cb9346d4b0803ce2d1a78a6..fce159f9ddc3ac25b63f52baede362e7b88700a3 100644 --- a/frameworks/libs/distributeddb/storage/src/sqlite/relational/sqlite_single_ver_relational_storage_executor.h +++ b/frameworks/libs/distributeddb/storage/src/sqlite/relational/sqlite_single_ver_relational_storage_executor.h @@ -284,6 +284,8 @@ public: int GetLocalDataByRowid(const TableInfo &table, const TableSchema &tableSchema, DataInfoWithLog &dataInfoWithLog); std::pair AnalyzeTable(const std::string &tableName) const; + + int PutCloudGid(const std::string &tableName, std::vector &data); private: int UpdateHashKeyWithOutPk(DistributedTableMode mode, const TableInfo &tableInfo, TableSyncType syncType, const std::string &localIdentity); diff --git a/frameworks/libs/distributeddb/storage/src/storage_proxy.cpp b/frameworks/libs/distributeddb/storage/src/storage_proxy.cpp index 0c236ac517a5ebe460518e1190079d08b31275cf..60957c6e81b74ac35391398e82a4782d95a89600 100644 --- a/frameworks/libs/distributeddb/storage/src/storage_proxy.cpp +++ b/frameworks/libs/distributeddb/storage/src/storage_proxy.cpp @@ -912,9 +912,59 @@ void StorageProxy::FilterDownloadRecordNoneSchemaField(const std::string &tableN int StorageProxy::WaitAsyncGenLogTaskFinished(const std::vector &tables) const { if (store_ == nullptr) { - LOGW("[WaitAsyncGenLogTaskFinished] store is null"); + LOGE("[WaitAsyncGenLogTaskFinished] store is null"); return -E_INVALID_DB; } return store_->WaitAsyncGenLogTaskFinished(tables); } + +int StorageProxy::PutCloudGid(const std::string &tableName, std::vector &data) +{ + std::shared_lock readLock(storeMutex_); + if (store_ == nullptr) { + LOGE("[PutCloudGid] store is null"); + return -E_INVALID_DB; + } + return store_->PutCloudGid(tableName, data); +} + +int StorageProxy::GetCloudGidCursor(const std::string &tableName, std::string &cursor) +{ + std::shared_lock readLock(storeMutex_); + if (cloudMetaData_ == nullptr) { + LOGE("[GetCloudGidCursor] meta is null"); + return -E_INVALID_DB; + } + return cloudMetaData_->GetCloudGidCursor(tableName, cursor); +} + +int StorageProxy::PutCloudGidCursor(const std::string &tableName, const std::string &cursor) +{ + std::shared_lock readLock(storeMutex_); + if (cloudMetaData_ == nullptr) { + LOGE("[PutCloudGidCursor] meta is null"); + return -E_INVALID_DB; + } + return cloudMetaData_->PutCloudGidCursor(tableName, cursor); +} + +int StorageProxy::CleanCloudInfo(const std::string &tableName) +{ + std::shared_lock readLock(storeMutex_); + if (cloudMetaData_ == nullptr) { + LOGE("[CleanCloudInfo] meta is null"); + return -E_INVALID_DB; + } + return cloudMetaData_->CleanCloudInfo(tableName); +} + +int StorageProxy::AgingCloudNoneExistRecord(const std::string &tableName) +{ + std::shared_lock readLock(storeMutex_); + if (store_ == nullptr) { + LOGE("[AgingCloudNoneExistRecord] store is null"); + return -E_INVALID_DB; + } + return store_->AgingCloudNoneExistRecord(tableName); +} } \ No newline at end of file diff --git a/frameworks/libs/distributeddb/syncer/src/cloud/cloud_db_proxy.cpp b/frameworks/libs/distributeddb/syncer/src/cloud/cloud_db_proxy.cpp index e616fe9b9c3aee3ebd8eddd334fc9350b1129e9d..f37bcdd7264ca5babedf66d02a6c53a052f14a4b 100644 --- a/frameworks/libs/distributeddb/syncer/src/cloud/cloud_db_proxy.cpp +++ b/frameworks/libs/distributeddb/syncer/src/cloud/cloud_db_proxy.cpp @@ -482,33 +482,27 @@ int CloudDBProxy::GetInnerErrorCode(DBStatus status) if (status < DB_ERROR || status >= BUTT_STATUS) { return static_cast(status); } - switch (status) { - case OK: - case LOCAL_ASSET_NOT_FOUND: - return E_OK; - case CLOUD_NETWORK_ERROR: - return -E_CLOUD_NETWORK_ERROR; - case CLOUD_SYNC_UNSET: - return -E_CLOUD_SYNC_UNSET; - case CLOUD_FULL_RECORDS: - return -E_CLOUD_FULL_RECORDS; - case CLOUD_LOCK_ERROR: - return -E_CLOUD_LOCK_ERROR; - case CLOUD_ASSET_SPACE_INSUFFICIENT: - return -E_CLOUD_ASSET_SPACE_INSUFFICIENT; - case CLOUD_VERSION_CONFLICT: - return -E_CLOUD_VERSION_CONFLICT; - case CLOUD_RECORD_EXIST_CONFLICT: - return -E_CLOUD_RECORD_EXIST_CONFLICT; - case CLOUD_DISABLED: - return -E_CLOUD_DISABLED; - case CLOUD_ASSET_NOT_FOUND: - return -E_CLOUD_ASSET_NOT_FOUND; - case SKIP_WHEN_CLOUD_SPACE_INSUFFICIENT: - return -E_SKIP_WHEN_CLOUD_SPACE_INSUFFICIENT; - default: - return -E_CLOUD_ERROR; - } + static const std::map CLOUD_ERROR = { + {OK, E_OK}, + {LOCAL_ASSET_NOT_FOUND, E_OK}, + {CLOUD_NETWORK_ERROR, -E_CLOUD_NETWORK_ERROR}, + {CLOUD_SYNC_UNSET, -E_CLOUD_SYNC_UNSET}, + {CLOUD_FULL_RECORDS, -E_CLOUD_FULL_RECORDS}, + {CLOUD_LOCK_ERROR, -E_CLOUD_LOCK_ERROR}, + {CLOUD_ASSET_SPACE_INSUFFICIENT, -E_CLOUD_ASSET_SPACE_INSUFFICIENT}, + {CLOUD_VERSION_CONFLICT, -E_CLOUD_VERSION_CONFLICT}, + {CLOUD_RECORD_EXIST_CONFLICT, -E_CLOUD_RECORD_EXIST_CONFLICT}, + {CLOUD_DISABLED, -E_CLOUD_DISABLED}, + {CLOUD_ASSET_NOT_FOUND, -E_CLOUD_ASSET_NOT_FOUND}, + {SKIP_WHEN_CLOUD_SPACE_INSUFFICIENT, -E_SKIP_WHEN_CLOUD_SPACE_INSUFFICIENT}, + {EXPIRED_CURSOR, -E_EXPIRED_CURSOR}, + {QUERY_END, -E_QUERY_END}, + }; + auto iter = CLOUD_ERROR.find(status); + if (iter != CLOUD_ERROR.end()) { + return iter->second; + } + return -E_CLOUD_ERROR; } DBStatus CloudDBProxy::QueryAction(const std::shared_ptr &context, @@ -857,4 +851,17 @@ std::weak_ptr CloudDBProxy::GetCloudConflictHandler() std::shared_lock writeLock(handlerMutex_); return std::weak_ptr(conflictHandler_); } + +int CloudDBProxy::QueryAllGID(const std::string &tableName, VBucket &extend, std::vector &data) +{ + std::shared_lock readLock(cloudMutex_); + if (iCloudDb_ == nullptr) { + return -E_CLOUD_ERROR; + } + auto ret = iCloudDb_->QueryAllGID(tableName, extend, data); + if (ret != DBStatus::OK && ret != DBStatus::QUERY_END) { + LOGE("[CloudDBProxy] QueryAllGid failed[%d]", ret); + } + return GetInnerErrorCode(ret); +} } diff --git a/frameworks/libs/distributeddb/syncer/src/cloud/cloud_db_proxy.h b/frameworks/libs/distributeddb/syncer/src/cloud/cloud_db_proxy.h index 07a1bdfee8022e1c51ebbee7d61ce86b57345756..4fc0c6bf6db300b54905f3b88d5591d37d77b2eb 100644 --- a/frameworks/libs/distributeddb/syncer/src/cloud/cloud_db_proxy.h +++ b/frameworks/libs/distributeddb/syncer/src/cloud/cloud_db_proxy.h @@ -89,6 +89,8 @@ public: std::weak_ptr GetCloudConflictHandler(); + int QueryAllGID(const std::string &tableName, VBucket &extend, std::vector &data); + static int GetInnerErrorCode(DBStatus status); protected: class CloudActionContext { diff --git a/frameworks/libs/distributeddb/syncer/src/cloud/cloud_syncer.cpp b/frameworks/libs/distributeddb/syncer/src/cloud/cloud_syncer.cpp index e1890fe3dc479d6666c3696f76368f1012cfcae5..03ab7e766881882cc95601d4117ba30abdacdfb2 100644 --- a/frameworks/libs/distributeddb/syncer/src/cloud/cloud_syncer.cpp +++ b/frameworks/libs/distributeddb/syncer/src/cloud/cloud_syncer.cpp @@ -1139,22 +1139,6 @@ int CloudSyncer::DoDownload(CloudSyncer::TaskId taskId, bool isFirstDownload) return errCode; } -int CloudSyncer::DoDownloadInner(CloudSyncer::TaskId taskId, SyncParam ¶m, bool isFirstDownload) -{ - // Query data by batch until reaching end and not more data need to be download - int ret = PreCheck(taskId, param.info.tableName); - if (ret != E_OK) { - return ret; - } - do { - ret = DownloadOneBatch(taskId, param, isFirstDownload); - if (ret != E_OK) { - return ret; - } - } while (!param.isLastBatch); - return E_OK; -} - void CloudSyncer::NotifyInEmptyDownload(CloudSyncer::TaskId taskId, InnerProcessInfo &info) { std::lock_guard autoLock(dataLock_); @@ -2109,9 +2093,11 @@ int CloudSyncer::DownloadDataFromCloud(TaskId taskId, SyncParam ¶m, bool isF // Won't break here since downloadData may not be null param.isLastBatch = true; } else if (ret != E_OK) { - std::lock_guard autoLock(dataLock_); - param.info.tableStatus = ProcessStatus::FINISHED; - currentContext_.notifier->UpdateProcess(param.info); + if (ret != -E_EXPIRED_CURSOR) { + std::lock_guard autoLock(dataLock_); + param.info.tableStatus = ProcessStatus::FINISHED; + currentContext_.notifier->UpdateProcess(param.info); + } return ret; } diff --git a/frameworks/libs/distributeddb/syncer/src/cloud/cloud_syncer.h b/frameworks/libs/distributeddb/syncer/src/cloud/cloud_syncer.h index 93ddbb25980aa3880634e84aaa74ee7c66986756..190dbf3c6026412774c222499393c83ba890a47c 100644 --- a/frameworks/libs/distributeddb/syncer/src/cloud/cloud_syncer.h +++ b/frameworks/libs/distributeddb/syncer/src/cloud/cloud_syncer.h @@ -570,6 +570,18 @@ protected: void SetCurrentTmpError(int errCode); + int DoUpdateExpiredCursor(TaskId taskId, const std::string &table); + + int DownloadOneBatchGID(TaskId taskId, SyncParam ¶m); + + int CleanExpiredCursor(SyncParam ¶m); + + int DownloadGIDFromCloud(SyncParam ¶m); + + int SaveGIDRecord(SyncParam ¶m); + + int SaveGIDCursor(SyncParam ¶m); + mutable std::mutex dataLock_; TaskId lastTaskId_; std::multimap> taskQueue_; diff --git a/frameworks/libs/distributeddb/syncer/src/cloud/cloud_syncer_extend_extend.cpp b/frameworks/libs/distributeddb/syncer/src/cloud/cloud_syncer_extend_extend.cpp index 2738e50a12b3d200d8479af4c87c0382cdc7dbc0..539645c709830cdfc7f159aa007e492f1a3a3b5f 100644 --- a/frameworks/libs/distributeddb/syncer/src/cloud/cloud_syncer_extend_extend.cpp +++ b/frameworks/libs/distributeddb/syncer/src/cloud/cloud_syncer_extend_extend.cpp @@ -32,6 +32,9 @@ #include "version.h" namespace DistributedDB { +namespace { + constexpr const int MAX_EXPIRED_CURSOR_COUNT = 1; +} int CloudSyncer::HandleDownloadResultForAsyncDownload(const DownloadItem &downloadItem, InnerProcessInfo &info, DownloadCommitList &commitList, uint32_t &successCount) { @@ -227,4 +230,128 @@ void CloudSyncer::SetCurrentTmpError(int errCode) cloudTaskInfos_[currentContext_.currentTaskId].errCode = errCode; cloudTaskInfos_[currentContext_.currentTaskId].tempErrCode = errCode; } + +int CloudSyncer::DoDownloadInner(TaskId taskId, SyncParam ¶m, bool isFirstDownload) +{ + // Query data by batch until reaching end and not more data need to be download + int ret = PreCheck(taskId, param.info.tableName); + if (ret != E_OK) { + return ret; + } + int expiredCursorCount = 0; + do { + ret = DownloadOneBatch(taskId, param, isFirstDownload); + if (ret == -E_EXPIRED_CURSOR) { + expiredCursorCount++; + if (expiredCursorCount > MAX_EXPIRED_CURSOR_COUNT) { + LOGE("[CloudSyncer] Table[%s] too much expired cursor count[%d]", + DBCommon::StringMiddleMasking(param.info.tableName).c_str(), expiredCursorCount); + return ret; + } + param.isLastBatch = false; + ret = DoUpdateExpiredCursor(taskId, param.info.tableName); + } + if (ret != E_OK) { + return ret; + } + } while (!param.isLastBatch); + return E_OK; +} + +int CloudSyncer::DoUpdateExpiredCursor(TaskId taskId, const std::string &table) +{ + LOGI("[CloudSyncer] Update expired cursor now, table[%s]", DBCommon::StringMiddleMasking(table).c_str()); + if (storageProxy_ == nullptr) { + LOGE("[CloudSyncer] storage is nullptr when update expired cursor"); + return -E_INTERNAL_ERROR; + } + SyncParam param; + param.tableName = table; + auto errCode = storageProxy_->GetCloudGidCursor(param.tableName, param.cloudWaterMark); + if (errCode != E_OK) { + return errCode; + } + int retryCount = 0; + do { + int ret = DownloadOneBatchGID(taskId, param); + if (ret == -E_EXPIRED_CURSOR && retryCount < MAX_EXPIRED_CURSOR_COUNT) { + retryCount++; + param.cloudWaterMark = ""; + continue; + } + if (ret != E_OK) { + return ret; + } + } while (!param.isLastBatch); + errCode = storageProxy_->AgingCloudNoneExistRecord(param.tableName); + if (errCode != E_OK) { + return errCode; + } + errCode = CleanExpiredCursor(param); + return errCode; +} + +int CloudSyncer::DownloadOneBatchGID(TaskId taskId, SyncParam ¶m) +{ + int ret = CheckTaskIdValid(taskId); + if (ret != E_OK) { + return ret; + } + ret = DownloadGIDFromCloud(param); + if (ret != E_OK) { + return ret; + } + ret = SaveGIDRecord(param); + if (ret != E_OK) { + return ret; + } + return SaveGIDCursor(param); +} + +int CloudSyncer::CleanExpiredCursor(SyncParam ¶m) +{ + int errCode = storageProxy_->CleanCloudInfo(param.tableName); + if (errCode != E_OK) { + return errCode; + } + return storageProxy_->CleanWaterMark(param.tableName); +} + +int CloudSyncer::DownloadGIDFromCloud(SyncParam ¶m) +{ + VBucket extend; + extend[CloudDbConstant::CURSOR_FIELD] = param.cloudWaterMark; + int errCode = cloudDB_.QueryAllGID(param.tableName, extend, param.downloadData.data); + if (errCode == -E_QUERY_END) { + errCode = E_OK; + param.isLastBatch = true; + } + if (errCode != E_OK) { + LOGE("[CloudSyncer] Query cloud gid failed[%d]", errCode); + } else if(!param.downloadData.data.empty()) { + const auto &record = param.downloadData.data[param.downloadData.data.size() - 1u]; + auto iter = record.find(CloudDbConstant::CURSOR_FIELD); + if (iter == record.end()) { + LOGE("[CloudSyncer] Cloud gid record no exist cursor"); + return -E_CLOUD_ERROR; + } + auto cursor = std::get_if(&iter->second); + if (cursor == nullptr) { + LOGE("[CloudSyncer] Cloud gid record cursor is no str, type[%zu]", iter->second.index()); + return -E_CLOUD_ERROR; + } + param.cloudWaterMark = *cursor; + } + return errCode; +} + +int CloudSyncer::SaveGIDRecord(SyncParam ¶m) +{ + return storageProxy_->PutCloudGid(param.tableName, param.downloadData.data); +} + +int CloudSyncer::SaveGIDCursor(SyncParam ¶m) +{ + return storageProxy_->PutCloudGidCursor(param.tableName, param.cloudWaterMark); +} } // namespace DistributedDB \ No newline at end of file diff --git a/frameworks/libs/distributeddb/test/unittest/common/store_test/rdb/distributeddb_rdb_complex_cloud_test.cpp b/frameworks/libs/distributeddb/test/unittest/common/store_test/rdb/distributeddb_rdb_complex_cloud_test.cpp index 6c49fb326972a8eece0e687f6b998e6c6d455b47..a8bae4650df73a8a26f92e016af68c889fdb1adf 100644 --- a/frameworks/libs/distributeddb/test/unittest/common/store_test/rdb/distributeddb_rdb_complex_cloud_test.cpp +++ b/frameworks/libs/distributeddb/test/unittest/common/store_test/rdb/distributeddb_rdb_complex_cloud_test.cpp @@ -147,5 +147,37 @@ HWTEST_F(DistributedDBRDBComplexCloudTest, RemoveData001, TestSize.Level0) ASSERT_NE(delegate, nullptr); EXPECT_EQ(delegate->RemoveDeviceData("", ClearMode::FLAG_ONLY), DBStatus::OK); } + +/** + * @tc.name: ExpireCursor001 + * @tc.desc: Test sync with expire cursor. + * @tc.type: FUNC + * @tc.author: zqq + */ +HWTEST_F(DistributedDBRDBComplexCloudTest, ExpireCursor001, TestSize.Level0) +{ + /** + * @tc.steps:step1. store1 insert one data + * @tc.expected: step1. insert success. + */ + auto ret = ExecuteSQL("INSERT INTO CLOUD_SYNC_TABLE_A VALUES(1, 1, 'text1', 'text2', 'uuid1')", info1_); + ASSERT_EQ(ret, E_OK); + /** + * @tc.steps:step2. store1 push and store2 pull + * @tc.expected: step2. sync success and stringCol2 is null because store1 don't sync stringCol2. + */ + Query query = Query::Select().FromTable({CLOUD_SYNC_TABLE_A}); + EXPECT_NO_FATAL_FAILURE(CloudBlockSync(info1_, query, SyncMode::SYNC_MODE_CLOUD_MERGE, OK, OK)); + EXPECT_NO_FATAL_FAILURE(CloudBlockSync(info2_, query, SyncMode::SYNC_MODE_CLOUD_MERGE, OK, OK)); + auto cloudDB = GetVirtualCloudDb(); + ASSERT_NE(cloudDB, nullptr); + std::atomic count = 0; + cloudDB->ForkAfterQueryResult([&count](VBucket &, std::vector &) { + count++; + return count == 1 ? DBStatus::EXPIRED_CURSOR : DBStatus::QUERY_END; + }); + EXPECT_NO_FATAL_FAILURE(CloudBlockSync(info2_, query, SyncMode::SYNC_MODE_CLOUD_MERGE, OK, OK)); + cloudDB->ForkAfterQueryResult(nullptr); +} } #endif \ No newline at end of file diff --git a/frameworks/libs/distributeddb/test/unittest/common/syncer/cloud/virtual_cloud_db.cpp b/frameworks/libs/distributeddb/test/unittest/common/syncer/cloud/virtual_cloud_db.cpp index 1e19a22148d887ba317de20c736120030e5c46a2..1fb8d75c0cbb1a41ecfab5ea16876caa8c5f3e4a 100644 --- a/frameworks/libs/distributeddb/test/unittest/common/syncer/cloud/virtual_cloud_db.cpp +++ b/frameworks/libs/distributeddb/test/unittest/common/syncer/cloud/virtual_cloud_db.cpp @@ -723,4 +723,16 @@ void VirtualCloudDb::SetUploadRecordStatus(DBStatus status) { uploadRecordStatus_ = status; } + +DBStatus VirtualCloudDb::QueryAllGID(const std::string &tableName, VBucket &extend, std::vector &data) +{ + VBucket copyExtend = extend; + std::string cursor = std::get(copyExtend[g_cursorField]); + bool isIncreCursor = (cursor.substr(0, increPrefix_.size()) == increPrefix_); + LOGD("extend size: %zu type: %zu expect: %zu, cursor: %s", extend.size(), copyExtend[g_cursorField].index(), + TYPE_INDEX, cursor.c_str()); + cursor = cursor.empty() ? "0" : cursor; + GetCloudData(cursor, isIncreCursor, cloudData_[tableName], data, copyExtend); + return (data.empty() || data.size() < static_cast(queryLimit_)) ? QUERY_END : OK; +} } \ No newline at end of file diff --git a/frameworks/libs/distributeddb/test/unittest/common/syncer/cloud/virtual_cloud_db.h b/frameworks/libs/distributeddb/test/unittest/common/syncer/cloud/virtual_cloud_db.h index bdede955fe3e7b2518a9297b57411cfff51757a4..821ffab322090aed14ee023a6b37ab9e128d4569 100644 --- a/frameworks/libs/distributeddb/test/unittest/common/syncer/cloud/virtual_cloud_db.h +++ b/frameworks/libs/distributeddb/test/unittest/common/syncer/cloud/virtual_cloud_db.h @@ -53,6 +53,8 @@ public: DBStatus Close() override; + DBStatus QueryAllGID(const std::string &tableName, VBucket &extend, std::vector &data) override; + void SetCloudError(bool cloudError); void SetBlockTime(int32_t blockTime);