diff --git a/services/distributeddataservice/adapter/communicator/src/softbus_adapter_standard.cpp b/services/distributeddataservice/adapter/communicator/src/softbus_adapter_standard.cpp index a498a2d87e31b4672f919beb176aa32a1d4532f8..4ae3ae96fd2801e3d4fbf8f3c6c604028dedd387 100644 --- a/services/distributeddataservice/adapter/communicator/src/softbus_adapter_standard.cpp +++ b/services/distributeddataservice/adapter/communicator/src/softbus_adapter_standard.cpp @@ -296,13 +296,16 @@ std::pair SoftBusAdapter::OpenConnect(const std::shared_ptrUpdateNetworkId(networkId); } } - auto task = [this, connect = std::weak_ptr(conn)]() { + auto applyTask = [deviceId](int32_t errcode) { + CommunicatorContext::GetInstance().NotifySessionReady(deviceId.deviceId, errcode); + }; + auto connectTask = [this, connect = std::weak_ptr(conn)]() { auto conn = connect.lock(); if (conn != nullptr) { conn->OpenConnect(&clientListener_); } }; - ConnectManager::GetInstance()->ApplyConnect(networkId, task); + ConnectManager::GetInstance()->ApplyConnect(networkId, applyTask, connectTask); return std::make_pair(Status::RATE_LIMIT, 0); } diff --git a/services/distributeddataservice/framework/communication/connect_manager.cpp b/services/distributeddataservice/framework/communication/connect_manager.cpp index 74f8f70a88d4e0120b66cfc9e4eada414b975c55..d63ad3342ac40c75902433955dd8c63bbc6de630 100644 --- a/services/distributeddataservice/framework/communication/connect_manager.cpp +++ b/services/distributeddataservice/framework/communication/connect_manager.cpp @@ -129,9 +129,10 @@ void ConnectManager::OnDestory() { } -int32_t ConnectManager::ApplyConnect(__attribute__((unused)) const std::string &networkId, ConnectTask task) +int32_t ConnectManager::ApplyConnect(__attribute__((unused)) const std::string &networkId, + __attribute__((unused)) ApplyTask applyTask, ConnectTask connectTask) { - task(); + connectTask(); return 0; } } // OHOS::AppDistributedKv \ No newline at end of file diff --git a/services/distributeddataservice/framework/include/communication/connect_manager.h b/services/distributeddataservice/framework/include/communication/connect_manager.h index 5e9093c75964e43dbd631edb5bbe67062129005e..eb0e07c167d1bbd9d6916250a31f840afdd1b963 100644 --- a/services/distributeddataservice/framework/include/communication/connect_manager.h +++ b/services/distributeddataservice/framework/include/communication/connect_manager.h @@ -27,6 +27,7 @@ namespace OHOS { namespace AppDistributedKv { class API_EXPORT ConnectManager { public: + using ApplyTask = std::function; using ConnectTask = std::function; using CloseSessionTask = std::function; using SessionCloseListener = std::function; @@ -51,7 +52,7 @@ public: virtual void OnStart(); virtual void OnDestory(); - virtual int32_t ApplyConnect(const std::string &networkId, ConnectTask task); + virtual int32_t ApplyConnect(const std::string &networkId, ApplyTask applyTask, ConnectTask connectTask); private: static std::mutex mtx_; diff --git a/services/distributeddataservice/framework/include/metadata/meta_data_manager.h b/services/distributeddataservice/framework/include/metadata/meta_data_manager.h index fb8dc79c18b2df2589432a8758938d4b59eeb313..dee2bf9fb44df799c95301024233d30553d0814b 100644 --- a/services/distributeddataservice/framework/include/metadata/meta_data_manager.h +++ b/services/distributeddataservice/framework/include/metadata/meta_data_manager.h @@ -85,7 +85,8 @@ public: API_EXPORT bool Subscribe(std::shared_ptr filter, Observer observer); API_EXPORT bool Subscribe(std::string prefix, Observer observer, bool isLocal = false); API_EXPORT bool Unsubscribe(std::string filter); - API_EXPORT bool Sync(const std::vector &devices, OnComplete complete, bool wait = false); + API_EXPORT bool Sync(const std::vector &devices, OnComplete complete, bool wait = false, + bool isRetry = true); private: MetaDataManager(); diff --git a/services/distributeddataservice/framework/include/store/general_value.h b/services/distributeddataservice/framework/include/store/general_value.h index 888c9f7c151ab147b1cb9dce1d3080f6875ce9a3..88ef5065ce6973e38627387cc4d89bd0d04b9650 100644 --- a/services/distributeddataservice/framework/include/store/general_value.h +++ b/services/distributeddataservice/framework/include/store/general_value.h @@ -101,6 +101,7 @@ struct SyncParam { std::string prepareTraceId; int32_t user; bool asyncDownloadAsset = false; + bool isRetry = true; }; enum SyncStage : int8_t { diff --git a/services/distributeddataservice/framework/metadata/meta_data_manager.cpp b/services/distributeddataservice/framework/metadata/meta_data_manager.cpp index 09bd5c24058a17552d681f4f7596983558403c62..a5cc012ade281ad8bb3575405c8259b43d14d675 100644 --- a/services/distributeddataservice/framework/metadata/meta_data_manager.cpp +++ b/services/distributeddataservice/framework/metadata/meta_data_manager.cpp @@ -358,21 +358,27 @@ bool MetaDataManager::DelMeta(const std::vector &keys, bool isLocal return ((status == DistributedDB::DBStatus::OK) || (status == DistributedDB::DBStatus::NOT_FOUND)); } -bool MetaDataManager::Sync(const std::vector &devices, OnComplete complete, bool wait) +bool MetaDataManager::Sync(const std::vector &devices, OnComplete complete, bool wait, bool isRetry) { if (!inited_ || devices.empty()) { return false; } - auto status = metaStore_->Sync(devices, DistributedDB::SyncMode::SYNC_MODE_PUSH_PULL, [complete](auto &dbResults) { - if (complete == nullptr) { - return; - } - std::map results; - for (auto &[uuid, status] : dbResults) { - results.insert_or_assign(uuid, static_cast(status)); - } - complete(results); - }, wait); + DistributedDB::DeviceSyncOption syncOption; + syncOption.devices = devices; + syncOption.mode = DistributedDB::SyncMode::SYNC_MODE_PUSH_PULL; + syncOption.isWait = wait; + syncOption.isRetry = isRetry; + auto status = metaStore_->Sync(syncOption, + [complete](const std::map &dbResults) { + if (complete == nullptr) { + return; + } + std::map results; + for (auto &[uuid, status] : dbResults) { + results.insert_or_assign(uuid, static_cast(status)); + } + complete(results); + }); if (status == DistributedDB::DBStatus::INVALID_PASSWD_OR_CORRUPTED_DB) { ZLOGE("db corrupted! status:%{public}d", status); CorruptReporter::CreateCorruptedFlag(DirectoryManager::GetInstance().GetMetaStorePath(), storeId_); diff --git a/services/distributeddataservice/framework/test/mock/db_store_mock.cpp b/services/distributeddataservice/framework/test/mock/db_store_mock.cpp index 012561f3892a97284da2bef6c9c22a1354f1ba07..53c1b35eff66e1f9ff1e3f98f16b42b7131df884 100644 --- a/services/distributeddataservice/framework/test/mock/db_store_mock.cpp +++ b/services/distributeddataservice/framework/test/mock/db_store_mock.cpp @@ -351,6 +351,17 @@ DBStatus DBStoreMock::Sync(const DeviceSyncOption &option, const DeviceSyncProce return NOT_SUPPORT; } +DBStatus DBStoreMock::Sync(const DeviceSyncOption &option, + const std::function &)> &onComplete) +{ + std::map result; + for (const auto &device : option.devices) { + result[device] = OK; + } + onComplete(result); + return OK; +} + DBStatus DBStoreMock::CancelSync(uint32_t syncId) { return NOT_SUPPORT; diff --git a/services/distributeddataservice/framework/test/mock/db_store_mock.h b/services/distributeddataservice/framework/test/mock/db_store_mock.h index ae250effc26d555f90385899d7ab6e45c5d015f4..c9246a8c82c2a9f2dc66cdfe22089406ee87ba94 100644 --- a/services/distributeddataservice/framework/test/mock/db_store_mock.h +++ b/services/distributeddataservice/framework/test/mock/db_store_mock.h @@ -115,6 +115,8 @@ public: DBStatus SetReceiveDataInterceptor(const DataInterceptor &interceptor) override; DBStatus GetDeviceEntries(const std::string &device, std::vector &entries) const override; DBStatus Sync(const DeviceSyncOption &option, const DeviceSyncProcessCallback &onProcess) override; + DBStatus Sync(const DeviceSyncOption &option, + const std::function &)> &onComplete) override; DBStatus CancelSync(uint32_t syncId) override; DatabaseStatus GetDatabaseStatus() const override; private: diff --git a/services/distributeddataservice/service/kvdb/kvdb_general_store.cpp b/services/distributeddataservice/service/kvdb/kvdb_general_store.cpp index 64964d040b1777b93692ecd2e32d1b1f75278d55..7a80722c370e91de3cf2d6faf3e9cc6b2707b114 100644 --- a/services/distributeddataservice/service/kvdb/kvdb_general_store.cpp +++ b/services/distributeddataservice/service/kvdb/kvdb_general_store.cpp @@ -443,11 +443,9 @@ std::pair KVDBGeneralStore::Sync(const Devices &devices, GenQu dbStatus = delegate_->UnSubscribeRemoteQuery(devices, GetDBSyncCompleteCB(std::move(async)), dbQuery, false); } else if (syncMode < NEARBY_END) { - if (kvQuery->IsEmpty()) { - dbStatus = delegate_->Sync(devices, dbMode, GetDBSyncCompleteCB(std::move(async)), false); - } else { - dbStatus = delegate_->Sync(devices, dbMode, GetDBSyncCompleteCB(std::move(async)), dbQuery, false); - } + DeviceSyncOption syncOption = { .devices = devices, .mode = dbMode, .query = dbQuery, .isWait = false, + .isRetry = syncParam.isRetry }; + dbStatus = delegate_->Sync(syncOption, GetDBSyncCompleteCB(std::move(async))); } else { ZLOGE("Err sync mode! sync mode:%{public}d", syncMode); dbStatus = DistributedDB::INVALID_ARGS; diff --git a/services/distributeddataservice/service/kvdb/kvdb_service_impl.cpp b/services/distributeddataservice/service/kvdb/kvdb_service_impl.cpp index 4aa437cbcca79c75abd356630f8a1c23ac6a1746..d3845d089c491e9dfa71f1e455cbf37db3f2029b 100644 --- a/services/distributeddataservice/service/kvdb/kvdb_service_impl.cpp +++ b/services/distributeddataservice/service/kvdb/kvdb_service_impl.cpp @@ -1127,7 +1127,7 @@ Status KVDBServiceImpl::DoSyncInOrder( auto status = DoSyncBegin(ret.first, meta, info, complete, type); ZLOGD("data sync status:%{public}d appId:%{public}s, storeId:%{public}s", static_cast(status), meta.bundleName.c_str(), Anonymous::Change(meta.storeId).c_str()); - }); + }, false, info.isRetry); if (!result) { RADAR_REPORT(STANDARD_DEVICE_SYNC, STANDARD_META_SYNC, RADAR_FAILED, ERROR_CODE, Status::ERROR, BIZ_STATE, END, SYNC_STORE_ID, Anonymous::Change(meta.storeId), SYNC_APP_ID, meta.bundleName, @@ -1229,15 +1229,14 @@ Status KVDBServiceImpl::DoSyncBegin(const std::vector &devices, con } SyncParam syncParam{}; syncParam.mode = mode; + syncParam.isRetry = info.isRetry; RADAR_REPORT(STANDARD_DEVICE_SYNC, START_SYNC, RADAR_START, SYNC_STORE_ID, Anonymous::Change(meta.storeId), SYNC_APP_ID, meta.bundleName, CONCURRENT_ID, std::to_string(info.syncId), DATA_TYPE, meta.dataType); - auto ret = store->Sync( - devices, query, + auto ret = store->Sync(devices, query, [this, complete](const GenDetails &result) mutable { auto deviceStatus = HandleGenBriefDetails(result); complete(deviceStatus); - }, - syncParam); + }, syncParam); auto status = Status(ret.first); if (status != Status::SUCCESS) { RADAR_REPORT(STANDARD_DEVICE_SYNC, START_SYNC, RADAR_FAILED, ERROR_CODE, status, BIZ_STATE, END, diff --git a/services/distributeddataservice/service/kvdb/kvdb_service_stub.cpp b/services/distributeddataservice/service/kvdb/kvdb_service_stub.cpp index 4febb92f005e479910fa5d87ce603e97022da9ea..acfa5fbdb03ae7970492bd34f73aabe8c7371b8e 100644 --- a/services/distributeddataservice/service/kvdb/kvdb_service_stub.cpp +++ b/services/distributeddataservice/service/kvdb/kvdb_service_stub.cpp @@ -219,7 +219,7 @@ int32_t KVDBServiceStub::OnSync(const AppId &appId, const StoreId &storeId, Mess SyncInfo syncInfo; int32_t subUser; if (!ITypesUtil::Unmarshal(data, syncInfo.seqId, syncInfo.mode, syncInfo.devices, syncInfo.delay, syncInfo.query, - subUser)) { + subUser, syncInfo.isRetry)) { ZLOGE("Unmarshal appId:%{public}s storeId:%{public}s", appId.appId.c_str(), Anonymous::Change(storeId.storeId).c_str()); return IPC_STUB_INVALID_DATA_ERR; diff --git a/services/distributeddataservice/service/test/mock/db_store_mock.cpp b/services/distributeddataservice/service/test/mock/db_store_mock.cpp index d10ec482bbd3ad0aef6e9abdd59f686a18022550..2135093bca584b7d5cf1237d2e92e5863dd923fa 100644 --- a/services/distributeddataservice/service/test/mock/db_store_mock.cpp +++ b/services/distributeddataservice/service/test/mock/db_store_mock.cpp @@ -378,6 +378,12 @@ DBStatus DBStoreMock::Sync(const DeviceSyncOption &option, const DeviceSyncProce return NOT_SUPPORT; } +DBStatus DBStoreMock::Sync(const DeviceSyncOption &option, + const std::function &devicesMap)> &onComplete) +{ + return NOT_SUPPORT; +} + DBStatus DBStoreMock::CancelSync(uint32_t syncId) { return NOT_SUPPORT; diff --git a/services/distributeddataservice/service/test/mock/db_store_mock.h b/services/distributeddataservice/service/test/mock/db_store_mock.h index 8f9e24ac238b15bb4994cd357633e72dba1a408c..f8ae7d49ab410a9e2cb553fa1077756edc11f652 100644 --- a/services/distributeddataservice/service/test/mock/db_store_mock.h +++ b/services/distributeddataservice/service/test/mock/db_store_mock.h @@ -114,6 +114,8 @@ public: DBStatus SetReceiveDataInterceptor(const DataInterceptor &interceptor) override; DBStatus GetDeviceEntries(const std::string &device, std::vector &entries) const override; DBStatus Sync(const DeviceSyncOption &option, const DeviceSyncProcessCallback &onProcess) override; + DBStatus Sync(const DeviceSyncOption &option, + const std::function &devicesMap)> &onComplete) override; DBStatus CancelSync(uint32_t syncId) override; DatabaseStatus GetDatabaseStatus() const override; private: diff --git a/services/distributeddataservice/service/test/mock/kv_store_nb_delegate_corruption_mock.cpp b/services/distributeddataservice/service/test/mock/kv_store_nb_delegate_corruption_mock.cpp index ecc6b4b349ac6fd56b7db31136423eacd21e4aef..b9ead1e0773c4bf0d6dcaa135e3f6e79f3b98617 100644 --- a/services/distributeddataservice/service/test/mock/kv_store_nb_delegate_corruption_mock.cpp +++ b/services/distributeddataservice/service/test/mock/kv_store_nb_delegate_corruption_mock.cpp @@ -330,6 +330,12 @@ DBStatus KvStoreNbDelegateCorruptionMock::Sync(const DeviceSyncOption &option, return DBStatus::OK; } +DBStatus KvStoreNbDelegateCorruptionMock::Sync(const DeviceSyncOption &option, + const std::function &devicesMap)> &onComplete) +{ + return DBStatus::OK; +} + DBStatus KvStoreNbDelegateCorruptionMock::CancelSync(uint32_t syncId) { return DBStatus::OK; diff --git a/services/distributeddataservice/service/test/mock/kv_store_nb_delegate_corruption_mock.h b/services/distributeddataservice/service/test/mock/kv_store_nb_delegate_corruption_mock.h index 7f7834822b0a4dc8578043b48154e3cda7c99e3d..6a7fa48a087eb588a3a2c5f8b185837dd8c63ec6 100644 --- a/services/distributeddataservice/service/test/mock/kv_store_nb_delegate_corruption_mock.h +++ b/services/distributeddataservice/service/test/mock/kv_store_nb_delegate_corruption_mock.h @@ -105,6 +105,8 @@ public: DBStatus SetCloudSyncConfig(const CloudSyncConfig &config); DBStatus GetDeviceEntries(const std::string &device, std::vector &entries) const; DBStatus Sync(const DeviceSyncOption &option, const DeviceSyncProcessCallback &onProcess); + DBStatus Sync(const DeviceSyncOption &option, + const std::function &devicesMap)> &onComplete); DBStatus CancelSync(uint32_t syncId); DatabaseStatus GetDatabaseStatus() const; DBStatus ClearMetaData(ClearKvMetaDataOption option); diff --git a/services/distributeddataservice/service/test/mock/kv_store_nb_delegate_mock.cpp b/services/distributeddataservice/service/test/mock/kv_store_nb_delegate_mock.cpp index fd5dab19db96d5b04be1fa3509c60f512bb20475..0d6667bc7b36246364593a244feab7a1ca903ccc 100644 --- a/services/distributeddataservice/service/test/mock/kv_store_nb_delegate_mock.cpp +++ b/services/distributeddataservice/service/test/mock/kv_store_nb_delegate_mock.cpp @@ -328,6 +328,12 @@ DBStatus KvStoreNbDelegateMock::Sync(const DeviceSyncOption &option, const Devic return DBStatus::OK; } +DBStatus KvStoreNbDelegateMock::Sync(const DeviceSyncOption &option, + const std::function &devicesMap)> &onComplete) +{ + return DBStatus::OK; +} + DBStatus KvStoreNbDelegateMock::CancelSync(uint32_t syncId) { return DBStatus::OK; diff --git a/services/distributeddataservice/service/test/mock/kv_store_nb_delegate_mock.h b/services/distributeddataservice/service/test/mock/kv_store_nb_delegate_mock.h index 8a97092b82cb5b7fc6f57fea11f33aec4cbf0c53..bf55801369bcf6b569b515c1a262108add64313b 100644 --- a/services/distributeddataservice/service/test/mock/kv_store_nb_delegate_mock.h +++ b/services/distributeddataservice/service/test/mock/kv_store_nb_delegate_mock.h @@ -105,6 +105,8 @@ public: DBStatus SetCloudSyncConfig(const CloudSyncConfig &config); DBStatus GetDeviceEntries(const std::string &device, std::vector &entries) const; DBStatus Sync(const DeviceSyncOption &option, const DeviceSyncProcessCallback &onProcess); + DBStatus Sync(const DeviceSyncOption &option, + const std::function &devicesMap)> &onComplete); DBStatus CancelSync(uint32_t syncId); DatabaseStatus GetDatabaseStatus() const; DBStatus ClearMetaData(ClearKvMetaDataOption option); diff --git a/services/distributeddataservice/service/test/mock/meta_data_manager_mock.cpp b/services/distributeddataservice/service/test/mock/meta_data_manager_mock.cpp index fff2933da39d07a3ff484c570b15638dbce00355..3a23d32296ed914847a17eca64fb454f2b79b6e3 100644 --- a/services/distributeddataservice/service/test/mock/meta_data_manager_mock.cpp +++ b/services/distributeddataservice/service/test/mock/meta_data_manager_mock.cpp @@ -41,9 +41,9 @@ bool OHOS::DistributedData::MetaDataManager::LoadMeta(const std::string &key, Se } bool OHOS::DistributedData::MetaDataManager::Sync(const std::vector &devices, - MetaDataManager::OnComplete complete, bool wait) + MetaDataManager::OnComplete complete, bool wait, bool isRetry) { - return BMetaDataManager::metaDataManager->Sync(devices, complete, wait); + return BMetaDataManager::metaDataManager->Sync(devices, complete, wait, isRetry); } template<> diff --git a/services/distributeddataservice/service/test/mock/meta_data_manager_mock.h b/services/distributeddataservice/service/test/mock/meta_data_manager_mock.h index 42926ab9209b1728815318dd28165f6942192e01..32bcd387345b2faf5265eac63e4dca59efd2a11d 100644 --- a/services/distributeddataservice/service/test/mock/meta_data_manager_mock.h +++ b/services/distributeddataservice/service/test/mock/meta_data_manager_mock.h @@ -26,7 +26,7 @@ namespace OHOS::DistributedData { class BMetaDataManager { public: virtual bool LoadMeta(const std::string &, Serializable &, bool) = 0; - virtual bool Sync(const std::vector &, MetaDataManager::OnComplete, bool) = 0; + virtual bool Sync(const std::vector &, MetaDataManager::OnComplete, bool, bool) = 0; BMetaDataManager() = default; virtual ~BMetaDataManager() = default; static inline std::shared_ptr metaDataManager = nullptr; @@ -34,7 +34,7 @@ public: class MetaDataManagerMock : public BMetaDataManager { public: MOCK_METHOD(bool, LoadMeta, (const std::string &, Serializable &, bool), (override)); - MOCK_METHOD(bool, Sync, (const std::vector &, MetaDataManager::OnComplete, bool), (override)); + MOCK_METHOD(bool, Sync, (const std::vector &, MetaDataManager::OnComplete, bool, bool), (override)); }; template class BMetaData { diff --git a/services/distributeddataservice/service/test/object_manager_mock_test.cpp b/services/distributeddataservice/service/test/object_manager_mock_test.cpp index faf7ca79cb6d74afb3c7a792747562ee955ee7f1..8258f515e62557cb3615b5a7cd1b3ef7705f382f 100644 --- a/services/distributeddataservice/service/test/object_manager_mock_test.cpp +++ b/services/distributeddataservice/service/test/object_manager_mock_test.cpp @@ -202,7 +202,7 @@ HWTEST_F(ObjectManagerMockTest, SyncOnStore001, TestSize.Level0) EXPECT_CALL(*devMgrAdapterMock, ToUUID(testing::A &>())) .WillOnce(Return(std::vector{ "mock_uuid_1" })); EXPECT_CALL(*metaDataManagerMock, LoadMeta(_, _, _)).WillOnce(testing::Return(false)); - EXPECT_CALL(*metaDataManagerMock, Sync(_, _, _)).WillOnce(testing::Return(true)); + EXPECT_CALL(*metaDataManagerMock, Sync(_, _, _, _)).WillOnce(testing::Return(true)); auto result = manager.SyncOnStore(prefix, remoteDeviceList, func); EXPECT_EQ(result, OBJECT_SUCCESS); }