From e4e587944846ed02c5c58833f5402216a78fa87a Mon Sep 17 00:00:00 2001 From: lobty Date: Mon, 9 Mar 2026 19:27:36 +0800 Subject: [PATCH] [wip] async remove device Signed-off-by: lobty --- .../relational/relational_store_delegate.h | 26 ++ .../relational_store_delegate_impl.cpp | 57 ++++ .../relational_store_delegate_impl.h | 6 + .../relational/relational_sync_able_storage.h | 6 +- .../include/icloud_sync_storage_interface.h | 5 + .../include/relational_store_connection.h | 2 + .../storage/include/storage_proxy.h | 2 + .../relational_sync_able_storage.cpp | 24 ++ .../relational/sqlite_relational_store.cpp | 18 +- .../relational/sqlite_relational_store.h | 2 + .../sqlite_relational_store_connection.cpp | 15 + .../sqlite_relational_store_connection.h | 2 + ...qlite_single_relational_storage_engine.cpp | 98 +++++++ .../sqlite_single_relational_storage_engine.h | 28 ++ .../storage/src/storage_proxy.cpp | 10 + .../syncer/src/cloud/cloud_syncer.cpp | 5 + ...ces_relational_remove_device_data_test.cpp | 266 ++++++++++++++++++ 17 files changed, 567 insertions(+), 5 deletions(-) diff --git a/frameworks/libs/distributeddb/interfaces/include/relational/relational_store_delegate.h b/frameworks/libs/distributeddb/interfaces/include/relational/relational_store_delegate.h index ac6935ed13f..2a7bfee3197 100644 --- a/frameworks/libs/distributeddb/interfaces/include/relational/relational_store_delegate.h +++ b/frameworks/libs/distributeddb/interfaces/include/relational/relational_store_delegate.h @@ -55,6 +55,10 @@ public: bool isAsync = false; }; + struct RemoveDeviceDataConfig { + bool isAsync = false; + }; + DB_API virtual DBStatus SetStoreConfig(const StoreConfig &config) { return OK; @@ -87,6 +91,12 @@ public: return RemoveDeviceDataInner(device, mode); } + DB_API DBStatus RemoveDeviceData(const std::string &device, ClearMode mode, + const RemoveDeviceDataConfig &config) + { + return RemoveDeviceDataInner(device, mode, config); + } + DB_API virtual DBStatus RemoveDeviceData(const std::string &device, const std::string &tableName) { return OK; @@ -97,6 +107,12 @@ public: return RemoveDeviceTableDataInner(option); } + DB_API DBStatus RemoveDeviceData(const ClearDeviceDataOption &option, + const RemoveDeviceDataConfig &config) + { + return RemoveDeviceTableDataInner(option, config); + } + // timeout is in ms. DB_API virtual DBStatus RemoteQuery(const std::string &device, const RemoteCondition &condition, uint64_t timeout, std::shared_ptr &result) @@ -227,12 +243,22 @@ public: } protected: virtual DBStatus RemoveDeviceDataInner(const std::string &device, ClearMode mode) = 0; + virtual DBStatus RemoveDeviceDataInner(const std::string &device, ClearMode mode, + const RemoveDeviceDataConfig &config) + { + return RemoveDeviceDataInner(device, mode); + } virtual DBStatus CreateDistributedTableInner(const std::string &tableName, TableSyncType type, const CreateDistributedTableConfig &config) = 0; virtual DBStatus RemoveDeviceTableDataInner(const ClearDeviceDataOption &option) { return DBStatus::OK; } + virtual DBStatus RemoveDeviceTableDataInner(const ClearDeviceDataOption &option, + const RemoveDeviceDataConfig &config) + { + return RemoveDeviceTableDataInner(option); + } }; } // namespace DistributedDB #endif // RELATIONAL_STORE_DELEGATE_H \ No newline at end of file diff --git a/frameworks/libs/distributeddb/interfaces/src/relational/relational_store_delegate_impl.cpp b/frameworks/libs/distributeddb/interfaces/src/relational/relational_store_delegate_impl.cpp index 432771ae981..5513f4fe090 100644 --- a/frameworks/libs/distributeddb/interfaces/src/relational/relational_store_delegate_impl.cpp +++ b/frameworks/libs/distributeddb/interfaces/src/relational/relational_store_delegate_impl.cpp @@ -71,6 +71,37 @@ DBStatus RelationalStoreDelegateImpl::RemoveDeviceDataInner(const std::string &d #endif } +DBStatus RelationalStoreDelegateImpl::RemoveDeviceDataInner(const std::string &device, ClearMode mode, + const RemoveDeviceDataConfig &config) +{ + if (mode >= BUTT || mode < 0) { + LOGE("Invalid mode for Remove device data, %d.", INVALID_ARGS); + return INVALID_ARGS; + } + if (mode == DEFAULT) { +#ifdef USE_DISTRIBUTEDDB_DEVICE + return RemoveDeviceData(device, ""); +#else + return OK; +#endif + } + if (conn_ == nullptr) { + LOGE("[RelationalStore Delegate] Invalid connection for operation!"); + return DB_ERROR; + } + +#ifdef USE_DISTRIBUTEDDB_CLOUD + int errCode = conn_->DoClean(mode, {}, config.isAsync); + if (errCode != E_OK) { + LOGE("[RelationalStore Delegate] remove device cloud data failed:%d", errCode); + return TransferDBErrno(errCode); + } + return OK; +#else + return OK; +#endif +} + DBStatus RelationalStoreDelegateImpl::RemoveDeviceTableDataInner(const ClearDeviceDataOption &option) { if (conn_ == nullptr) { @@ -96,6 +127,32 @@ DBStatus RelationalStoreDelegateImpl::RemoveDeviceTableDataInner(const ClearDevi return NOT_SUPPORT; } +DBStatus RelationalStoreDelegateImpl::RemoveDeviceTableDataInner(const ClearDeviceDataOption &option, + const RemoveDeviceDataConfig &config) +{ + if (conn_ == nullptr) { + LOGE("[RelationalStore Delegate] invalid connection for RemoveDeviceData!"); + return DB_ERROR; + } + if (option.mode >= BUTT || option.mode < 0) { + LOGE("[RelationalStore Delegate] invalid mode for Remove device data, %d.", option.mode); + return INVALID_ARGS; + } + if (option.mode == ClearMode::DEFAULT || option.mode == ClearMode::CLEAR_SHARED_TABLE) { + LOGE("[RelationalStore Delegate] not Support mode for Remove device data, %d.", option.mode); + return NOT_SUPPORT; + } +#ifdef USE_DISTRIBUTEDDB_CLOUD + int errCode = conn_->DoClean(option.mode, option.tableList, config.isAsync); + if (errCode != E_OK) { + LOGE("[RelationalStore Delegate] remove device cloud data failed: %d", errCode); + return TransferDBErrno(errCode); + } + return OK; +#endif + return NOT_SUPPORT; +} + DBStatus RelationalStoreDelegateImpl::CreateDistributedTableInner(const std::string &tableName, TableSyncType type, const CreateDistributedTableConfig &config) { diff --git a/frameworks/libs/distributeddb/interfaces/src/relational/relational_store_delegate_impl.h b/frameworks/libs/distributeddb/interfaces/src/relational/relational_store_delegate_impl.h index 9f67a746658..01f91c0c9a3 100644 --- a/frameworks/libs/distributeddb/interfaces/src/relational/relational_store_delegate_impl.h +++ b/frameworks/libs/distributeddb/interfaces/src/relational/relational_store_delegate_impl.h @@ -31,8 +31,14 @@ public: DBStatus RemoveDeviceDataInner(const std::string &device, ClearMode mode) override; + DBStatus RemoveDeviceDataInner(const std::string &device, ClearMode mode, + const RemoveDeviceDataConfig &config) override; + DBStatus RemoveDeviceTableDataInner(const ClearDeviceDataOption &option) override; + DBStatus RemoveDeviceTableDataInner(const ClearDeviceDataOption &option, + const RemoveDeviceDataConfig &config) override; + DBStatus CreateDistributedTableInner(const std::string &tableName, TableSyncType type, const CreateDistributedTableConfig &config) override; diff --git a/frameworks/libs/distributeddb/interfaces/src/relational/relational_sync_able_storage.h b/frameworks/libs/distributeddb/interfaces/src/relational/relational_sync_able_storage.h index 62ceb71e43a..cffc5447f72 100644 --- a/frameworks/libs/distributeddb/interfaces/src/relational/relational_sync_able_storage.h +++ b/frameworks/libs/distributeddb/interfaces/src/relational/relational_sync_able_storage.h @@ -288,6 +288,10 @@ public: int DropTempTable(const std::string &tableName) override; int DeleteCloudNoneExistRecord(const std::string &tableName, std::pair isNeedDeleted) override; + + int GetGidRecordCount(const std::string &tableName, uint64_t &count) const override; + + bool IsCleanDataTaskRunning() const override; #endif protected: int FillReferenceData(CloudSyncData &syncData); @@ -380,8 +384,6 @@ private: int DeleteOneCloudNoneExistRecord(const std::string &tableName, SQLiteRelationalUtils::CloudNotExistRecord &record, SQLiteSingleVerRelationalStorageExecutor *handle, std::vector &removeAssetsVec); - - int GetGidRecordCount(const std::string &tableName, uint64_t &count) const override; #endif void SetLocalHashDevId(const std::string &devId); diff --git a/frameworks/libs/distributeddb/storage/include/icloud_sync_storage_interface.h b/frameworks/libs/distributeddb/storage/include/icloud_sync_storage_interface.h index f571b5c7569..ae89f0593cf 100644 --- a/frameworks/libs/distributeddb/storage/include/icloud_sync_storage_interface.h +++ b/frameworks/libs/distributeddb/storage/include/icloud_sync_storage_interface.h @@ -355,6 +355,11 @@ public: { return E_OK; } + + virtual bool IsCleanDataTaskRunning() const + { + return false; + } }; } diff --git a/frameworks/libs/distributeddb/storage/include/relational_store_connection.h b/frameworks/libs/distributeddb/storage/include/relational_store_connection.h index 8b2166f341d..9cd64bb70d9 100644 --- a/frameworks/libs/distributeddb/storage/include/relational_store_connection.h +++ b/frameworks/libs/distributeddb/storage/include/relational_store_connection.h @@ -78,6 +78,8 @@ public: virtual int DoClean(ClearMode mode, const std::vector &tableNameList) = 0; + virtual int DoClean(ClearMode mode, const std::vector &tableNameList, bool isAsync) = 0; + virtual int ClearCloudWatermark(const std::set &tableNames) = 0; virtual int SetCloudDB(const std::shared_ptr &cloudDb) = 0; diff --git a/frameworks/libs/distributeddb/storage/include/storage_proxy.h b/frameworks/libs/distributeddb/storage/include/storage_proxy.h index c3fac431273..6db39343cb2 100644 --- a/frameworks/libs/distributeddb/storage/include/storage_proxy.h +++ b/frameworks/libs/distributeddb/storage/include/storage_proxy.h @@ -217,6 +217,8 @@ public: int DeleteCloudNoneExistRecord(const std::string &tableName, std::pair isNeedDeleted = {false, true}); int GetGidRecordCount(const std::string &tableName, uint64_t &count) const; + + bool IsCleanDataTaskRunning() const; protected: void Init(); private: diff --git a/frameworks/libs/distributeddb/storage/src/relational/relational_sync_able_storage.cpp b/frameworks/libs/distributeddb/storage/src/relational/relational_sync_able_storage.cpp index bb221514525..f6502d3c54e 100644 --- a/frameworks/libs/distributeddb/storage/src/relational/relational_sync_able_storage.cpp +++ b/frameworks/libs/distributeddb/storage/src/relational/relational_sync_able_storage.cpp @@ -1962,5 +1962,29 @@ int RelationalSyncAbleStorage::ClearUnLockingNoNeedCompensated() ReleaseHandle(handle); return errCode; } + +#ifdef USE_DISTRIBUTEDDB_CLOUD +int RelationalSyncAbleStorage::GetGidRecordCount(const std::string &tableName, uint64_t &count) const +{ + int errCode = E_OK; + auto *handle = GetHandle(false, errCode); + if (handle == nullptr) { + return errCode; + } + ResFinalizer finalizer([this, handle]() { + auto releaseHandle = handle; + ReleaseHandle(releaseHandle); + }); + return handle->GetGidRecordCount(tableName, count); +} + +bool RelationalSyncAbleStorage::IsCleanDataTaskRunning() const +{ + if (storageEngine_ == nullptr) { + return false; + } + return storageEngine_->IsCleanDataTaskRunning(); +} +#endif } #endif diff --git a/frameworks/libs/distributeddb/storage/src/sqlite/relational/sqlite_relational_store.cpp b/frameworks/libs/distributeddb/storage/src/sqlite/relational/sqlite_relational_store.cpp index b74c0b3ebbc..e4048400774 100644 --- a/frameworks/libs/distributeddb/storage/src/sqlite/relational/sqlite_relational_store.cpp +++ b/frameworks/libs/distributeddb/storage/src/sqlite/relational/sqlite_relational_store.cpp @@ -549,6 +549,11 @@ int SQLiteRelationalStore::CheckAndCollectCloudTables(ClearMode mode, const Rela } int SQLiteRelationalStore::CleanCloudData(ClearMode mode, const std::vector &tableList) +{ + return CleanCloudData(mode, tableList, false); +} + +int SQLiteRelationalStore::CleanCloudData(ClearMode mode, const std::vector &tableList, bool isAsync) { int errCode = StopGenLogTask(tableList); if (errCode != E_OK) { @@ -577,9 +582,16 @@ int SQLiteRelationalStore::CleanCloudData(ClearMode mode, const std::vectorCleanCloudData(mode, cloudTableNameList, localSchema); - if (errCode != E_OK) { - LOGE("[RelationalStore] failed to clean cloud data, %d.", errCode); + if (isAsync) { + errCode = sqliteStorageEngine_->TriggerCleanDataTask(mode, cloudTableNameList); + if (errCode != E_OK) { + LOGE("[RelationalStore] failed to trigger async clean cloud data task, %d.", errCode); + } + } else { + errCode = cloudSyncer_->CleanCloudData(mode, cloudTableNameList, localSchema); + if (errCode != E_OK) { + LOGE("[RelationalStore] failed to clean cloud data, %d.", errCode); + } } return errCode; diff --git a/frameworks/libs/distributeddb/storage/src/sqlite/relational/sqlite_relational_store.h b/frameworks/libs/distributeddb/storage/src/sqlite/relational/sqlite_relational_store.h index d22e851d488..7257ff4a928 100644 --- a/frameworks/libs/distributeddb/storage/src/sqlite/relational/sqlite_relational_store.h +++ b/frameworks/libs/distributeddb/storage/src/sqlite/relational/sqlite_relational_store.h @@ -103,6 +103,8 @@ public: int CleanCloudData(ClearMode mode, const std::vector &tableList = {}); + int CleanCloudData(ClearMode mode, const std::vector &tableList, bool isAsync); + int CheckAndCollectCloudTables(ClearMode mode, const RelationalSchemaObject &localSchema, const std::vector &tableList, std::vector &cloudTableNameList); diff --git a/frameworks/libs/distributeddb/storage/src/sqlite/relational/sqlite_relational_store_connection.cpp b/frameworks/libs/distributeddb/storage/src/sqlite/relational/sqlite_relational_store_connection.cpp index 84eb650735b..1fb71c62fea 100644 --- a/frameworks/libs/distributeddb/storage/src/sqlite/relational/sqlite_relational_store_connection.cpp +++ b/frameworks/libs/distributeddb/storage/src/sqlite/relational/sqlite_relational_store_connection.cpp @@ -206,6 +206,21 @@ int SQLiteRelationalStoreConnection::DoClean(ClearMode mode, const std::vector &tableList, bool isAsync) +{ + auto *store = GetDB(); + if (store == nullptr) { + LOGE("[RelationalConnection] store is null, get DB failed!"); + return -E_INVALID_CONNECTION; + } + + int errCode = store->CleanCloudData(mode, tableList, isAsync); + if (errCode != E_OK) { + LOGE("[RelationalConnection] failed to clean cloud data, %d.", errCode); + } + return errCode; +} + int SQLiteRelationalStoreConnection::ClearCloudWatermark(const std::set &tableNames) { auto *store = GetDB(); diff --git a/frameworks/libs/distributeddb/storage/src/sqlite/relational/sqlite_relational_store_connection.h b/frameworks/libs/distributeddb/storage/src/sqlite/relational/sqlite_relational_store_connection.h index 57a167c6964..9036d9897d9 100644 --- a/frameworks/libs/distributeddb/storage/src/sqlite/relational/sqlite_relational_store_connection.h +++ b/frameworks/libs/distributeddb/storage/src/sqlite/relational/sqlite_relational_store_connection.h @@ -60,6 +60,8 @@ public: #ifdef USE_DISTRIBUTEDDB_CLOUD int DoClean(ClearMode mode, const std::vector &tableList) override; + int DoClean(ClearMode mode, const std::vector &tableList, bool isAsync) override; + int ClearCloudWatermark(const std::set &tableNames) override; int32_t GetCloudSyncTaskCount() override; diff --git a/frameworks/libs/distributeddb/storage/src/sqlite/relational/sqlite_single_relational_storage_engine.cpp b/frameworks/libs/distributeddb/storage/src/sqlite/relational/sqlite_single_relational_storage_engine.cpp index e7ac4390273..04d81abed00 100644 --- a/frameworks/libs/distributeddb/storage/src/sqlite/relational/sqlite_single_relational_storage_engine.cpp +++ b/frameworks/libs/distributeddb/storage/src/sqlite/relational/sqlite_single_relational_storage_engine.cpp @@ -1738,6 +1738,104 @@ int SQLiteSingleRelationalStorageEngine::DropTempTable(const std::string &tableN }); return handle->DropTempTable(tableName); } + +int SQLiteSingleRelationalStorageEngine::TriggerCleanDataTask(ClearMode mode, + const std::vector &tableList) +{ + std::lock_guard lock(asyncTaskStatusMutex_); + if (currentAsyncTaskType_ == AsyncTaskType::CLEAN_DATA_TASK) { + LOGW("[TriggerCleanDataTask] CLEAN_DATA task already running"); + return -E_BUSY; + } + if (currentAsyncTaskType_ == AsyncTaskType::GEN_LOG_TASK) { + LOGI("[TriggerCleanDataTask] Interrupting GEN_LOG task for CLEAN_DATA task"); + genLogTaskStatus_ = GenLogTaskStatus::INTERRUPTED; + } + currentAsyncTaskType_ = AsyncTaskType::CLEAN_DATA_TASK; + cleanDataTaskStatus_ = CleanDataTaskStatus::RUNNING; + + RefObject::IncObjRef(this); + int errCode = RuntimeContext::GetInstance()->ScheduleTask([this, mode, tableList]() { + bool isNeedStartTask = false; + { + std::lock_guard taskLock(asyncTaskStatusMutex_); + if (cleanDataTaskStatus_ == CleanDataTaskStatus::IDLE) { + cleanDataTaskStatus_ = CleanDataTaskStatus::RUNNING; + currentAsyncTaskType_ = AsyncTaskType::CLEAN_DATA_TASK; + isNeedStartTask = true; + } + } + if (isNeedStartTask) { + LOGI("[TriggerCleanDataTask] Start async clean data task"); + } + { + std::lock_guard taskLock(asyncTaskStatusMutex_); + currentAsyncTaskType_ = AsyncTaskType::NONE; + cleanDataTaskStatus_ = CleanDataTaskStatus::IDLE; + } + asyncTaskCv_.notify_all(); + RefObject::DecObjRef(this); + }); + if (errCode != E_OK) { + LOGE("[TriggerCleanDataTask] Schedule clean data task failed: %d", errCode); + std::lock_guard lock(asyncTaskStatusMutex_); + currentAsyncTaskType_ = AsyncTaskType::NONE; + cleanDataTaskStatus_ = CleanDataTaskStatus::IDLE; + } + return errCode; +} + +void SQLiteSingleRelationalStorageEngine::StopCleanDataTask(bool isCloseDb) +{ + std::lock_guard lock(asyncTaskStatusMutex_); + if (isCloseDb) { + cleanDataTaskStatus_ = CleanDataTaskStatus::DB_CLOSED; + } else { + cleanDataTaskStatus_ = CleanDataTaskStatus::INTERRUPTED; + } +} + +bool SQLiteSingleRelationalStorageEngine::IsCleanDataTaskRunning() const +{ + std::lock_guard lock(asyncTaskStatusMutex_); + return currentAsyncTaskType_ == AsyncTaskType::CLEAN_DATA_TASK && + cleanDataTaskStatus_ == CleanDataTaskStatus::RUNNING; +} + +bool SQLiteSingleRelationalStorageEngine::IsNeedStopCleanDataTask() +{ + std::lock_guard lock(asyncTaskStatusMutex_); + return cleanDataTaskStatus_ == CleanDataTaskStatus::DB_CLOSED || + cleanDataTaskStatus_ == CleanDataTaskStatus::INTERRUPTED; +} + +void SQLiteSingleRelationalStorageEngine::SetCleanDataTaskStatus(CleanDataTaskStatus status) +{ + std::lock_guard lock(asyncTaskStatusMutex_); + cleanDataTaskStatus_ = status; +} + +void SQLiteSingleRelationalStorageEngine::ResetCleanDataTaskStatus() +{ + std::lock_guard lock(asyncTaskStatusMutex_); + if (cleanDataTaskStatus_ == CleanDataTaskStatus::INTERRUPTED || + cleanDataTaskStatus_ == CleanDataTaskStatus::DB_CLOSED) { + cleanDataTaskStatus_ = CleanDataTaskStatus::IDLE; + currentAsyncTaskType_ = AsyncTaskType::NONE; + } +} + +AsyncTaskType SQLiteSingleRelationalStorageEngine::GetCurrentAsyncTaskType() const +{ + std::lock_guard lock(asyncTaskStatusMutex_); + return currentAsyncTaskType_; +} + +bool SQLiteSingleRelationalStorageEngine::IsAsyncTaskRunning() const +{ + std::lock_guard lock(asyncTaskStatusMutex_); + return currentAsyncTaskType_ != AsyncTaskType::NONE; +} #endif int SQLiteSingleRelationalStorageEngine::CheckTableExists(const std::string &tableName, bool &isCreated) diff --git a/frameworks/libs/distributeddb/storage/src/sqlite/relational/sqlite_single_relational_storage_engine.h b/frameworks/libs/distributeddb/storage/src/sqlite/relational/sqlite_single_relational_storage_engine.h index f35cdc83671..201a190c1de 100644 --- a/frameworks/libs/distributeddb/storage/src/sqlite/relational/sqlite_single_relational_storage_engine.h +++ b/frameworks/libs/distributeddb/storage/src/sqlite/relational/sqlite_single_relational_storage_engine.h @@ -33,6 +33,19 @@ enum class GenLogTaskStatus { DB_CLOSED, }; +enum class CleanDataTaskStatus { + IDLE = 0, + RUNNING, + INTERRUPTED, + DB_CLOSED, +}; + +enum class AsyncTaskType { + NONE = 0, + GEN_LOG_TASK, + CLEAN_DATA_TASK, +}; + // Once waiting time for log generation task before cloud sync constexpr std::chrono::milliseconds SYNC_WAIT_GEN_LOG_ONCE_TIME = std::chrono::milliseconds(10 * 1000); // Max waiting time for log generation task before cloud sync @@ -112,6 +125,15 @@ public: #ifdef USE_DISTRIBUTEDDB_CLOUD int PutCloudGid(const std::string &tableName, std::vector &data); int DropTempTable(const std::string &tableName); + + int TriggerCleanDataTask(ClearMode mode, const std::vector &tableList); + void StopCleanDataTask(bool isCloseDb = false); + bool IsCleanDataTaskRunning() const; + bool IsNeedStopCleanDataTask(); + void SetCleanDataTaskStatus(CleanDataTaskStatus status); + void ResetCleanDataTaskStatus(); + AsyncTaskType GetCurrentAsyncTaskType() const; + bool IsAsyncTaskRunning() const; #endif int CheckTableExists(const std::string &tableName, bool &isCreated); protected: @@ -209,6 +231,12 @@ private: GenLogTaskStatus genLogTaskStatus_ = GenLogTaskStatus::IDLE; mutable std::mutex genLogTaskCvMutex_; std::condition_variable genLogTaskCv_; + + mutable std::mutex asyncTaskStatusMutex_; + AsyncTaskType currentAsyncTaskType_ = AsyncTaskType::NONE; + CleanDataTaskStatus cleanDataTaskStatus_ = CleanDataTaskStatus::IDLE; + mutable std::mutex asyncTaskCvMutex_; + std::condition_variable asyncTaskCv_; }; } // namespace DistributedDB #endif diff --git a/frameworks/libs/distributeddb/storage/src/storage_proxy.cpp b/frameworks/libs/distributeddb/storage/src/storage_proxy.cpp index 4ebcd7d3ad4..c013ce46bc2 100644 --- a/frameworks/libs/distributeddb/storage/src/storage_proxy.cpp +++ b/frameworks/libs/distributeddb/storage/src/storage_proxy.cpp @@ -1007,4 +1007,14 @@ int StorageProxy::GetGidRecordCount(const std::string &tableName, uint64_t &coun } return store_->GetGidRecordCount(tableName, count); } + +bool StorageProxy::IsCleanDataTaskRunning() const +{ + std::shared_lock readLock(storeMutex_); + if (store_ == nullptr) { + LOGE("[IsCleanDataTaskRunning] store is null"); + return false; + } + return store_->IsCleanDataTaskRunning(); +} } \ No newline at end of file diff --git a/frameworks/libs/distributeddb/syncer/src/cloud/cloud_syncer.cpp b/frameworks/libs/distributeddb/syncer/src/cloud/cloud_syncer.cpp index 893aabbefd5..3336d8babc8 100644 --- a/frameworks/libs/distributeddb/syncer/src/cloud/cloud_syncer.cpp +++ b/frameworks/libs/distributeddb/syncer/src/cloud/cloud_syncer.cpp @@ -223,6 +223,11 @@ int CloudSyncer::DoSync(TaskId taskId) LOGE("[CloudSyncer] Wait async gen log task finished failed: %d", errCode); return errCode; } + if (storageProxy_ != nullptr && storageProxy_->IsCleanDataTaskRunning()) { + SyncMachineDoFinished(); + LOGE("[CloudSyncer] Clean data task is running, sync aborted"); + return -E_BUSY; + } std::lock_guard lock(syncMutex_); ResetCurrentTableUploadBatchIndex(); CloudTaskInfo taskInfo; diff --git a/frameworks/libs/distributeddb/test/unittest/common/interfaces/distributeddb_cloud_interfaces_relational_remove_device_data_test.cpp b/frameworks/libs/distributeddb/test/unittest/common/interfaces/distributeddb_cloud_interfaces_relational_remove_device_data_test.cpp index 9cf5e251e92..ebe09b0e147 100644 --- a/frameworks/libs/distributeddb/test/unittest/common/interfaces/distributeddb_cloud_interfaces_relational_remove_device_data_test.cpp +++ b/frameworks/libs/distributeddb/test/unittest/common/interfaces/distributeddb_cloud_interfaces_relational_remove_device_data_test.cpp @@ -15,6 +15,7 @@ #ifdef RELATIONAL_STORE #include #include +#include #include "cloud/cloud_storage_utils.h" #include "cloud/cloud_db_constant.h" #include "distributeddb_data_generate_unit_test.h" @@ -2449,5 +2450,270 @@ HWTEST_F(DistributedDBCloudInterfacesRelationalRemoveDeviceDataTest, CloudInsuff CheckCompensatedNum(db, g_tables, {10, 0}); CloseDb(); } + +void WaitForAsyncCleanTaskFinished(int64_t waitTime) +{ + std::unique_lock lock(g_processMutex); + g_processCondition.wait_for(lock, std::chrono::seconds(waitTime), []() { + return true; + }); + std::this_thread::sleep_for(std::chrono::milliseconds(100)); +} + +HWTEST_F(DistributedDBCloudInterfacesRelationalRemoveDeviceDataTest, AsyncRemoveDeviceDataTest001, TestSize.Level1) +{ + int64_t paddingSize = 10; + int cloudCount = 20; + InsertCloudTableRecord(0, cloudCount, paddingSize, false); + InsertUserTableRecord(db, 0, 10, paddingSize, false); + Query query = Query::Select().FromTable(g_tables); + std::vector expectProcess; + InitProcessForCleanCloudData1(cloudCount, expectProcess); + CloudSyncStatusCallback callback; + GetCallback(g_syncProcess, callback, expectProcess); + ASSERT_EQ(g_delegate->Sync({DEVICE_CLOUD}, SYNC_MODE_CLOUD_FORCE_PULL, query, callback, g_syncWaitTime), + DBStatus::OK); + WaitForSyncFinish(g_syncProcess, g_syncWaitTime); + CheckCloudRecordNum(db, g_tables, {20, 20}); + std::string device = ""; + RemoveDeviceDataConfig config = { .isAsync = true }; + ASSERT_EQ(g_delegate->RemoveDeviceData(device, FLAG_ONLY, config), DBStatus::OK); + WaitForAsyncCleanTaskFinished(g_syncWaitTime); + CheckCleanLogNum(db, g_tables, 0); + CloseDb(); +} + +HWTEST_F(DistributedDBCloudInterfacesRelationalRemoveDeviceDataTest, AsyncRemoveDeviceDataTest002, TestSize.Level1) +{ + int64_t paddingSize = 10; + int cloudCount = 20; + InsertCloudTableRecord(0, cloudCount, paddingSize, false); + InsertUserTableRecord(db, 0, 10, paddingSize, false); + Query query = Query::Select().FromTable(g_tables); + std::vector expectProcess; + InitProcessForCleanCloudData1(cloudCount, expectProcess); + CloudSyncStatusCallback callback; + GetCallback(g_syncProcess, callback, expectProcess); + ASSERT_EQ(g_delegate->Sync({DEVICE_CLOUD}, SYNC_MODE_CLOUD_FORCE_PULL, query, callback, g_syncWaitTime), + DBStatus::OK); + WaitForSyncFinish(g_syncProcess, g_syncWaitTime); + CheckCloudRecordNum(db, g_tables, {20, 20}); + std::string device = ""; + RemoveDeviceDataConfig config = { .isAsync = false }; + ASSERT_EQ(g_delegate->RemoveDeviceData(device, FLAG_AND_DATA, config), DBStatus::OK); + CheckCleanDataAndLogNum(db, g_tables, 0, {10, 0}); + CloseDb(); +} + +HWTEST_F(DistributedDBCloudInterfacesRelationalRemoveDeviceDataTest, AsyncRemoveDeviceDataTest003, TestSize.Level1) +{ + int64_t paddingSize = 10; + int cloudCount = 20; + InsertCloudTableRecord(0, cloudCount, paddingSize, false); + InsertUserTableRecord(db, 0, 10, paddingSize, false); + Query query = Query::Select().FromTable(g_tables); + std::vector expectProcess; + InitProcessForCleanCloudData1(cloudCount, expectProcess); + CloudSyncStatusCallback callback; + GetCallback(g_syncProcess, callback, expectProcess); + ASSERT_EQ(g_delegate->Sync({DEVICE_CLOUD}, SYNC_MODE_CLOUD_FORCE_PULL, query, callback, g_syncWaitTime), + DBStatus::OK); + WaitForSyncFinish(g_syncProcess, g_syncWaitTime); + CheckCloudRecordNum(db, g_tables, {20, 20}); + std::string device = ""; + RemoveDeviceDataConfig config = { .isAsync = true }; + std::vector results(2, DBStatus::INVALID_ARGS); + std::vector threads; + for (int i = 0; i < 2; ++i) { + threads.emplace_back([device, &config, &results, i]() { + results[i] = g_delegate->RemoveDeviceData(device, FLAG_ONLY, config); + }); + } + for (auto &thread : threads) { + thread.join(); + } + int successCount = 0; + int busyCount = 0; + for (const auto &result : results) { + if (result == DBStatus::OK) { + successCount++; + } else if (result == DBStatus::BUSY) { + busyCount++; + } + } + EXPECT_EQ(successCount, 1); + EXPECT_GE(busyCount, 0); + WaitForAsyncCleanTaskFinished(g_syncWaitTime); + CloseDb(); +} + +HWTEST_F(DistributedDBCloudInterfacesRelationalRemoveDeviceDataTest, AsyncRemoveDeviceDataTest004, TestSize.Level1) +{ + int64_t paddingSize = 10; + int localCount = 100; + InsertUserTableRecord(db, 0, localCount, paddingSize, false); + CreateDistributedTableConfig tableConfig = { .isAsync = true }; + ASSERT_EQ(g_delegate->CreateDistributedTable(g_tableName1, CLOUD_COOPERATION, tableConfig), DBStatus::OK); + std::this_thread::sleep_for(std::chrono::milliseconds(50)); + std::string device = ""; + RemoveDeviceDataConfig config = { .isAsync = true }; + ASSERT_EQ(g_delegate->RemoveDeviceData(device, FLAG_ONLY, config), DBStatus::OK); + WaitForAsyncCleanTaskFinished(g_syncWaitTime); + CloseDb(); +} + +HWTEST_F(DistributedDBCloudInterfacesRelationalRemoveDeviceDataTest, AsyncRemoveDeviceDataTest005, TestSize.Level1) +{ + int64_t paddingSize = 10; + int cloudCount = 20; + InsertCloudTableRecord(0, cloudCount, paddingSize, false); + InsertUserTableRecord(db, 0, 10, paddingSize, false); + Query query = Query::Select().FromTable(g_tables); + std::vector expectProcess; + InitProcessForCleanCloudData1(cloudCount, expectProcess); + CloudSyncStatusCallback callback; + GetCallback(g_syncProcess, callback, expectProcess); + ASSERT_EQ(g_delegate->Sync({DEVICE_CLOUD}, SYNC_MODE_CLOUD_FORCE_PULL, query, callback, g_syncWaitTime), + DBStatus::OK); + WaitForSyncFinish(g_syncProcess, g_syncWaitTime); + CheckCloudRecordNum(db, g_tables, {20, 20}); + std::string device = ""; + RemoveDeviceDataConfig config = { .isAsync = true }; + ASSERT_EQ(g_delegate->RemoveDeviceData(device, FLAG_ONLY, config), DBStatus::OK); + std::this_thread::sleep_for(std::chrono::milliseconds(50)); + g_syncProcess = {}; + CloudSyncStatusCallback callback2; + DBStatus syncResult = g_delegate->Sync({DEVICE_CLOUD}, SYNC_MODE_CLOUD_FORCE_PULL, query, callback2, g_syncWaitTime); + EXPECT_TRUE(syncResult == DBStatus::OK || syncResult == DBStatus::BUSY); + WaitForAsyncCleanTaskFinished(g_syncWaitTime); + CloseDb(); +} + +HWTEST_F(DistributedDBCloudInterfacesRelationalRemoveDeviceDataTest, AsyncRemoveDeviceDataTest006, TestSize.Level1) +{ + int64_t paddingSize = 10; + int cloudCount = 20; + InsertCloudTableRecord(0, cloudCount, paddingSize, false); + InsertUserTableRecord(db, 0, 10, paddingSize, false); + Query query = Query::Select().FromTable(g_tables); + std::vector expectProcess; + InitProcessForCleanCloudData1(cloudCount, expectProcess); + CloudSyncStatusCallback callback; + GetCallback(g_syncProcess, callback, expectProcess); + ASSERT_EQ(g_delegate->Sync({DEVICE_CLOUD}, SYNC_MODE_CLOUD_FORCE_PULL, query, callback, g_syncWaitTime), + DBStatus::OK); + WaitForSyncFinish(g_syncProcess, g_syncWaitTime); + CheckCloudRecordNum(db, g_tables, {20, 20}); + std::string device = ""; + RemoveDeviceDataConfig config = { .isAsync = true }; + ASSERT_EQ(g_delegate->RemoveDeviceData(device, FLAG_ONLY, config), DBStatus::OK); + std::this_thread::sleep_for(std::chrono::milliseconds(50)); + CloseDb(); + g_delegate = nullptr; + WaitForAsyncCleanTaskFinished(g_syncWaitTime); +} + +HWTEST_F(DistributedDBCloudInterfacesRelationalRemoveDeviceDataTest, AsyncRemoveDeviceDataTest007, TestSize.Level1) +{ + int64_t paddingSize = 10; + int cloudCount = 20; + InsertCloudTableRecord(0, cloudCount, paddingSize, false); + InsertUserTableRecord(db, 0, 10, paddingSize, false); + Query query = Query::Select().FromTable(g_tables); + std::vector expectProcess; + InitProcessForCleanCloudData1(cloudCount, expectProcess); + CloudSyncStatusCallback callback; + GetCallback(g_syncProcess, callback, expectProcess); + ASSERT_EQ(g_delegate->Sync({DEVICE_CLOUD}, SYNC_MODE_CLOUD_FORCE_PULL, query, callback, g_syncWaitTime), + DBStatus::OK); + WaitForSyncFinish(g_syncProcess, g_syncWaitTime); + CheckCloudRecordNum(db, g_tables, {20, 20}); + std::string device = ""; + RemoveDeviceDataConfig config = { .isAsync = true }; + ASSERT_EQ(g_delegate->RemoveDeviceData(device, FLAG_ONLY, config), DBStatus::OK); + WaitForAsyncCleanTaskFinished(g_syncWaitTime); + ASSERT_EQ(g_delegate->RemoveDeviceData(device, FLAG_ONLY, config), DBStatus::OK); + WaitForAsyncCleanTaskFinished(g_syncWaitTime); + CloseDb(); +} + +HWTEST_F(DistributedDBCloudInterfacesRelationalRemoveDeviceDataTest, AsyncRemoveDeviceDataTest008, TestSize.Level1) +{ + int64_t paddingSize = 10; + int cloudCount = 20; + InsertCloudTableRecord(0, cloudCount, paddingSize, false); + InsertUserTableRecord(db, 0, 10, paddingSize, false); + Query query = Query::Select().FromTable(g_tables); + std::vector expectProcess; + InitProcessForCleanCloudData1(cloudCount, expectProcess); + CloudSyncStatusCallback callback; + GetCallback(g_syncProcess, callback, expectProcess); + ASSERT_EQ(g_delegate->Sync({DEVICE_CLOUD}, SYNC_MODE_CLOUD_FORCE_PULL, query, callback, g_syncWaitTime), + DBStatus::OK); + WaitForSyncFinish(g_syncProcess, g_syncWaitTime); + std::string device = ""; + RemoveDeviceDataConfig config = { .isAsync = true }; + ASSERT_EQ(g_delegate->RemoveDeviceData(device, DEFAULT, config), DBStatus::OK); + CloseDb(); +} + +HWTEST_F(DistributedDBCloudInterfacesRelationalRemoveDeviceDataTest, AsyncRemoveDeviceDataTest009, TestSize.Level1) +{ + int64_t paddingSize = 10; + int cloudCount = 20; + InsertCloudTableRecord(0, cloudCount, paddingSize, false); + InsertUserTableRecord(db, 0, 10, paddingSize, false); + Query query = Query::Select().FromTable(g_tables); + std::vector expectProcess; + InitProcessForCleanCloudData1(cloudCount, expectProcess); + CloudSyncStatusCallback callback; + GetCallback(g_syncProcess, callback, expectProcess); + ASSERT_EQ(g_delegate->Sync({DEVICE_CLOUD}, SYNC_MODE_CLOUD_FORCE_PULL, query, callback, g_syncWaitTime), + DBStatus::OK); + WaitForSyncFinish(g_syncProcess, g_syncWaitTime); + CheckCloudRecordNum(db, g_tables, {20, 20}); + std::string device = ""; + ClearDeviceDataOption option = { + DistributedDB::FLAG_AND_DATA, + device, + {} + }; + RemoveDeviceDataConfig config = { .isAsync = true }; + ASSERT_EQ(g_delegate->RemoveDeviceData(option, config), DBStatus::OK); + WaitForAsyncCleanTaskFinished(g_syncWaitTime); + CheckCleanDataAndLogNum(db, g_tables, 0, {0, 0}); + CloseDb(); +} + +HWTEST_F(DistributedDBCloudInterfacesRelationalRemoveDeviceDataTest, AsyncRemoveDeviceDataTest010, TestSize.Level1) +{ + int64_t paddingSize = 10; + int cloudCount = 20; + InsertCloudTableRecord(0, cloudCount, paddingSize, false); + InsertUserTableRecord(db, 0, 10, paddingSize, false); + Query query = Query::Select().FromTable(g_tables); + std::vector expectProcess; + InitProcessForCleanCloudData1(cloudCount, expectProcess); + CloudSyncStatusCallback callback; + GetCallback(g_syncProcess, callback, expectProcess); + ASSERT_EQ(g_delegate->Sync({DEVICE_CLOUD}, SYNC_MODE_CLOUD_FORCE_PULL, query, callback, g_syncWaitTime), + DBStatus::OK); + WaitForSyncFinish(g_syncProcess, g_syncWaitTime); + CheckCloudRecordNum(db, g_tables, {20, 20}); + std::string device = ""; + RemoveDeviceDataConfig config = { .isAsync = true }; + ASSERT_EQ(g_delegate->RemoveDeviceData(device, FLAG_ONLY, config), DBStatus::OK); + std::this_thread::sleep_for(std::chrono::milliseconds(50)); + ClearDeviceDataOption option = { + DistributedDB::FLAG_ONLY, + device, + {g_tables[0]} + }; + RemoveDeviceDataConfig config2 = { .isAsync = true }; + DBStatus result = g_delegate->RemoveDeviceData(option, config2); + EXPECT_TRUE(result == DBStatus::OK || result == DBStatus::BUSY); + WaitForAsyncCleanTaskFinished(g_syncWaitTime); + CloseDb(); +} } #endif // RELATIONAL_STORE \ No newline at end of file -- Gitee