diff --git a/frameworks/libs/distributeddb/syncer/src/cloud/cloud_syncer.cpp b/frameworks/libs/distributeddb/syncer/src/cloud/cloud_syncer.cpp index fb61a99ee9a745c8ed1ccdad78f4fce6b81b29ee..26818e310deee785a7cfd7b6f913ffeb94139a77 100644 --- a/frameworks/libs/distributeddb/syncer/src/cloud/cloud_syncer.cpp +++ b/frameworks/libs/distributeddb/syncer/src/cloud/cloud_syncer.cpp @@ -2008,16 +2008,21 @@ int CloudSyncer::DownloadOneAssetRecord(const std::set &dupHashKeySet, cons int CloudSyncer::GetSyncParamForDownload(TaskId taskId, SyncParam ¶m) { + int ret = E_OK; if (IsCurrentTableResume(taskId, false)) { std::lock_guard autoLock(dataLock_); if (resumeTaskInfos_[taskId].syncParam.tableName == currentContext_.tableName) { param = resumeTaskInfos_[taskId].syncParam; resumeTaskInfos_[taskId].syncParam = {}; + ret = storageProxy_->GetCloudWaterMark(param.tableName, param.cloudWaterMark); + if (ret != E_OK) { + LOGE("[CloudSyncer] Cannot get cloud water level from cloud meta data when table is resume: %d.", ret); + } LOGD("[CloudSyncer] Get sync param from cache"); return E_OK; } } - int ret = GetCurrentTableName(param.tableName); + ret = GetCurrentTableName(param.tableName); if (ret != E_OK) { LOGE("[CloudSyncer] Invalid table name for syncing: %d", ret); return ret; @@ -2041,7 +2046,6 @@ int CloudSyncer::GetSyncParamForDownload(TaskId taskId, SyncParam ¶m) LOGE("[CloudSyncer] Cannot get cloud water level from cloud meta data: %d.", ret); } } - ReloadCloudWaterMarkIfNeed(param.tableName, param.cloudWaterMark); currentContext_.notifier->GetDownloadInfoByTableName(param.info); return ret; } diff --git a/frameworks/libs/distributeddb/syncer/src/cloud/process_notifier.cpp b/frameworks/libs/distributeddb/syncer/src/cloud/process_notifier.cpp index bc873fa873d0308a36e3922c27b6a5204362d17f..f852a2a783557a283b193de8dbbc546e16ac9f6c 100644 --- a/frameworks/libs/distributeddb/syncer/src/cloud/process_notifier.cpp +++ b/frameworks/libs/distributeddb/syncer/src/cloud/process_notifier.cpp @@ -178,7 +178,7 @@ void ProcessNotifier::GetDownloadInfoByTableName(ICloudSyncer::InnerProcessInfo syncProcess = multiSyncProcess_[user_]; } - if (syncProcess.tableProcess.find(process.tableName) == syncProcess.tableProcess.end()) { + if (syncProcess.tableProcess.find(process.tableName) != syncProcess.tableProcess.end()) { process.downLoadInfo = syncProcess.tableProcess[process.tableName].downLoadInfo; } } diff --git a/frameworks/libs/distributeddb/test/unittest/common/storage/cloud/distributeddb_cloud_check_sync_test.cpp b/frameworks/libs/distributeddb/test/unittest/common/storage/cloud/distributeddb_cloud_check_sync_test.cpp index 56b7563dd034983e71de51af9abc4dc10e6662d7..12c6ac9295d4394438704989ca2c509b66b8dce5 100644 --- a/frameworks/libs/distributeddb/test/unittest/common/storage/cloud/distributeddb_cloud_check_sync_test.cpp +++ b/frameworks/libs/distributeddb/test/unittest/common/storage/cloud/distributeddb_cloud_check_sync_test.cpp @@ -201,8 +201,12 @@ protected: void DeleteUserTableRecord(int64_t begin, int64_t end); void DeleteCloudTableRecord(int64_t gid); void CheckCloudTableCount(const std::string &tableName, int64_t expectCount); + bool CheckSyncCount(const Info actualInfo, const Info expectInfo); + bool CheckSyncProcess(std::vector> &actualSyncProcess, + vector &expectSyncProcessV); void PriorityAndNormalSync(const Query &normalQuery, const Query &priorityQuery, - RelationalStoreDelegate *delegate); + RelationalStoreDelegate *delegate, std::vector> &prioritySyncProcess, + bool isCheckProcess); void DeleteCloudDBData(int64_t begin, int64_t count); void SetForkQueryForCloudPrioritySyncTest007(std::atomic &count); void SetForkQueryForCloudPrioritySyncTest008(std::atomic &count); @@ -428,30 +432,89 @@ void DistributedDBCloudCheckSyncTest::CheckCloudTableCount(const std::string &ta EXPECT_EQ(realCount, expectCount); // ExpectCount represents the total amount of cloud data. } +bool DistributedDBCloudCheckSyncTest::CheckSyncCount(const Info actualInfo, const Info expectInfo) +{ + if (actualInfo.batchIndex != expectInfo.batchIndex) { + return false; + } + if (actualInfo.total != expectInfo.total) { + return false; + } + if (actualInfo.successCount != expectInfo.successCount) { + return false; + } + if (actualInfo.failCount != expectInfo.failCount) { + return false; + } + return true; +} + +bool DistributedDBCloudCheckSyncTest::CheckSyncProcess( + std::vector> &actualSyncProcess, vector &expectSyncProcessV) +{ + vector> expectSyncProcess; + for (auto syncProcess : expectSyncProcessV) { + map expectSyncProcessMap = {{"CLOUD", syncProcess}}; + expectSyncProcess.emplace_back(expectSyncProcessMap); + } + for (int i = 0; i < (int) actualSyncProcess.size(); i++) { + map actualSyncProcessMap = actualSyncProcess[i]; + map expectSyncProcessMap = expectSyncProcess[i]; + for (auto &it : actualSyncProcessMap) { + string mapKey = it.first; + if (expectSyncProcessMap.find(mapKey) == expectSyncProcessMap.end()) { + return false; + } + SyncProcess actualSyncProcess = it.second; + SyncProcess expectSyncProcess = expectSyncProcessMap.find(mapKey)->second; + for (const auto &itInner : actualSyncProcess.tableProcess) { + string tableName = itInner.first; + if (expectSyncProcess.tableProcess.find(tableName) == expectSyncProcess.tableProcess.end()) { + return false; + } + TableProcessInfo actualTableProcessInfo = itInner.second; + TableProcessInfo expectTableProcessInfo = expectSyncProcess.tableProcess.find(tableName)->second; + if (!CheckSyncCount(actualTableProcessInfo.downLoadInfo, expectTableProcessInfo.downLoadInfo)) { + return false; + } + if (!CheckSyncCount(actualTableProcessInfo.upLoadInfo, expectTableProcessInfo.upLoadInfo)) { + return false; + } + } + } + } + return true; +} + void DistributedDBCloudCheckSyncTest::PriorityAndNormalSync(const Query &normalQuery, const Query &priorityQuery, - RelationalStoreDelegate *delegate) + RelationalStoreDelegate *delegate, std::vector> &prioritySyncProcess, + bool isCheckProcess) { std::mutex dataMutex; std::condition_variable cv; bool normalFinish = false; bool priorityFinish = false; - auto normalCallback = [&cv, &dataMutex, &normalFinish, &priorityFinish]( + auto normalCallback = [&cv, &dataMutex, &normalFinish, &priorityFinish, &prioritySyncProcess, &isCheckProcess]( const std::map &process) { for (const auto &item: process) { if (item.second.process == DistributedDB::FINISHED) { normalFinish = true; - ASSERT_EQ(priorityFinish, true); + if (isCheckProcess) { + ASSERT_EQ(priorityFinish, true); + } cv.notify_one(); } } + prioritySyncProcess.emplace_back(process); }; - auto priorityCallback = [&cv, &priorityFinish](const std::map &process) { + auto priorityCallback = [&cv, &priorityFinish, &prioritySyncProcess](const std::map &process) { for (const auto &item: process) { if (item.second.process == DistributedDB::FINISHED) { priorityFinish = true; cv.notify_one(); } } + prioritySyncProcess.emplace_back(process); }; CloudSyncOption option; PrepareOption(option, normalQuery, false); @@ -1116,7 +1179,8 @@ HWTEST_F(DistributedDBCloudCheckSyncTest, CloudPrioritySyncTest003, TestSize.Lev Query normalQuery = Query::Select().FromTable({tableName_}); std::vector idValue = {"0", "1", "2"}; Query priorityQuery = Query::Select().From(tableName_).In("id", idValue); - PriorityAndNormalSync(normalQuery, priorityQuery, delegate_); + std::vector> prioritySyncProcess; + PriorityAndNormalSync(normalQuery, priorityQuery, delegate_, prioritySyncProcess, true); EXPECT_EQ(virtualCloudDb_->GetLockCount(), 2); virtualCloudDb_->Reset(); EXPECT_EQ(virtualCloudDb_->GetLockCount(), 0); @@ -1803,6 +1867,48 @@ HWTEST_F(DistributedDBCloudCheckSyncTest, CloudPrioritySyncTest015, TestSize.Lev CheckUserTableResult(db_, tableName_, 60); } +/** + * @tc.name: CloudPrioritySyncTest016 + * @tc.desc: priority sync when normal syncing + * @tc.type: FUNC + * @tc.require: + * @tc.author: wangxiangdong + */ +HWTEST_F(DistributedDBCloudCheckSyncTest, CloudPrioritySyncTest016, TestSize.Level0) +{ + /** + * @tc.steps:step1. insert cloud table record. + * @tc.expected: step1. ok. + */ + const int actualCount = 60; // 60 is count of records + InsertCloudTableRecord(0, actualCount, 0, false); + InsertUserTableRecord(tableName_, 10); + + /** + * @tc.steps:step2. begin normal sync and priority sync. + * @tc.expected: step2. ok. + */ + Query normalQuery = Query::Select().FromTable({tableName_}); + std::vector idValue = {"0", "1", "2"}; + Query priorityQuery = Query::Select().From(tableName_).In("id", idValue); + std::vector> prioritySyncProcess; + PriorityAndNormalSync(normalQuery, priorityQuery, delegate_, prioritySyncProcess, false); + virtualCloudDb_->Reset(); + CheckCloudTableCount(tableName_, 60); // 10 is count of cloud records + /** + * @tc.steps:step3. check sync process result. + * @tc.expected: step3. ok. + */ + std::vector expectSyncResult = { + {PROCESSING, OK, {{tableName_, {PROCESSING, {1, 60, 60, 0, 50, 0, 0}, {0, 0, 0, 0, 0, 0, 0}}}}}, + {PROCESSING, OK, {{tableName_, {PROCESSING, {1, 3, 3, 0, 0, 0, 0}, {0, 0, 0, 0, 0, 0, 0}}}}}, + {FINISHED, OK, {{tableName_, {FINISHED, {1, 3, 3, 0, 0, 0, 0}, {1, 3, 3, 0, 0, 3, 0}}}}}, + {PROCESSING, OK, {{tableName_, {PROCESSING, {2, 63, 63, 0, 50, 0, 0}, {0, 0, 0, 0, 0, 0, 0}}}}}, + {FINISHED, OK, {{tableName_, {FINISHED, {2, 63, 63, 0, 50, 0, 0}, {1, 7, 7, 0, 0, 7, 0}}}}} + }; + EXPECT_EQ(CheckSyncProcess(prioritySyncProcess, expectSyncResult), true); +} + /** * @tc.name: LogicDeleteSyncTest001 * @tc.desc: sync with logic delete diff --git a/frameworks/libs/distributeddb/test/unittest/common/syncer/cloud/distributeddb_cloud_syncer_download_test.cpp b/frameworks/libs/distributeddb/test/unittest/common/syncer/cloud/distributeddb_cloud_syncer_download_test.cpp index 27c6553a56834ae51fcf07b6881d2502e0d46f5f..af0442cd21834ec5ba9a72d06a66e41440f5be9b 100644 --- a/frameworks/libs/distributeddb/test/unittest/common/syncer/cloud/distributeddb_cloud_syncer_download_test.cpp +++ b/frameworks/libs/distributeddb/test/unittest/common/syncer/cloud/distributeddb_cloud_syncer_download_test.cpp @@ -771,6 +771,7 @@ HWTEST_F(DistributedDBCloudSyncerDownloadTest, DownloadMockTest008, TestSize.Lev g_cloudSyncer->SetCloudWaterMarks(param.tableName, param.cloudWaterMark); ICloudSyncer::SyncParam actualParam; EXPECT_EQ(g_cloudSyncer->CallGetSyncParamForDownload(taskId, actualParam), E_OK); + expectCloudWaterMark = ""; EXPECT_EQ(actualParam.cloudWaterMark, expectCloudWaterMark); g_cloudSyncer->SetTaskResume(taskId, false);