From 05d94d289d195d613e9182908eaeac2bb23eb00e Mon Sep 17 00:00:00 2001 From: Hollokin Date: Thu, 17 Jul 2025 16:52:15 +0800 Subject: [PATCH 1/5] =?UTF-8?q?=E4=BB=A3=E7=A0=81=E4=BC=98=E5=8C=96?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Hollokin --- .../service/object/include/object_common.h | 2 +- .../service/object/include/object_manager.h | 4 +- .../object/include/object_service_impl.h | 3 +- .../service/object/src/object_manager.cpp | 130 ++++++++++-------- .../object/src/object_service_impl.cpp | 19 +-- .../service/test/object_manager_test.cpp | 2 +- 6 files changed, 90 insertions(+), 70 deletions(-) diff --git a/services/distributeddataservice/service/object/include/object_common.h b/services/distributeddataservice/service/object/include/object_common.h index 759a48a5c..32410cc06 100644 --- a/services/distributeddataservice/service/object/include/object_common.h +++ b/services/distributeddataservice/service/object/include/object_common.h @@ -24,7 +24,7 @@ enum ObjectDistributedType : int32_t { enum Status : int32_t { OBJECT_SUCCESS, - OBJECT_DBSTATUS_ERROR, + OBJECT_DB_ERROR, OBJECT_INNER_ERROR, OBJECT_PERMISSION_DENIED, OBJECT_STORE_NOT_FOUND diff --git a/services/distributeddataservice/service/object/include/object_manager.h b/services/distributeddataservice/service/object/include/object_manager.h index eb61ef3f1..f4b567b85 100644 --- a/services/distributeddataservice/service/object/include/object_manager.h +++ b/services/distributeddataservice/service/object/include/object_manager.h @@ -177,11 +177,9 @@ private: bool IsNeedMetaSync(const StoreMetaData &meta, const std::vector &networkIds); int32_t DoSync(const std::string &prefix, const std::vector &deviceList, uint64_t sequenceId); std::string GetCurrentUser(); - void DoNotify(uint32_t tokenId, const CallbackInfo& value, const std::map& data, - bool allReady); + void DoNotify(const CallbackInfo &value, const std::map &data, bool allReady); void DoNotifyAssetsReady(uint32_t tokenId, const CallbackInfo& value, const std::string& objectKey, bool allReady); void DoNotifyWaitAssetTimeout(const std::string &objectKey); - std::map> GetAssetsFromStore(const ObjectRecord& changedData); static bool IsAssetKey(const std::string& key); static bool IsAssetComplete(const ObjectRecord& result, const std::string& assetPrefix); Assets GetAssetsFromDBRecords(const ObjectRecord& result); diff --git a/services/distributeddataservice/service/object/include/object_service_impl.h b/services/distributeddataservice/service/object/include/object_service_impl.h index 98f0fcd61..1d3a0d90c 100644 --- a/services/distributeddataservice/service/object/include/object_service_impl.h +++ b/services/distributeddataservice/service/object/include/object_service_impl.h @@ -71,8 +71,7 @@ private: }; void RegisterObjectServiceInfo(); void RegisterHandler(); - int32_t SaveMetaData(StoreMetaData& saveMeta); - void UpdateMetaData(); + int32_t SaveMetaData(StoreMetaData& saveMeta, const std::string &user); static Factory factory_; std::shared_ptr executors_; diff --git a/services/distributeddataservice/service/object/src/object_manager.cpp b/services/distributeddataservice/service/object/src/object_manager.cpp index e0665f740..d64f83765 100644 --- a/services/distributeddataservice/service/object/src/object_manager.cpp +++ b/services/distributeddataservice/service/object/src/object_manager.cpp @@ -17,6 +17,7 @@ #include "object_manager.h" #include +#include #include "accesstoken_kit.h" #include "account/account_delegate.h" @@ -72,6 +73,10 @@ ObjectStoreManager::~ObjectStoreManager() DistributedDB::KvStoreNbDelegate *ObjectStoreManager::OpenObjectKvStore() { + if (kvStoreDelegateManager_ == nullptr) { + ZLOGE("Kvstore delegate manager not init"); + return nullptr; + } DistributedDB::KvStoreNbDelegate *store = nullptr; DistributedDB::KvStoreNbDelegate::Option option; option.createDirByStoreIdOnly = true; @@ -86,10 +91,7 @@ DistributedDB::KvStoreNbDelegate *ObjectStoreManager::OpenObjectKvStore() } ZLOGI("GetKvStore successsfully"); store = kvStoreNbDelegate; - std::vector tmpKey; - DistributedDB::DBStatus status = store->RegisterObserver(tmpKey, - DistributedDB::ObserverMode::OBSERVER_CHANGES_FOREIGN, - &objectDataListener_); + auto status = store->RegisterObserver({}, DistributedDB::OBSERVER_CHANGES_FOREIGN, &objectDataListener_); if (status != DistributedDB::DBStatus::OK) { ZLOGE("RegisterObserver err %{public}d", status); } @@ -106,7 +108,7 @@ bool ObjectStoreManager::RegisterAssetsLister() objectAssetsRecvListener_ = new ObjectAssetsRecvListener(); } auto status = DistributedFileDaemonManager::GetInstance().RegisterAssetCallback(objectAssetsRecvListener_); - if (status != DistributedDB::DBStatus::OK) { + if (status != OBJECT_SUCCESS) { ZLOGE("Register assetsRecvListener err %{public}d", status); return false; } @@ -136,6 +138,11 @@ void ObjectStoreManager::ProcessSyncCallback(const std::map callback) { + if (callback == nullptr) { + ZLOGE("callback is null, appId: %{public}s, sessionId: %{public}s", appId.c_str(), + Anonymous::Change(sessionId).c_str()); + return INVALID_ARGUMENT; + } auto proxy = iface_cast(callback); if (deviceId.size() == 0) { ZLOGE("DeviceId empty, appId: %{public}s, sessionId: %{public}s", appId.c_str(), @@ -208,6 +215,11 @@ int32_t ObjectStoreManager::PushAssets(const std::string &srcBundleName, const s int32_t ObjectStoreManager::RevokeSave( const std::string &appId, const std::string &sessionId, sptr callback) { + if (callback == nullptr) { + ZLOGE("callback is null, appId: %{public}s, sessionId: %{public}s", appId.c_str(), + Anonymous::Change(sessionId).c_str()); + return INVALID_ARGUMENT; + } auto proxy = iface_cast(callback); int32_t result = Open(); if (result != OBJECT_SUCCESS) { @@ -247,6 +259,11 @@ int32_t ObjectStoreManager::RevokeSave( int32_t ObjectStoreManager::Retrieve( const std::string &bundleName, const std::string &sessionId, sptr callback, uint32_t tokenId) { + if (callback == nullptr) { + ZLOGE("callback is null, appId: %{public}s, sessionId: %{public}s", appId.c_str(), + Anonymous::Change(sessionId).c_str()); + return INVALID_ARGUMENT; + } auto proxy = iface_cast(callback); int32_t result = Open(); if (result != OBJECT_SUCCESS) { @@ -373,11 +390,10 @@ int32_t ObjectStoreManager::DeleteByAppId(const std::string &appId, int32_t user return result; } -void ObjectStoreManager::RegisterRemoteCallback(const std::string &bundleName, const std::string &sessionId, - pid_t pid, uint32_t tokenId, - sptr callback) +void ObjectStoreManager::RegisterRemoteCallback(const std::string &bundleName, const std::string &sessionId, pid_t pid, + uint32_t tokenId, sptr callback) { - if (bundleName.empty() || sessionId.empty()) { + if (bundleName.empty() || sessionId.empty() || callback == nullptr) { ZLOGD("ObjectStoreManager::RegisterRemoteCallback empty"); return; } @@ -386,7 +402,7 @@ void ObjectStoreManager::RegisterRemoteCallback(const std::string &bundleName, c std::string prefix = bundleName + sessionId; callbacks_.Compute(tokenId, ([pid, &proxy, &prefix](const uint32_t key, CallbackInfo &value) { if (value.pid != pid) { - value = CallbackInfo { pid }; + value = CallbackInfo{ pid }; } value.observers_.insert_or_assign(prefix, proxy); return !value.observers_.empty(); @@ -422,7 +438,7 @@ void ObjectStoreManager::UnregisterRemoteCallback( void ObjectStoreManager::RegisterProgressObserverCallback(const std::string &bundleName, const std::string &sessionId, pid_t pid, uint32_t tokenId, sptr callback) { - if (bundleName.empty() || sessionId.empty()) { + if (bundleName.empty() || sessionId.empty() || callback == nullptr) { ZLOGD("ObjectStoreManager::RegisterProgressObserverCallback empty bundleName = %{public}s, sessionId = " "%{public}s", bundleName.c_str(), DistributedData::Anonymous::Change(sessionId).c_str()); @@ -489,6 +505,9 @@ void ObjectStoreManager::UnregisterProgressObserverCallback( void ObjectStoreManager::NotifyChange(const ObjectRecord &changedData) { ZLOGI("OnChange start, size:%{public}zu", changedData.size()); + if (changedData.size() == 0) { + return; + } bool hasAsset = false; SaveInfo saveInfo; for (const auto &[key, value] : changedData) { @@ -502,7 +521,7 @@ void ObjectStoreManager::NotifyChange(const ObjectRecord &changedData) ObjectStore::RadarReporter::ReportStateStart(std::string(__FUNCTION__), ObjectStore::DATA_RESTORE, ObjectStore::DATA_RECV, ObjectStore::RADAR_SUCCESS, ObjectStore::START, saveInfo.bundleName); callbacks_.ForEach([this, &data](uint32_t tokenId, const CallbackInfo& value) { - DoNotify(tokenId, value, data, true); // no asset, data ready means all ready + DoNotify(value, data, true); // no asset, data ready means all ready return false; }); return; @@ -560,7 +579,7 @@ void ObjectStoreManager::ComputeStatus(const std::string& objectKey, const SaveI ObjectStore::RadarReporter::ReportStage(std::string(__FUNCTION__), ObjectStore::DATA_RESTORE, ObjectStore::DATA_RECV, ObjectStore::RADAR_SUCCESS); callbacks_.ForEach([this, &data](uint32_t tokenId, const CallbackInfo& value) { - DoNotify(tokenId, value, data, true); + DoNotify(value, data, true); return false; }); } else { @@ -568,7 +587,7 @@ void ObjectStoreManager::ComputeStatus(const std::string& objectKey, const SaveI ObjectStore::RadarReporter::ReportStateStart(std::string(__FUNCTION__), ObjectStore::DATA_RESTORE, ObjectStore::DATA_RECV, ObjectStore::RADAR_SUCCESS, ObjectStore::START, saveInfo.bundleName); callbacks_.ForEach([this, &data](uint32_t tokenId, const CallbackInfo& value) { - DoNotify(tokenId, value, data, false); + DoNotify(value, data, false); return false; }); WaitAssets(key, saveInfo, data); @@ -705,11 +724,10 @@ void ObjectStoreManager::NotifyAssetsReady( value = RestoreStatus::ALL_READY; ObjectStore::RadarReporter::ReportStage(std::string(__FUNCTION__), ObjectStore::DATA_RESTORE, ObjectStore::ASSETS_RECV, ObjectStore::RADAR_SUCCESS); - auto [has, taskId] = objectTimer_.Find(key); - if (has) { - executors_->Remove(taskId); - objectTimer_.Erase(key); - } + objectTimer_.ComputeIfPresent(key, [executors = executors_](const auto &key, auto &value) { + executors->Remove(value); + return false; + }); } else { value = RestoreStatus::ASSETS_READY; ObjectStore::RadarReporter::ReportStateStart(std::string(__FUNCTION__), ObjectStore::DATA_RESTORE, @@ -784,8 +802,8 @@ Assets ObjectStoreManager::GetAssetsFromDBRecords(const ObjectRecord& result) return assets; } -void ObjectStoreManager::DoNotify(uint32_t tokenId, const CallbackInfo& value, - const std::map& data, bool allReady) +void ObjectStoreManager::DoNotify( + const CallbackInfo &value, const std::map &data, bool allReady) { for (const auto& observer : value.observers_) { auto it = data.find(observer.first); @@ -793,15 +811,16 @@ void ObjectStoreManager::DoNotify(uint32_t tokenId, const CallbackInfo& value, continue; } observer.second->Completed((*it).second, allReady); + restoreStatus_.ComputeIfPresent([allReady](auto &key, auto &value) { + if (allReady) { + return false; + } + value = RestoreStatus::DATA_NOTIFIED; + return true; + }); if (allReady) { - restoreStatus_.Erase(observer.first); ObjectStore::RadarReporter::ReportStateFinished(std::string(__FUNCTION__), ObjectStore::DATA_RESTORE, ObjectStore::NOTIFY, ObjectStore::RADAR_SUCCESS, ObjectStore::FINISHED); - } else { - restoreStatus_.ComputeIfPresent(observer.first, [](const auto &key, auto &value) { - value = RestoreStatus::DATA_NOTIFIED; - return true; - }); } } } @@ -819,11 +838,10 @@ void ObjectStoreManager::DoNotifyAssetsReady(uint32_t tokenId, const CallbackInf ObjectStore::RadarReporter::ReportStateFinished(std::string(__FUNCTION__), ObjectStore::DATA_RESTORE, ObjectStore::NOTIFY, ObjectStore::RADAR_SUCCESS, ObjectStore::FINISHED); } - auto [has, taskId] = objectTimer_.Find(objectKey); - if (has) { - executors_->Remove(taskId); - objectTimer_.Erase(objectKey); - } + objectTimer_.ComputeIfPresent(objectKey, [executors = executors_](const auto &key, auto &value) { + executors->Remove(value); + return false; + }); } } @@ -838,11 +856,10 @@ void ObjectStoreManager::DoNotifyWaitAssetTimeout(const std::string &objectKey) } observer.second->Completed(ObjectRecord(), true); restoreStatus_.Erase(objectKey); - auto [has, taskId] = objectTimer_.Find(objectKey); - if (has) { - executors_->Remove(taskId); - objectTimer_.Erase(objectKey); - } + objectTimer_.ComputeIfPresent(objectKey, [executors = executors_](const auto &key, auto &value) { + executors->Remove(value); + return false; + }); ObjectStore::RadarReporter::ReportStateError(std::string(__FUNCTION__), ObjectStore::DATA_RESTORE, ObjectStore::NOTIFY, ObjectStore::RADAR_FAILED, ObjectStore::TIMEOUT, ObjectStore::FINISHED); } @@ -866,23 +883,19 @@ void ObjectStoreManager::SetData(const std::string &dataDir, const std::string & int32_t ObjectStoreManager::Open() { - if (kvStoreDelegateManager_ == nullptr) { - ZLOGE("Kvstore delegate manager not init"); - return OBJECT_INNER_ERROR; - } std::unique_lock lock(rwMutex_); - if (delegate_ == nullptr) { - delegate_ = OpenObjectKvStore(); - if (delegate_ == nullptr) { - ZLOGE("Open object kvstore failed"); - return OBJECT_DBSTATUS_ERROR; - } - syncCount_ = 1; - ZLOGI("Open object kvstore success"); - } else { + if (delegate_ != nullptr) { syncCount_++; - ZLOGI("Object kvstore syncCount: %{public}d", syncCount_.load()); + ZLOGI("Object kv store syncCount: %{public}d", syncCount_.load()); + return OBJECT_SUCCESS; + } + delegate_ = OpenObjectKvStore(); + if (delegate_ == nullptr) { + ZLOGE("Open object kv store failed"); + return OBJECT_DB_ERROR; } + syncCount_ = 1; + ZLOGI("Open object kv store success"); return OBJECT_SUCCESS; } @@ -892,6 +905,7 @@ void ObjectStoreManager::ForceClose() if (delegate_ == nullptr) { return; } + delegate_->UnRegisterObserver(&objectDataListener_); auto status = kvStoreDelegateManager_->CloseKvStore(delegate_); if (status != DistributedDB::DBStatus::OK) { ZLOGE("CloseKvStore fail %{public}d", status); @@ -925,6 +939,7 @@ void ObjectStoreManager::Close() void ObjectStoreManager::SyncCompleted( const std::map &results, uint64_t sequenceId) { + // todo 对results往外传的时候,需要对高斯的错误码做转换 std::string userId; SequenceSyncManager::Result result = SequenceSyncManager::GetInstance()->Process(sequenceId, results, userId); if (result == SequenceSyncManager::SUCCESS_USER_HAS_FINISHED && userId == userId_) { @@ -949,6 +964,7 @@ void ObjectStoreManager::FlushClosedStore() std::unique_lock lock(rwMutex_); if (!isSyncing_ && syncCount_ == 0 && delegate_ != nullptr) { ZLOGD("close store"); + delegate_->UnRegisterObserver(&objectDataListener_); auto status = kvStoreDelegateManager_->CloseKvStore(delegate_); if (status != DistributedDB::DBStatus::OK) { int timeOut = 1000; @@ -1053,11 +1069,15 @@ int32_t ObjectStoreManager::SaveToStore(const std::string &appId, const std::str int32_t ObjectStoreManager::SyncOnStore( const std::string &prefix, const std::vector &deviceList, SyncCallBack &callback) { + if (callback == nullptr || deviceList.empty()) { + ZLOGE("do not need sync, prefix: %{public}s", Anonymous::Change(prefix).c_str()); + return INVALID_ARGUMENT; + } std::vector syncDevices; for (auto const &device : deviceList) { if (device == LOCAL_DEVICE) { ZLOGI("Save to local, do not need sync, prefix: %{public}s", Anonymous::Change(prefix).c_str()); - callback({{LOCAL_DEVICE, OBJECT_SUCCESS}}); + callback({ { LOCAL_DEVICE, OBJECT_SUCCESS } }); return OBJECT_SUCCESS; } syncDevices.emplace_back(DmAdaper::GetInstance().GetUuidByNetworkId(device)); @@ -1137,7 +1157,7 @@ int32_t ObjectStoreManager::RevokeSaveToStore(const std::string &prefix) } if (status != DistributedDB::DBStatus::OK) { ZLOGE("Get entries failed, prefix: %{public}s, status: %{public}d", Anonymous::Change(prefix).c_str(), status); - return DB_ERROR; + return OBJECT_DB_ERROR; } std::vector> keys; std::for_each(entries.begin(), entries.end(), [&keys](const DistributedDB::Entry &entry) { @@ -1150,7 +1170,7 @@ int32_t ObjectStoreManager::RevokeSaveToStore(const std::string &prefix) if (status != DistributedDB::DBStatus::OK) { ZLOGE("Delete entries failed, prefix: %{public}s, status: %{public}d", Anonymous::Change(prefix).c_str(), status); - return DB_ERROR; + return OBJECT_DB_ERROR; } ZLOGI("Delete entries success, prefix: %{public}s, count: %{public}zu", Anonymous::Change(prefix).c_str(), keys.size()); @@ -1174,7 +1194,7 @@ int32_t ObjectStoreManager::RetrieveFromStore(const std::string &appId, const st } if (status != DistributedDB::DBStatus::OK) { ZLOGE("Get entries failed, prefix: %{public}s, status: %{public}d", Anonymous::Change(prefix).c_str(), status); - return DB_ERROR; + return OBJECT_DB_ERROR; } ZLOGI("GetEntries success,prefix:%{public}s,count:%{public}zu", Anonymous::Change(prefix).c_str(), entries.size()); for (const auto &entry : entries) { diff --git a/services/distributeddataservice/service/object/src/object_service_impl.cpp b/services/distributeddataservice/service/object/src/object_service_impl.cpp index 72a8aa0d0..754a4fd23 100644 --- a/services/distributeddataservice/service/object/src/object_service_impl.cpp +++ b/services/distributeddataservice/service/object/src/object_service_impl.cpp @@ -142,7 +142,9 @@ int32_t ObjectServiceImpl::OnInitialize() } executors_->Schedule(std::chrono::seconds(WAIT_ACCOUNT_SERVICE), [this]() { StoreMetaData saveMeta; - SaveMetaData(saveMeta); + int foregroundUserId = 0; + DistributedData::AccountDelegate::GetInstance()->QueryForegroundUserId(foregroundUserId); + SaveMetaData(saveMeta, std::to_string(foregroundUserId)); ObjectStoreManager::GetInstance()->SetData(saveMeta.dataDir, saveMeta.user); ObjectStoreManager::GetInstance()->InitUserMeta(); RegisterObjectServiceInfo(); @@ -153,7 +155,7 @@ int32_t ObjectServiceImpl::OnInitialize() return OBJECT_SUCCESS; } -int32_t ObjectServiceImpl::SaveMetaData(StoreMetaData &saveMeta) +int32_t ObjectServiceImpl::SaveMetaData(StoreMetaData &saveMeta, const std::string &user) { auto localDeviceId = DmAdapter::GetInstance().GetLocalDevice().uuid; if (localDeviceId.empty()) { @@ -176,9 +178,7 @@ int32_t ObjectServiceImpl::SaveMetaData(StoreMetaData &saveMeta) saveMeta.storeType = ObjectDistributedType::OBJECT_SINGLE_VERSION; saveMeta.dataType = DistributedKv::DataType::TYPE_DYNAMICAL; saveMeta.authType = DistributedKv::AuthType::IDENTICAL_ACCOUNT; - int foregroundUserId = 0; - DistributedData::AccountDelegate::GetInstance()->QueryForegroundUserId(foregroundUserId); - saveMeta.user = std::to_string(foregroundUserId); + saveMeta.user = user; saveMeta.dataDir = METADATA_STORE_PATH; if (!DistributedData::DirectoryManager::GetInstance().CreateDirectory(saveMeta.dataDir)) { ZLOGE("Create directory error, dataDir: %{public}s.", Anonymous::Change(saveMeta.dataDir).c_str()); @@ -187,7 +187,7 @@ int32_t ObjectServiceImpl::SaveMetaData(StoreMetaData &saveMeta) bool isSaved = DistributedData::MetaDataManager::GetInstance().SaveMeta(saveMeta.GetKeyWithoutPath(), saveMeta) && DistributedData::MetaDataManager::GetInstance().SaveMeta(saveMeta.GetKey(), saveMeta, true); if (!isSaved) { - ZLOGE("SaveMeta failed"); + ZLOGE("SaveMeta failed, store:%{public}s", Anonymous::Change(saveMeta.GetStoreAlias()).c_str()); return OBJECT_INNER_ERROR; } DistributedData::AppIDMetaData appIdMeta; @@ -196,6 +196,7 @@ int32_t ObjectServiceImpl::SaveMetaData(StoreMetaData &saveMeta) isSaved = DistributedData::MetaDataManager::GetInstance().SaveMeta(appIdMeta.GetKey(), appIdMeta, true); if (!isSaved) { ZLOGE("Save appIdMeta failed"); + return OBJECT_INNER_ERROR; } ZLOGI("SaveMeta success appId %{public}s, storeId %{public}s", saveMeta.appId.c_str(), saveMeta.GetStoreAlias().c_str()); @@ -204,14 +205,16 @@ int32_t ObjectServiceImpl::SaveMetaData(StoreMetaData &saveMeta) int32_t ObjectServiceImpl::OnUserChange(uint32_t code, const std::string &user, const std::string &account) { + ZLOGI("code:%{public}d, user:%{public}s, account:%{public}s", code, user.c_str(), + Anonymous::Change(account).c_str()); if (code == static_cast(AccountStatus::DEVICE_ACCOUNT_SWITCHED)) { int32_t status = ObjectStoreManager::GetInstance()->Clear(); if (status != OBJECT_SUCCESS) { ZLOGE("Clear fail user:%{public}s, status: %{public}d", user.c_str(), status); } StoreMetaData saveMeta; - SaveMetaData(saveMeta); - ObjectStoreManager::GetInstance()->SetData(saveMeta.dataDir, saveMeta.user); + SaveMetaData(saveMeta, user); + ObjectStoreManager::GetInstance()->SetData(saveMeta.dataDir, user); } return Feature::OnUserChange(code, user, account); } diff --git a/services/distributeddataservice/service/test/object_manager_test.cpp b/services/distributeddataservice/service/test/object_manager_test.cpp index 3462063eb..742da1289 100644 --- a/services/distributeddataservice/service/test/object_manager_test.cpp +++ b/services/distributeddataservice/service/test/object_manager_test.cpp @@ -952,7 +952,7 @@ HWTEST_F(ObjectManagerTest, BindAsset001, TestSize.Level0) std::string bundleName = "BindAsset"; uint32_t tokenId = IPCSkeleton::GetCallingTokenID(); auto result = manager->BindAsset(tokenId, bundleName, sessionId_, assetValue_, assetBindInfo_); - ASSERT_EQ(result, DistributedObject::OBJECT_DBSTATUS_ERROR); + ASSERT_EQ(result, DistributedObject::OBJECT_DB_ERROR); } /** -- Gitee From 6be8d1ae8c8da4b8b0f5f6d05cb4ba8fe86a9fa8 Mon Sep 17 00:00:00 2001 From: Hollokin Date: Thu, 17 Jul 2025 20:45:57 +0800 Subject: [PATCH 2/5] =?UTF-8?q?=E4=BD=BF=E7=94=A8=20RAII=20=E6=96=B9?= =?UTF-8?q?=E5=BC=8F=E7=AE=A1=E7=90=86=E8=B5=84=E6=BA=90?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Hollokin --- .../service/object/include/object_manager.h | 25 +++- .../service/object/src/object_manager.cpp | 111 +++++++----------- 2 files changed, 64 insertions(+), 72 deletions(-) diff --git a/services/distributeddataservice/service/object/include/object_manager.h b/services/distributeddataservice/service/object/include/object_manager.h index f4b567b85..bd8ecdf5a 100644 --- a/services/distributeddataservice/service/object/include/object_manager.h +++ b/services/distributeddataservice/service/object/include/object_manager.h @@ -158,11 +158,31 @@ private: bool Unmarshal(const json &node) override; std::string ToPropertyPrefix(); }; + + class StoreGuard { + public: + StoreGuard() : opened_(false) {} + int32_t Open() + { + int32_t result = ObjectStoreManager::GetInstance()->Open(); + opened_ = (result == OBJECT_SUCCESS); + return result; + } + + ~StoreGuard() + { + if (opened_) { + ObjectStoreManager::GetInstance()->Close(); + } + } + + private: + bool opened_; + }; DistributedDB::KvStoreNbDelegate *OpenObjectKvStore(); void FlushClosedStore(); void Close(); void ForceClose(); - int32_t SetSyncStatus(bool status); int32_t SaveToStore(const std::string &appId, const std::string &sessionId, const std::string &toDeviceId, const ObjectRecord &data); int32_t SyncOnStore(const std::string &prefix, const std::vector &deviceList, SyncCallBack &callback); @@ -216,7 +236,8 @@ private: + DmAdaper::GetInstance().GetLocalDevice().udid; }; mutable std::shared_timed_mutex rwMutex_; - std::shared_ptr kvStoreDelegateManager_ = nullptr; + DistributedDB::KvStoreDelegateManager kvStoreDelegateManager_; + std::atomic hasInitMananger_{ false }; DistributedDB::KvStoreNbDelegate *delegate_ = nullptr; ObjectDataListener objectDataListener_; sptr objectAssetsRecvListener_ = nullptr; diff --git a/services/distributeddataservice/service/object/src/object_manager.cpp b/services/distributeddataservice/service/object/src/object_manager.cpp index d64f83765..bd34c376d 100644 --- a/services/distributeddataservice/service/object/src/object_manager.cpp +++ b/services/distributeddataservice/service/object/src/object_manager.cpp @@ -65,16 +65,14 @@ ObjectStoreManager::~ObjectStoreManager() ZLOGI("ObjectStoreManager destroy"); if (objectAssetsRecvListener_ != nullptr) { auto status = DistributedFileDaemonManager::GetInstance().UnRegisterAssetCallback(objectAssetsRecvListener_); - if (status != DistributedDB::DBStatus::OK) { - ZLOGE("UnRegister assetsRecvListener err %{public}d", status); - } + ZLOGI("UnRegister assetsRecvListener, status:%{public}d", status); } } DistributedDB::KvStoreNbDelegate *ObjectStoreManager::OpenObjectKvStore() { - if (kvStoreDelegateManager_ == nullptr) { - ZLOGE("Kvstore delegate manager not init"); + if (!hasInitMananger_.load()) { + ZLOGE("Kv store delegate manager not has been inited"); return nullptr; } DistributedDB::KvStoreNbDelegate *store = nullptr; @@ -82,8 +80,7 @@ DistributedDB::KvStoreNbDelegate *ObjectStoreManager::OpenObjectKvStore() option.createDirByStoreIdOnly = true; option.syncDualTupleMode = true; option.secOption = { DistributedDB::S1, DistributedDB::ECE }; - ZLOGD("start GetKvStore"); - kvStoreDelegateManager_->GetKvStore(ObjectCommon::OBJECTSTORE_DB_STOREID, option, + kvStoreDelegateManager_.GetKvStore(ObjectCommon::OBJECTSTORE_DB_STOREID, option, [&store, this](DistributedDB::DBStatus dbStatus, DistributedDB::KvStoreNbDelegate *kvStoreNbDelegate) { if (dbStatus != DistributedDB::DBStatus::OK) { ZLOGE("GetKvStore fail %{public}d", dbStatus); @@ -121,7 +118,8 @@ void ObjectStoreManager::ProcessSyncCallback(const std::map callback) { - if (callback == nullptr) { - ZLOGE("callback is null, appId: %{public}s, sessionId: %{public}s", appId.c_str(), - Anonymous::Change(sessionId).c_str()); + if (callback == nullptr || deviceId.empty()) { + ZLOGE("invalid args, appId:%{public}s, sessionId:%{public}s, deviceId is empty?[%{public}d]", appId.c_str(), + Anonymous::Change(sessionId).c_str(), deviceId.empty()); return INVALID_ARGUMENT; } auto proxy = iface_cast(callback); - if (deviceId.size() == 0) { - ZLOGE("DeviceId empty, appId: %{public}s, sessionId: %{public}s", appId.c_str(), - Anonymous::Change(sessionId).c_str()); - proxy->Completed(std::map()); - return INVALID_ARGUMENT; - } - int32_t result = Open(); + StoreGuard storeGuard; + int32_t result = storeGuard.Open(); if (result != OBJECT_SUCCESS) { ZLOGE("Open object kvstore failed, result: %{public}d", result); ObjectStore::RadarReporter::ReportStateError(std::string(__FUNCTION__), ObjectStore::SAVE, ObjectStore::SAVE_TO_STORE, ObjectStore::RADAR_FAILED, ObjectStore::GETKV_FAILED, ObjectStore::FINISHED); - proxy->Completed(std::map()); return STORE_NOT_OPEN; } SaveUserToMeta(); @@ -165,8 +155,6 @@ int32_t ObjectStoreManager::Save(const std::string &appId, const std::string &se ZLOGE("Save to store failed, result: %{public}d", result); ObjectStore::RadarReporter::ReportStateError(std::string(__FUNCTION__), ObjectStore::SAVE, ObjectStore::SAVE_TO_STORE, ObjectStore::RADAR_FAILED, result, ObjectStore::FINISHED); - Close(); - proxy->Completed(std::map()); return result; } ZLOGI("Sync data, bundleName: %{public}s, sessionId: %{public}s, deviceId: %{public}s", dstBundleName.c_str(), @@ -181,11 +169,8 @@ int32_t ObjectStoreManager::Save(const std::string &appId, const std::string &se ZLOGE("Sync data failed, result: %{public}d", result); ObjectStore::RadarReporter::ReportStateError(std::string(__FUNCTION__), ObjectStore::SAVE, ObjectStore::SYNC_DATA, ObjectStore::RADAR_FAILED, result, ObjectStore::FINISHED); - Close(); - proxy->Completed(std::map()); return result; } - Close(); return PushAssets(appId, dstBundleName, sessionId, data, deviceId); } @@ -221,7 +206,8 @@ int32_t ObjectStoreManager::RevokeSave( return INVALID_ARGUMENT; } auto proxy = iface_cast(callback); - int32_t result = Open(); + StoreGuard storeGuard; + int32_t result = storeGuard.Open(); if (result != OBJECT_SUCCESS) { ZLOGE("Open failed, errCode = %{public}d", result); proxy->Completed(STORE_NOT_OPEN); @@ -231,7 +217,6 @@ int32_t ObjectStoreManager::RevokeSave( result = RevokeSaveToStore(GetPrefixWithoutDeviceId(appId, sessionId)); if (result != OBJECT_SUCCESS) { ZLOGE("RevokeSave failed, errCode = %{public}d", result); - Close(); proxy->Completed(result); return result; } @@ -252,7 +237,6 @@ int32_t ObjectStoreManager::RevokeSave( } else { proxy->Completed(OBJECT_SUCCESS); }; - Close(); return result; } @@ -265,7 +249,8 @@ int32_t ObjectStoreManager::Retrieve( return INVALID_ARGUMENT; } auto proxy = iface_cast(callback); - int32_t result = Open(); + StoreGuard storeGuard; + int32_t result = storeGuard.Open(); if (result != OBJECT_SUCCESS) { ZLOGE("Open object kvstore failed, result: %{public}d", result); proxy->Completed(ObjectRecord(), false); @@ -275,7 +260,6 @@ int32_t ObjectStoreManager::Retrieve( int32_t status = RetrieveFromStore(bundleName, sessionId, results); if (status != OBJECT_SUCCESS) { ZLOGI("Retrieve from store failed, status: %{public}d, close after one minute.", status); - CloseAfterMinute(); proxy->Completed(ObjectRecord(), false); return status; } @@ -299,11 +283,9 @@ int32_t ObjectStoreManager::Retrieve( status = RevokeSaveToStore(GetPrefixWithoutDeviceId(bundleName, sessionId)); if (status != OBJECT_SUCCESS) { ZLOGE("Revoke save failed, status: %{public}d", status); - Close(); proxy->Completed(ObjectRecord(), false); return status; } - Close(); proxy->Completed(results, allReady); if (allReady) { ObjectStore::RadarReporter::ReportStateFinished(std::string(__FUNCTION__), ObjectStore::DATA_RESTORE, @@ -375,7 +357,8 @@ int32_t ObjectStoreManager::InitUserMeta() int32_t ObjectStoreManager::DeleteByAppId(const std::string &appId, int32_t user) { - int32_t result = Open(); + StoreGuard storeGuard; + int32_t result = storeGuard.Open(); if (result != OBJECT_SUCCESS) { ZLOGE("Open store failed, result: %{public}d, appId: %{public}s, user: %{public}d", result, appId.c_str(), user); @@ -386,7 +369,6 @@ int32_t ObjectStoreManager::DeleteByAppId(const std::string &appId, int32_t user ZLOGE("Revoke save failed, result: %{public}d, appId: %{public}s, user: %{public}d", result, appId.c_str(), user); } - Close(); return result; } @@ -870,10 +852,10 @@ void ObjectStoreManager::DoNotifyWaitAssetTimeout(const std::string &objectKey) void ObjectStoreManager::SetData(const std::string &dataDir, const std::string &userId) { ZLOGI("enter, user: %{public}s", userId.c_str()); - kvStoreDelegateManager_ = std::make_shared - (DistributedData::Bootstrap::GetInstance().GetProcessLabel(), userId); + kvStoreDelegateManager_(DistributedData::Bootstrap::GetInstance().GetProcessLabel(), userId); + hasInitMananger_ = true; DistributedDB::KvStoreConfig kvStoreConfig { dataDir }; - auto status = kvStoreDelegateManager_->SetKvStoreConfig(kvStoreConfig); + auto status = kvStoreDelegateManager_.SetKvStoreConfig(kvStoreConfig); if (status != DistributedDB::OK) { ZLOGE("Set kvstore config failed, status: %{public}d", status); return; @@ -906,7 +888,7 @@ void ObjectStoreManager::ForceClose() return; } delegate_->UnRegisterObserver(&objectDataListener_); - auto status = kvStoreDelegateManager_->CloseKvStore(delegate_); + auto status = kvStoreDelegateManager_.CloseKvStore(delegate_); if (status != DistributedDB::DBStatus::OK) { ZLOGE("CloseKvStore fail %{public}d", status); return; @@ -943,7 +925,7 @@ void ObjectStoreManager::SyncCompleted( std::string userId; SequenceSyncManager::Result result = SequenceSyncManager::GetInstance()->Process(sequenceId, results, userId); if (result == SequenceSyncManager::SUCCESS_USER_HAS_FINISHED && userId == userId_) { - SetSyncStatus(false); + isSyncing_ = false; FlushClosedStore(); } for (const auto &item : results) { @@ -965,7 +947,7 @@ void ObjectStoreManager::FlushClosedStore() if (!isSyncing_ && syncCount_ == 0 && delegate_ != nullptr) { ZLOGD("close store"); delegate_->UnRegisterObserver(&objectDataListener_); - auto status = kvStoreDelegateManager_->CloseKvStore(delegate_); + auto status = kvStoreDelegateManager_.CloseKvStore(delegate_); if (status != DistributedDB::DBStatus::OK) { int timeOut = 1000; executors_->Schedule(std::chrono::milliseconds(timeOut), [this]() { @@ -1050,7 +1032,7 @@ int32_t ObjectStoreManager::SaveToStore(const std::string &appId, const std::str std::shared_lock lock(rwMutex_); if (delegate_ == nullptr) { ZLOGE("delegate is nullptr."); - return E_DB_ERROR; + return OBJECT_DB_ERROR; } auto status = delegate_->PutBatch(entries); if (status != DistributedDB::DBStatus::OK) { @@ -1084,9 +1066,11 @@ int32_t ObjectStoreManager::SyncOnStore( } if (syncDevices.empty()) { ZLOGI("Device list is empty, prefix: %{public}s", Anonymous::Change(prefix).c_str()); - callback(std::map()); - return OBJECT_SUCCESS; + callback({}); + return OBJECT_SUCCESS; // todo 这部分不能放在客户端吗?? } + // todo SequenceSyncManager这个东西十分没必要,不应该在Sync开始之前就去注册,得在高斯的Sync之前注册 + // todo 可以一步完成的东西没必要经过一个中间的存储过程 uint64_t sequenceId = SequenceSyncManager::GetInstance()->AddNotifier(userId_, callback); int32_t id = AccountDelegate::GetInstance()->GetUserByToken(IPCSkeleton::GetCallingFullTokenID()); @@ -1111,7 +1095,7 @@ int32_t ObjectStoreManager::DoSync(const std::string &prefix, const std::vector< std::shared_lock lock(rwMutex_); if (delegate_ == nullptr) { ZLOGE("db store was closed."); - return E_DB_ERROR; + return OBJECT_DB_ERROR; } DistributedDB::Query dbQuery = DistributedDB::Query::Select(); dbQuery.PrefixKey(std::vector(prefix.begin(), prefix.end())); @@ -1132,13 +1116,7 @@ int32_t ObjectStoreManager::DoSync(const std::string &prefix, const std::vector< SequenceSyncManager::GetInstance()->DeleteNotifier(sequenceId, tmp); return status; } - SetSyncStatus(true); - return OBJECT_SUCCESS; -} - -int32_t ObjectStoreManager::SetSyncStatus(bool status) -{ - isSyncing_ = status; + isSyncing_ = true; return OBJECT_SUCCESS; } @@ -1148,7 +1126,7 @@ int32_t ObjectStoreManager::RevokeSaveToStore(const std::string &prefix) std::shared_lock lock(rwMutex_); if (delegate_ == nullptr) { ZLOGE("db store was closed."); - return E_DB_ERROR; + return OBJECT_DB_ERROR; } auto status = delegate_->GetEntries(std::vector(prefix.begin(), prefix.end()), entries); if (status == DistributedDB::DBStatus::NOT_FOUND) { @@ -1185,7 +1163,7 @@ int32_t ObjectStoreManager::RetrieveFromStore(const std::string &appId, const st std::shared_lock lock(rwMutex_); if (delegate_ == nullptr) { ZLOGE("db store was closed."); - return E_DB_ERROR; + return OBJECT_DB_ERROR; } auto status = delegate_->GetEntries(std::vector(prefix.begin(), prefix.end()), entries); if (status == DistributedDB::DBStatus::NOT_FOUND) { @@ -1404,8 +1382,8 @@ int32_t ObjectStoreManager::BindAsset(const uint32_t tokenId, const std::string& return std::make_shared>>(); }); auto snapshots = snapshots_.Find(snapshotKey).second; - bindSnapshots_.Compute(storeKey, [this, &asset, snapshots] (const auto &key, auto &value) { - value->emplace(std::pair{asset.uri, snapshots}); + bindSnapshots_.Compute(storeKey, [this, &asset, snapshots](const auto &key, auto &value) { + value->emplace(std::pair{ asset.uri, snapshots }); return true; }); @@ -1422,7 +1400,7 @@ int32_t ObjectStoreManager::BindAsset(const uint32_t tokenId, const std::string& storeInfo.user = tokenInfo.userID; storeInfo.storeName = bindInfo.storeName; - snapshots_.Compute(snapshotKey, [this, &asset, &bindInfo, &storeInfo] (const auto &key, auto &value) { + snapshots_.Compute(snapshotKey, [this, &asset, &bindInfo, &storeInfo](const auto &key, auto &value) { value->BindAsset(ValueProxy::Convert(std::move(asset)), ConvertBindInfo(bindInfo), storeInfo); return true; }); @@ -1443,23 +1421,16 @@ DistributedData::AssetBindInfo ObjectStoreManager::ConvertBindInfo(ObjectStore:: int32_t ObjectStoreManager::OnAssetChanged(const uint32_t tokenId, const std::string& appId, const std::string& sessionId, const std::string& deviceId, const ObjectStore::Asset& asset) { - const int32_t userId = AccountDelegate::GetInstance()->GetUserByToken(tokenId); auto objectAsset = asset; - Asset dataAsset = ValueProxy::Convert(std::move(objectAsset)); + Asset dataAsset = ValueProxy::Convert(std::move(objectAsset)); auto snapshotKey = appId + SEPERATOR + sessionId; - int32_t res = OBJECT_SUCCESS; - bool exist = snapshots_.ComputeIfPresent(snapshotKey, - [&res, &dataAsset, &deviceId](const std::string &key, std::shared_ptr snapshot) { - if (snapshot != nullptr) { - res = snapshot->OnDataChanged(dataAsset, deviceId); // needChange - } - return snapshot != nullptr; - }); - if (exist) { - return res; + auto [exist, snapshot] = snapshots_.Find(snapshotKey); + if (exist && snapshot != nullptr) { + return snapshot->OnDataChanged(dataAsset, deviceId); // needChange } - auto block = std::make_shared>>(WAIT_TIME, std::tuple{ true, true }); + const int32_t userId = AccountDelegate::GetInstance()->GetUserByToken(tokenId); + auto block = std::make_shared>>(WAIT_TIME, std::tuple{ true, false }); ObjectAssetLoader::GetInstance()->TransferAssetsAsync(userId, appId, deviceId, { dataAsset }, [block](bool ret) { block->SetValue({ false, ret }); }); -- Gitee From 965b36dd1413d40ff738fb9860dc2bc7ad024f21 Mon Sep 17 00:00:00 2001 From: Hollokin Date: Thu, 17 Jul 2025 21:44:38 +0800 Subject: [PATCH 3/5] =?UTF-8?q?=E5=8E=BB=E6=8E=89=E5=90=8C=E6=AD=A5?= =?UTF-8?q?=E6=B5=81=E7=A8=8B=E4=B8=AD=E7=9A=84=E7=BB=93=E6=9E=9C=E5=9B=9E?= =?UTF-8?q?=E8=B0=83?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Hollokin --- .../object/include/object_callback_proxy.h | 15 ----- .../service/object/include/object_manager.h | 2 +- .../object/include/object_service_impl.h | 4 +- .../object/src/object_callback_proxy.cpp | 24 -------- .../service/object/src/object_manager.cpp | 55 +++++++------------ .../object/src/object_service_impl.cpp | 6 +- 6 files changed, 26 insertions(+), 80 deletions(-) diff --git a/services/distributeddataservice/service/object/include/object_callback_proxy.h b/services/distributeddataservice/service/object/include/object_callback_proxy.h index fabd14941..d602c3e4f 100644 --- a/services/distributeddataservice/service/object/include/object_callback_proxy.h +++ b/services/distributeddataservice/service/object/include/object_callback_proxy.h @@ -51,21 +51,6 @@ private: static inline BrokerDelegator delegator_; }; -class ObjectRetrieveCallbackProxyBroker : public IObjectRetrieveCallback, public IRemoteBroker { -public: - DECLARE_INTERFACE_DESCRIPTOR(u"OHOS.DistributedObject.IObjectRetrieveCallback"); -}; - -class ObjectRetrieveCallbackProxy : public IRemoteProxy { -public: - explicit ObjectRetrieveCallbackProxy(const sptr &impl); - ~ObjectRetrieveCallbackProxy() = default; - void Completed(const std::map> &results, bool allReady) override; - -private: - static inline BrokerDelegator delegator_; -}; - class ObjectChangeCallbackProxyBroker : public IObjectChangeCallback, public IRemoteBroker { public: DECLARE_INTERFACE_DESCRIPTOR(u"OHOS.DistributedObject.IObjectChangeCallback"); diff --git a/services/distributeddataservice/service/object/include/object_manager.h b/services/distributeddataservice/service/object/include/object_manager.h index bd8ecdf5a..062e6420a 100644 --- a/services/distributeddataservice/service/object/include/object_manager.h +++ b/services/distributeddataservice/service/object/include/object_manager.h @@ -88,7 +88,7 @@ public: int32_t RevokeSave( const std::string &appId, const std::string &sessionId, sptr callback); int32_t Retrieve(const std::string &bundleName, const std::string &sessionId, - sptr callback, uint32_t tokenId); + std::map> &data, bool &allReady); void SetData(const std::string &dataDir, const std::string &userId); int32_t Clear(); int32_t InitUserMeta(); diff --git a/services/distributeddataservice/service/object/include/object_service_impl.h b/services/distributeddataservice/service/object/include/object_service_impl.h index 1d3a0d90c..a2da46896 100644 --- a/services/distributeddataservice/service/object/include/object_service_impl.h +++ b/services/distributeddataservice/service/object/include/object_service_impl.h @@ -31,8 +31,8 @@ public: int32_t ObjectStoreSave(const std::string &bundleName, const std::string &sessionId, const std::string &deviceId, const std::map> &data, sptr callback) override; - int32_t ObjectStoreRetrieve( - const std::string &bundleName, const std::string &sessionId, sptr callback) override; + int32_t ObjectStoreRetrieve(const std::string &bundleName, const std::string &sessionId, + std::map> &data, bool &allReady) override; int32_t ObjectStoreRevokeSave(const std::string &bundleName, const std::string &sessionId, sptr callback) override; int32_t RegisterDataObserver(const std::string &bundleName, const std::string &sessionId, diff --git a/services/distributeddataservice/service/object/src/object_callback_proxy.cpp b/services/distributeddataservice/service/object/src/object_callback_proxy.cpp index 32fdda715..2d061652e 100644 --- a/services/distributeddataservice/service/object/src/object_callback_proxy.cpp +++ b/services/distributeddataservice/service/object/src/object_callback_proxy.cpp @@ -32,11 +32,6 @@ ObjectRevokeSaveCallbackProxy::ObjectRevokeSaveCallbackProxy(const sptr &impl) - : IRemoteProxy(impl) -{ -} - ObjectChangeCallbackProxy::ObjectChangeCallbackProxy(const sptr &impl) : IRemoteProxy(impl) { @@ -85,25 +80,6 @@ void ObjectRevokeSaveCallbackProxy::Completed(int32_t status) } } -void ObjectRetrieveCallbackProxy::Completed(const std::map> &results, bool allReady) -{ - MessageParcel data; - MessageParcel reply; - if (!data.WriteInterfaceToken(GetDescriptor())) { - ZLOGE("write descriptor failed"); - return; - } - if (!ITypesUtil::Marshal(data, results, allReady)) { - ZLOGE("Marshalling failed, allReady:%{public}d", allReady); - return; - } - MessageOption mo { MessageOption::TF_SYNC }; - int error = Remote()->SendRequest(COMPLETED, data, reply, mo); - if (error != 0) { - ZLOGW("SendRequest failed, error %{public}d", error); - } -} - void ObjectChangeCallbackProxy::Completed(const std::map> &results, bool allReady) { MessageParcel data; diff --git a/services/distributeddataservice/service/object/src/object_manager.cpp b/services/distributeddataservice/service/object/src/object_manager.cpp index bd34c376d..b7f5f7eeb 100644 --- a/services/distributeddataservice/service/object/src/object_manager.cpp +++ b/services/distributeddataservice/service/object/src/object_manager.cpp @@ -17,7 +17,6 @@ #include "object_manager.h" #include -#include #include "accesstoken_kit.h" #include "account/account_delegate.h" @@ -197,6 +196,7 @@ int32_t ObjectStoreManager::PushAssets(const std::string &srcBundleName, const s return status; } +// 该接口JsRevokeSave对外呈现的错误码,统一抛出为INNER_ERROR int32_t ObjectStoreManager::RevokeSave( const std::string &appId, const std::string &sessionId, sptr callback) { @@ -210,62 +210,52 @@ int32_t ObjectStoreManager::RevokeSave( int32_t result = storeGuard.Open(); if (result != OBJECT_SUCCESS) { ZLOGE("Open failed, errCode = %{public}d", result); - proxy->Completed(STORE_NOT_OPEN); return STORE_NOT_OPEN; } result = RevokeSaveToStore(GetPrefixWithoutDeviceId(appId, sessionId)); if (result != OBJECT_SUCCESS) { ZLOGE("RevokeSave failed, errCode = %{public}d", result); - proxy->Completed(result); return result; } std::vector deviceList; auto deviceInfos = DmAdaper::GetInstance().GetRemoteDevices(); std::for_each(deviceInfos.begin(), deviceInfos.end(), [&deviceList](AppDistributedKv::DeviceInfo info) { deviceList.emplace_back(info.networkId); }); - if (!deviceList.empty()) { - SyncCallBack tmp = [proxy](const std::map &results) { - ZLOGI("revoke save finished"); - proxy->Completed(OBJECT_SUCCESS); - }; - result = SyncOnStore(GetPropertyPrefix(appId, sessionId), deviceList, tmp); - if (result != OBJECT_SUCCESS) { - ZLOGE("sync failed, errCode = %{public}d", result); - proxy->Completed(result); - } - } else { + if (deviceList.empty()) { + ZLOGI("no remote device, no need to sync."); + proxy->Completed(OBJECT_SUCCESS); + return OBJECT_SUCCESS; + } + SyncCallBack tmp = [proxy](const std::map &results) { + ZLOGI("revoke save finished"); proxy->Completed(OBJECT_SUCCESS); }; + result = SyncOnStore(GetPropertyPrefix(appId, sessionId), deviceList, tmp); + if (result != OBJECT_SUCCESS) { + ZLOGE("sync failed, errCode = %{public}d", result); + } return result; } -int32_t ObjectStoreManager::Retrieve( - const std::string &bundleName, const std::string &sessionId, sptr callback, uint32_t tokenId) +// todo 同步接口,没有耗时操作,不需要注册回调,直接把结果results通过IPC传过去就好了 +// todo 这里的callback参数应该被results代替 +int32_t ObjectStoreManager::Retrieve(const std::string &bundleName, const std::string &sessionId, + std::map> &data, bool &allReady) { - if (callback == nullptr) { - ZLOGE("callback is null, appId: %{public}s, sessionId: %{public}s", appId.c_str(), - Anonymous::Change(sessionId).c_str()); - return INVALID_ARGUMENT; - } - auto proxy = iface_cast(callback); StoreGuard storeGuard; int32_t result = storeGuard.Open(); if (result != OBJECT_SUCCESS) { ZLOGE("Open object kvstore failed, result: %{public}d", result); - proxy->Completed(ObjectRecord(), false); return ObjectStore::GETKV_FAILED; } - ObjectRecord results{}; - int32_t status = RetrieveFromStore(bundleName, sessionId, results); + int32_t status = RetrieveFromStore(bundleName, sessionId, data); if (status != OBJECT_SUCCESS) { ZLOGI("Retrieve from store failed, status: %{public}d, close after one minute.", status); - proxy->Completed(ObjectRecord(), false); return status; } - bool allReady = false; - Assets assets = GetAssetsFromDBRecords(results); - if (assets.empty() || results.find(ObjectStore::FIELDS_PREFIX + ObjectStore::DEVICEID_KEY) == results.end()) { + Assets assets = GetAssetsFromDBRecords(data); + if (assets.empty() || data.find(ObjectStore::FIELDS_PREFIX + ObjectStore::DEVICEID_KEY) == data.end()) { allReady = true; } else { auto objectKey = bundleName + sessionId; @@ -283,10 +273,8 @@ int32_t ObjectStoreManager::Retrieve( status = RevokeSaveToStore(GetPrefixWithoutDeviceId(bundleName, sessionId)); if (status != OBJECT_SUCCESS) { ZLOGE("Revoke save failed, status: %{public}d", status); - proxy->Completed(ObjectRecord(), false); return status; } - proxy->Completed(results, allReady); if (allReady) { ObjectStore::RadarReporter::ReportStateFinished(std::string(__FUNCTION__), ObjectStore::DATA_RESTORE, ObjectStore::NOTIFY, ObjectStore::RADAR_SUCCESS, ObjectStore::FINISHED); @@ -1166,10 +1154,7 @@ int32_t ObjectStoreManager::RetrieveFromStore(const std::string &appId, const st return OBJECT_DB_ERROR; } auto status = delegate_->GetEntries(std::vector(prefix.begin(), prefix.end()), entries); - if (status == DistributedDB::DBStatus::NOT_FOUND) { - ZLOGI("Get entries empty, prefix: %{public}s, status: %{public}d", Anonymous::Change(prefix).c_str(), status); - return KEY_NOT_FOUND; - } + // 服务侧不关心该错误码具体是什么,该错误码不对外 if (status != DistributedDB::DBStatus::OK) { ZLOGE("Get entries failed, prefix: %{public}s, status: %{public}d", Anonymous::Change(prefix).c_str(), status); return OBJECT_DB_ERROR; diff --git a/services/distributeddataservice/service/object/src/object_service_impl.cpp b/services/distributeddataservice/service/object/src/object_service_impl.cpp index 754a4fd23..cb1dfe7e2 100644 --- a/services/distributeddataservice/service/object/src/object_service_impl.cpp +++ b/services/distributeddataservice/service/object/src/object_service_impl.cpp @@ -236,8 +236,8 @@ int32_t ObjectServiceImpl::ObjectStoreRevokeSave( return OBJECT_SUCCESS; } -int32_t ObjectServiceImpl::ObjectStoreRetrieve( - const std::string &bundleName, const std::string &sessionId, sptr callback) +int32_t ObjectServiceImpl::ObjectStoreRetrieve(const std::string &bundleName, const std::string &sessionId, + std::map> &data, bool &allReady) { ZLOGI("begin."); uint32_t tokenId = IPCSkeleton::GetCallingTokenID(); @@ -245,7 +245,7 @@ int32_t ObjectServiceImpl::ObjectStoreRetrieve( if (status != OBJECT_SUCCESS) { return status; } - status = ObjectStoreManager::GetInstance()->Retrieve(bundleName, sessionId, callback, tokenId); + status = ObjectStoreManager::GetInstance()->Retrieve(bundleName, sessionId, data, allReady); if (status != OBJECT_SUCCESS) { ZLOGE("retrieve fail %{public}d", status); return status; -- Gitee From 913e7bf348f21c4ce50829d15e2b1994527465d1 Mon Sep 17 00:00:00 2001 From: Hollokin Date: Fri, 18 Jul 2025 16:13:51 +0800 Subject: [PATCH 4/5] =?UTF-8?q?=E4=BB=A3=E7=A0=81=E4=BC=98=E5=8C=96?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Hollokin --- .../service/object/include/object_manager.h | 1 + .../service/object/src/object_manager.cpp | 42 +++++++++---------- 2 files changed, 21 insertions(+), 22 deletions(-) diff --git a/services/distributeddataservice/service/object/include/object_manager.h b/services/distributeddataservice/service/object/include/object_manager.h index 062e6420a..c67e3ae99 100644 --- a/services/distributeddataservice/service/object/include/object_manager.h +++ b/services/distributeddataservice/service/object/include/object_manager.h @@ -66,6 +66,7 @@ public: using DmAdaper = OHOS::DistributedData::DeviceManagerAdapter; using UriToSnapshot = std::shared_ptr>>; using StoreMetaData = OHOS::DistributedData::StoreMetaData; + using DBStatus = DistributedDB::DBStatus; enum RestoreStatus : int32_t { NONE = 0, diff --git a/services/distributeddataservice/service/object/src/object_manager.cpp b/services/distributeddataservice/service/object/src/object_manager.cpp index b7f5f7eeb..8bc1e77f1 100644 --- a/services/distributeddataservice/service/object/src/object_manager.cpp +++ b/services/distributeddataservice/service/object/src/object_manager.cpp @@ -79,16 +79,16 @@ DistributedDB::KvStoreNbDelegate *ObjectStoreManager::OpenObjectKvStore() option.createDirByStoreIdOnly = true; option.syncDualTupleMode = true; option.secOption = { DistributedDB::S1, DistributedDB::ECE }; - kvStoreDelegateManager_.GetKvStore(ObjectCommon::OBJECTSTORE_DB_STOREID, option, - [&store, this](DistributedDB::DBStatus dbStatus, DistributedDB::KvStoreNbDelegate *kvStoreNbDelegate) { - if (dbStatus != DistributedDB::DBStatus::OK) { + kvStoreDelegateManager_.GetKvStore( + ObjectCommon::OBJECTSTORE_DB_STOREID, option, [&store, this](auto dbStatus, auto *kvStoreNbDelegate) { + if (dbStatus != DBStatus::OK) { ZLOGE("GetKvStore fail %{public}d", dbStatus); return; } ZLOGI("GetKvStore successsfully"); store = kvStoreNbDelegate; auto status = store->RegisterObserver({}, DistributedDB::OBSERVER_CHANGES_FOREIGN, &objectDataListener_); - if (status != DistributedDB::DBStatus::OK) { + if (status != DBStatus::OK) { ZLOGE("RegisterObserver err %{public}d", status); } }); @@ -842,8 +842,7 @@ void ObjectStoreManager::SetData(const std::string &dataDir, const std::string & ZLOGI("enter, user: %{public}s", userId.c_str()); kvStoreDelegateManager_(DistributedData::Bootstrap::GetInstance().GetProcessLabel(), userId); hasInitMananger_ = true; - DistributedDB::KvStoreConfig kvStoreConfig { dataDir }; - auto status = kvStoreDelegateManager_.SetKvStoreConfig(kvStoreConfig); + auto status = kvStoreDelegateManager_.SetKvStoreConfig({ dataDir }); if (status != DistributedDB::OK) { ZLOGE("Set kvstore config failed, status: %{public}d", status); return; @@ -877,7 +876,7 @@ void ObjectStoreManager::ForceClose() } delegate_->UnRegisterObserver(&objectDataListener_); auto status = kvStoreDelegateManager_.CloseKvStore(delegate_); - if (status != DistributedDB::DBStatus::OK) { + if (status != DBStatus::OK) { ZLOGE("CloseKvStore fail %{public}d", status); return; } @@ -906,8 +905,7 @@ void ObjectStoreManager::Close() FlushClosedStore(); } -void ObjectStoreManager::SyncCompleted( - const std::map &results, uint64_t sequenceId) +void ObjectStoreManager::SyncCompleted(const std::map &results, uint64_t sequenceId) { // todo 对results往外传的时候,需要对高斯的错误码做转换 std::string userId; @@ -917,7 +915,7 @@ void ObjectStoreManager::SyncCompleted( FlushClosedStore(); } for (const auto &item : results) { - if (item.second == DistributedDB::DBStatus::OK) { + if (item.second == DBStatus::OK) { ZLOGI("Sync data success, sequenceId: 0x%{public}" PRIx64 "", sequenceId); ObjectStore::RadarReporter::ReportStateFinished(std::string(__FUNCTION__), ObjectStore::SAVE, ObjectStore::SYNC_DATA, ObjectStore::RADAR_SUCCESS, ObjectStore::FINISHED); @@ -936,7 +934,7 @@ void ObjectStoreManager::FlushClosedStore() ZLOGD("close store"); delegate_->UnRegisterObserver(&objectDataListener_); auto status = kvStoreDelegateManager_.CloseKvStore(delegate_); - if (status != DistributedDB::DBStatus::OK) { + if (status != DBStatus::OK) { int timeOut = 1000; executors_->Schedule(std::chrono::milliseconds(timeOut), [this]() { FlushClosedStore(); @@ -958,7 +956,7 @@ void ObjectStoreManager::ProcessOldEntry(const std::string &appId) return; } auto status = delegate_->GetEntries(std::vector(appId.begin(), appId.end()), entries); - if (status != DistributedDB::DBStatus::OK) { + if (status != DBStatus::OK) { ZLOGE("Get old entries failed, bundleName: %{public}s, status %{public}d", appId.c_str(), status); return; } @@ -1023,7 +1021,7 @@ int32_t ObjectStoreManager::SaveToStore(const std::string &appId, const std::str return OBJECT_DB_ERROR; } auto status = delegate_->PutBatch(entries); - if (status != DistributedDB::DBStatus::OK) { + if (status != DBStatus::OK) { ZLOGE("PutBatch failed, bundleName: %{public}s, sessionId: %{public}s, dstNetworkId: %{public}s, " "status: %{public}d", appId.c_str(), Anonymous::Change(sessionId).c_str(), Anonymous::Change(toDeviceId).c_str(), status); @@ -1089,15 +1087,15 @@ int32_t ObjectStoreManager::DoSync(const std::string &prefix, const std::vector< dbQuery.PrefixKey(std::vector(prefix.begin(), prefix.end())); ZLOGI("Start sync data, sequenceId: 0x%{public}" PRIx64 "", sequenceId); auto status = delegate_->Sync(deviceList, DistributedDB::SyncMode::SYNC_MODE_PUSH_ONLY, - [this, sequenceId](const std::map &devicesMap) { + [this, sequenceId](const std::map &devicesMap) { ZLOGI("Sync data finished, sequenceId: 0x%{public}" PRIx64 "", sequenceId); - std::map result; + std::map result; for (auto &item : devicesMap) { result[DmAdaper::GetInstance().ToNetworkID(item.first)] = item.second; } SyncCompleted(result, sequenceId); }, dbQuery, false); - if (status != DistributedDB::DBStatus::OK) { + if (status != DBStatus::OK) { ZLOGE("Sync data failed, prefix: %{public}s, sequenceId: 0x%{public}" PRIx64 ", status: %{public}d", Anonymous::Change(prefix).c_str(), sequenceId, status); std::string tmp; @@ -1117,11 +1115,11 @@ int32_t ObjectStoreManager::RevokeSaveToStore(const std::string &prefix) return OBJECT_DB_ERROR; } auto status = delegate_->GetEntries(std::vector(prefix.begin(), prefix.end()), entries); - if (status == DistributedDB::DBStatus::NOT_FOUND) { + if (status == DBStatus::NOT_FOUND) { ZLOGI("Get entries empty, prefix: %{public}s", Anonymous::Change(prefix).c_str()); return OBJECT_SUCCESS; } - if (status != DistributedDB::DBStatus::OK) { + if (status != DBStatus::OK) { ZLOGE("Get entries failed, prefix: %{public}s, status: %{public}d", Anonymous::Change(prefix).c_str(), status); return OBJECT_DB_ERROR; } @@ -1133,7 +1131,7 @@ int32_t ObjectStoreManager::RevokeSaveToStore(const std::string &prefix) return OBJECT_SUCCESS; } status = delegate_->DeleteBatch(keys); - if (status != DistributedDB::DBStatus::OK) { + if (status != DBStatus::OK) { ZLOGE("Delete entries failed, prefix: %{public}s, status: %{public}d", Anonymous::Change(prefix).c_str(), status); return OBJECT_DB_ERROR; @@ -1155,7 +1153,7 @@ int32_t ObjectStoreManager::RetrieveFromStore(const std::string &appId, const st } auto status = delegate_->GetEntries(std::vector(prefix.begin(), prefix.end()), entries); // 服务侧不关心该错误码具体是什么,该错误码不对外 - if (status != DistributedDB::DBStatus::OK) { + if (status != DBStatus::OK) { ZLOGE("Get entries failed, prefix: %{public}s, status: %{public}d", Anonymous::Change(prefix).c_str(), status); return OBJECT_DB_ERROR; } @@ -1304,7 +1302,7 @@ uint64_t SequenceSyncManager::AddNotifier(const std::string &userId, SyncCallBac } SequenceSyncManager::Result SequenceSyncManager::Process( - uint64_t sequenceId, const std::map &results, std::string &userId) + uint64_t sequenceId, const std::map &results, std::string &userId) { std::lock_guard lock(notifierLock_); if (seqIdCallbackRelations_.count(sequenceId) == 0) { @@ -1313,7 +1311,7 @@ SequenceSyncManager::Result SequenceSyncManager::Process( } std::map syncResults; for (const auto &item : results) { - syncResults[item.first] = item.second == DistributedDB::DBStatus::OK ? 0 : -1; + syncResults[item.first] = item.second == DBStatus::OK ? 0 : -1; } seqIdCallbackRelations_[sequenceId](syncResults); ZLOGD("end complete"); -- Gitee From 58031b767ba8430c338bd8379d7d92c0043c0cb5 Mon Sep 17 00:00:00 2001 From: Hollokin Date: Wed, 23 Jul 2025 09:16:10 +0800 Subject: [PATCH 5/5] update Signed-off-by: Hollokin --- .../object/include/object_asset_loader.h | 4 +- .../object/include/object_data_listener.h | 23 +++---- .../object/include/object_dms_handler.h | 2 +- .../service/object/include/object_manager.h | 4 +- .../object/include/object_service_impl.h | 4 +- .../object/include/object_service_stub.h | 7 ++- .../object/src/object_asset_loader.cpp | 12 ++-- .../object/src/object_callback_proxy.cpp | 12 ++-- .../object/src/object_data_listener.cpp | 17 +++-- .../service/object/src/object_dms_handler.cpp | 4 +- .../service/object/src/object_manager.cpp | 62 +++++++++---------- .../object/src/object_service_impl.cpp | 1 + .../object/src/object_service_stub.cpp | 2 +- .../service/object/src/object_snapshot.cpp | 6 +- 14 files changed, 74 insertions(+), 86 deletions(-) diff --git a/services/distributeddataservice/service/object/include/object_asset_loader.h b/services/distributeddataservice/service/object/include/object_asset_loader.h index 44d3f8ab0..e0ca25a7f 100644 --- a/services/distributeddataservice/service/object/include/object_asset_loader.h +++ b/services/distributeddataservice/service/object/include/object_asset_loader.h @@ -43,9 +43,9 @@ class ObjectAssetLoader { public: static ObjectAssetLoader *GetInstance(); void SetThreadPool(std::shared_ptr executors); - bool Transfer(const int32_t userId, const std::string& bundleName, const std::string& deviceId, + bool Transfer(int32_t userId, const std::string& bundleName, const std::string& deviceId, const DistributedData::Asset& asset); - void TransferAssetsAsync(const int32_t userId, const std::string& bundleName, const std::string& deviceId, + void TransferAssetsAsync(int32_t userId, const std::string& bundleName, const std::string& deviceId, const std::vector& assets, const TransferFunc& callback); int32_t PushAsset(int32_t userId, const sptr &assetObj, const sptr &sendCallback); diff --git a/services/distributeddataservice/service/object/include/object_data_listener.h b/services/distributeddataservice/service/object/include/object_data_listener.h index 7c3140c7b..462d1be3b 100644 --- a/services/distributeddataservice/service/object/include/object_data_listener.h +++ b/services/distributeddataservice/service/object/include/object_data_listener.h @@ -16,8 +16,8 @@ #ifndef DISTRIBUTEDDATAMGR_OBJECT_DATA_LISTENER_H #define DISTRIBUTEDDATAMGR_OBJECT_DATA_LISTENER_H -#include "kv_store_observer.h" #include "asset/asset_recv_callback_stub.h" +#include "kv_store_observer.h" namespace OHOS { namespace DistributedObject { using AssetObj = Storage::DistributedFile::AssetObj; @@ -31,17 +31,14 @@ public: class ObjectAssetsRecvListener : public Storage::DistributedFile::AssetRecvCallbackStub { public: - ObjectAssetsRecvListener() =default; - ~ObjectAssetsRecvListener() =default; - int32_t OnStart(const std::string &srcNetworkId, - const std::string &dstNetworkId, const std::string &sessionId, - const std::string &dstBundleName) override; - int32_t OnFinished(const std::string &srcNetworkId, - const sptr &assetObj, int32_t result) override; - int32_t OnRecvProgress(const std::string &srcNetworkId, - const sptr &assetObj, - uint64_t totalBytes, uint64_t processBytes) override; + ObjectAssetsRecvListener() = default; + ~ObjectAssetsRecvListener() = default; + int32_t OnStart(const std::string &srcNetworkId, const std::string &dstNetworkId, const std::string &sessionId, + const std::string &dstBundleName) override; + int32_t OnFinished(const std::string &srcNetworkId, const sptr &assetObj, int32_t result) override; + int32_t OnRecvProgress(const std::string &srcNetworkId, const sptr &assetObj, uint64_t totalBytes, + uint64_t processBytes) override; }; -} // namespace DistributedObject -} // namespace OHOS +} // namespace DistributedObject +} // namespace OHOS #endif // DISTRIBUTEDDATAMGR_OBJECT_DATA_LISTENER_H diff --git a/services/distributeddataservice/service/object/include/object_dms_handler.h b/services/distributeddataservice/service/object/include/object_dms_handler.h index ee4208a27..d29db2986 100644 --- a/services/distributeddataservice/service/object/include/object_dms_handler.h +++ b/services/distributeddataservice/service/object/include/object_dms_handler.h @@ -18,8 +18,8 @@ #include -#include "dms_listener_stub.h" #include "distributed_sched_types.h" +#include "dms_listener_stub.h" namespace OHOS::DistributedObject { class DmsEventListener : public DistributedSchedule::DSchedEventListenerStub { diff --git a/services/distributeddataservice/service/object/include/object_manager.h b/services/distributeddataservice/service/object/include/object_manager.h index c67e3ae99..885ab4b88 100644 --- a/services/distributeddataservice/service/object/include/object_manager.h +++ b/services/distributeddataservice/service/object/include/object_manager.h @@ -199,7 +199,7 @@ private: int32_t DoSync(const std::string &prefix, const std::vector &deviceList, uint64_t sequenceId); std::string GetCurrentUser(); void DoNotify(const CallbackInfo &value, const std::map &data, bool allReady); - void DoNotifyAssetsReady(uint32_t tokenId, const CallbackInfo& value, const std::string& objectKey, bool allReady); + void DoNotifyAssetsReady(const CallbackInfo& value, const std::string& objectKey, bool allReady); void DoNotifyWaitAssetTimeout(const std::string &objectKey); static bool IsAssetKey(const std::string& key); static bool IsAssetComplete(const ObjectRecord& result, const std::string& assetPrefix); @@ -210,7 +210,7 @@ private: void NotifyDataChanged(const std::map& data, const SaveInfo& saveInfo); int32_t PushAssets(const std::string &srcBundleName, const std::string &dstBundleName, const std::string &sessionId, const ObjectRecord &data, const std::string &deviceId); - int32_t WaitAssets(const std::string& objectKey, const SaveInfo& saveInfo, + void WaitAssets(const std::string& objectKey, const SaveInfo& saveInfo, const std::map& data); void PullAssets(const std::map& data, const SaveInfo& saveInfo); std::map GetObjectData(const ObjectRecord& changedData, SaveInfo& saveInfo, diff --git a/services/distributeddataservice/service/object/include/object_service_impl.h b/services/distributeddataservice/service/object/include/object_service_impl.h index a2da46896..5faf4e00c 100644 --- a/services/distributeddataservice/service/object/include/object_service_impl.h +++ b/services/distributeddataservice/service/object/include/object_service_impl.h @@ -59,7 +59,7 @@ private: using StaticActs = DistributedData::StaticActs; class ObjectStatic : public StaticActs { public: - ~ObjectStatic() override {}; + ~ObjectStatic() override{}; int32_t OnAppUninstall(const std::string &bundleName, int32_t user, int32_t index) override; }; class Factory { @@ -71,7 +71,7 @@ private: }; void RegisterObjectServiceInfo(); void RegisterHandler(); - int32_t SaveMetaData(StoreMetaData& saveMeta, const std::string &user); + int32_t SaveMetaData(StoreMetaData &saveMeta, const std::string &user); static Factory factory_; std::shared_ptr executors_; diff --git a/services/distributeddataservice/service/object/include/object_service_stub.h b/services/distributeddataservice/service/object/include/object_service_stub.h index e254a2bd1..d2a56f1b3 100644 --- a/services/distributeddataservice/service/object/include/object_service_stub.h +++ b/services/distributeddataservice/service/object/include/object_service_stub.h @@ -17,8 +17,9 @@ #define DISTRIBUTED_OBJECT_SERVICE_STUB_H #include -#include "iobject_service.h" + #include "feature/feature_system.h" +#include "iobject_service.h" namespace OHOS::DistributedObject { using ObjectCode = ObjectStoreService::ObjectServiceInterfaceCode; @@ -27,7 +28,7 @@ public: int OnRemoteRequest(uint32_t code, MessageParcel &data, MessageParcel &reply) override; private: - bool CheckInterfaceToken(MessageParcel& data); + bool CheckInterfaceToken(MessageParcel &data); int32_t ObjectStoreSaveOnRemote(MessageParcel &data, MessageParcel &reply); int32_t ObjectStoreRevokeSaveOnRemote(MessageParcel &data, MessageParcel &reply); int32_t ObjectStoreRetrieveOnRemote(MessageParcel &data, MessageParcel &reply); @@ -54,5 +55,5 @@ private: &ObjectServiceStub::OnUnsubscribeProgress, }; }; -} // namespace OHOS::DistributedRdb +} // namespace OHOS::DistributedObject #endif diff --git a/services/distributeddataservice/service/object/src/object_asset_loader.cpp b/services/distributeddataservice/service/object/src/object_asset_loader.cpp index 46aa4498b..7ac30d00f 100644 --- a/services/distributeddataservice/service/object/src/object_asset_loader.cpp +++ b/services/distributeddataservice/service/object/src/object_asset_loader.cpp @@ -36,7 +36,7 @@ void ObjectAssetLoader::SetThreadPool(std::shared_ptr executors) executors_ = executors; } -bool ObjectAssetLoader::Transfer(const int32_t userId, const std::string& bundleName, const std::string& deviceId, +bool ObjectAssetLoader::Transfer(int32_t userId, const std::string& bundleName, const std::string& deviceId, const DistributedData::Asset& asset) { AssetInfo assetInfo; @@ -65,8 +65,8 @@ bool ObjectAssetLoader::Transfer(const int32_t userId, const std::string& bundle return true; } -void ObjectAssetLoader::TransferAssetsAsync(const int32_t userId, const std::string& bundleName, - const std::string& deviceId, const std::vector& assets, const TransferFunc& callback) +void ObjectAssetLoader::TransferAssetsAsync(int32_t userId, const std::string &bundleName, const std::string &deviceId, + const std::vector &assets, const TransferFunc &callback) { if (executors_ == nullptr) { ZLOGE("executors is null, bundleName: %{public}s, deviceId: %{public}s, userId: %{public}d", @@ -76,7 +76,7 @@ void ObjectAssetLoader::TransferAssetsAsync(const int32_t userId, const std::str } TransferTask task = { .callback = callback }; DistributedData::Assets downloadAssets; - for (auto& asset : assets) { + for (auto &asset : assets) { if (IsDownloaded(asset)) { continue; } @@ -91,7 +91,7 @@ void ObjectAssetLoader::TransferAssetsAsync(const int32_t userId, const std::str }); executors_->Execute([this, userId, bundleName, deviceId, downloadAssets]() { bool result = true; - for (const auto& asset : downloadAssets) { + for (const auto &asset : downloadAssets) { if (IsDownloaded(asset)) { FinishTask(asset.uri, result); continue; @@ -99,7 +99,7 @@ void ObjectAssetLoader::TransferAssetsAsync(const int32_t userId, const std::str if (IsDownloading(asset)) { continue; } - downloading_.ComputeIfAbsent(asset.uri, [asset](const std::string& key) { + downloading_.ComputeIfAbsent(asset.uri, [asset](const std::string &key) { return asset.hash; }); auto success = Transfer(userId, bundleName, deviceId, asset); diff --git a/services/distributeddataservice/service/object/src/object_callback_proxy.cpp b/services/distributeddataservice/service/object/src/object_callback_proxy.cpp index 2d061652e..954b548c7 100644 --- a/services/distributeddataservice/service/object/src/object_callback_proxy.cpp +++ b/services/distributeddataservice/service/object/src/object_callback_proxy.cpp @@ -54,7 +54,7 @@ void ObjectSaveCallbackProxy::Completed(const std::map &re ZLOGE("Marshalling failed"); return; } - MessageOption mo { MessageOption::TF_SYNC }; + MessageOption mo{ MessageOption::TF_SYNC }; int error = Remote()->SendRequest(COMPLETED, data, reply, mo); if (error != 0) { ZLOGW("SendRequest failed, error %{public}d", error); @@ -73,7 +73,7 @@ void ObjectRevokeSaveCallbackProxy::Completed(int32_t status) ZLOGE("Marshalling failed, status:%{public}d", status); return; } - MessageOption mo { MessageOption::TF_SYNC }; + MessageOption mo{ MessageOption::TF_SYNC }; int error = Remote()->SendRequest(COMPLETED, data, reply, mo); if (error != 0) { ZLOGW("SendRequest failed, error %{public}d", error); @@ -92,7 +92,7 @@ void ObjectChangeCallbackProxy::Completed(const std::mapSendRequest(COMPLETED, data, reply, mo); if (error != 0) { ZLOGW("SendRequest failed, error %{public}d", error); @@ -111,11 +111,11 @@ void ObjectProgressCallbackProxy::Completed(int32_t progress) ZLOGE("Marshal failed"); return; } - MessageOption mo { MessageOption::TF_ASYNC }; + MessageOption mo{ MessageOption::TF_ASYNC }; int error = Remote()->SendRequest(COMPLETED, data, reply, mo); if (error != 0) { ZLOGW("SendRequest failed, error %{public}d", error); } } -} // namespace DistributedObject -} // namespace OHOS \ No newline at end of file +} // namespace DistributedObject +} // namespace OHOS \ No newline at end of file diff --git a/services/distributeddataservice/service/object/src/object_data_listener.cpp b/services/distributeddataservice/service/object/src/object_data_listener.cpp index 75839c874..8f33d688d 100644 --- a/services/distributeddataservice/service/object/src/object_data_listener.cpp +++ b/services/distributeddataservice/service/object/src/object_data_listener.cpp @@ -16,6 +16,7 @@ #define LOG_TAG "ObjectDataListener" #include "object_data_listener.h" + #include "log_print.h" #include "object_manager.h" #include "object_radar_reporter.h" @@ -24,19 +25,15 @@ namespace OHOS { namespace DistributedObject { constexpr int32_t PROGRESS_MAX = 100; constexpr int32_t PROGRESS_INVALID = -1; -ObjectDataListener::ObjectDataListener() -{ -} +ObjectDataListener::ObjectDataListener() {} -ObjectDataListener::~ObjectDataListener() -{ -} +ObjectDataListener::~ObjectDataListener() {} void ObjectDataListener::OnChange(const DistributedDB::KvStoreChangedData &data) { const auto &insertedDatas = data.GetEntriesInserted(); const auto &updatedDatas = data.GetEntriesUpdated(); - std::map> changedData {}; + std::map> changedData{}; for (const auto &entry : insertedDatas) { std::string key(entry.key.begin(), entry.key.end()); changedData.insert_or_assign(std::move(key), entry.value); @@ -95,9 +92,9 @@ int32_t ObjectAssetsRecvListener::OnRecvProgress( DistributedData::Anonymous::Change(srcNetworkId).c_str(), DistributedData::Anonymous::Change(objectKey).c_str(), totalBytes, processBytes); - int32_t progress = static_cast((processBytes * 100.0 / totalBytes) * 0.9); + auto progress = static_cast((processBytes * 100.0 / totalBytes) * 0.9); ObjectStoreManager::GetInstance()->NotifyAssetsRecvProgress(objectKey, progress); return OBJECT_SUCCESS; } -} // namespace DistributedObject -} // namespace OHOS +} // namespace DistributedObject +} // namespace OHOS diff --git a/services/distributeddataservice/service/object/src/object_dms_handler.cpp b/services/distributeddataservice/service/object/src/object_dms_handler.cpp index 7ec88aca2..50788d149 100644 --- a/services/distributeddataservice/service/object/src/object_dms_handler.cpp +++ b/services/distributeddataservice/service/object/src/object_dms_handler.cpp @@ -45,10 +45,10 @@ void ObjectDmsHandler::ReceiveDmsEvent(DistributedSchedule::EventNotify &event) bool ObjectDmsHandler::IsContinue(const std::string &bundleName) { - std::lock_guard lock(mutex_); auto validityTime = std::chrono::steady_clock::now() - std::chrono::seconds(VALIDITY); DistributedHardware::DmDeviceInfo localDeviceInfo; DistributedHardware::DeviceManager::GetInstance().GetLocalDeviceInfo(PKG_NAME, localDeviceInfo); + std::lock_guard lock(mutex_); for (auto it = dmsEvents_.rbegin(); it != dmsEvents_.rend(); ++it) { if (it->second < validityTime) { continue; @@ -74,10 +74,10 @@ bool ObjectDmsHandler::IsContinue(const std::string &bundleName) std::string ObjectDmsHandler::GetDstBundleName(const std::string &srcBundleName, const std::string &dstNetworkId) { - std::lock_guard lock(mutex_); auto validityTime = std::chrono::steady_clock::now() - std::chrono::seconds(VALIDITY); DistributedHardware::DmDeviceInfo localDeviceInfo; DistributedHardware::DeviceManager::GetInstance().GetLocalDeviceInfo(PKG_NAME, localDeviceInfo); + std::lock_guard lock(mutex_); for (auto it = dmsEvents_.rbegin(); it != dmsEvents_.rend(); ++it) { if (it->second < validityTime) { continue; diff --git a/services/distributeddataservice/service/object/src/object_manager.cpp b/services/distributeddataservice/service/object/src/object_manager.cpp index 8bc1e77f1..ed5af71d2 100644 --- a/services/distributeddataservice/service/object/src/object_manager.cpp +++ b/services/distributeddataservice/service/object/src/object_manager.cpp @@ -75,7 +75,7 @@ DistributedDB::KvStoreNbDelegate *ObjectStoreManager::OpenObjectKvStore() return nullptr; } DistributedDB::KvStoreNbDelegate *store = nullptr; - DistributedDB::KvStoreNbDelegate::Option option; + DistributedDB::KvStoreNbDelegate::Option option{}; option.createDirByStoreIdOnly = true; option.syncDualTupleMode = true; option.secOption = { DistributedDB::S1, DistributedDB::ECE }; @@ -161,7 +161,9 @@ int32_t ObjectStoreManager::Save(const std::string &appId, const std::string &se SyncCallBack syncCallback = [proxy, dstBundleName, sessionId, deviceId, this](const std::map &results) { ProcessSyncCallback(results, dstBundleName, sessionId, deviceId); - proxy->Completed(results); + if (proxy != nullptr) { + proxy->Completed(results); + } }; result = SyncOnStore(GetPropertyPrefix(dstBundleName, sessionId, deviceId), { deviceId }, syncCallback); if (result != OBJECT_SUCCESS) { @@ -185,14 +187,14 @@ int32_t ObjectStoreManager::PushAssets(const std::string &srcBundleName, const s assetObj->srcBundleName_ = srcBundleName; assetObj->dstNetworkId_ = deviceId; assetObj->sessionId_ = sessionId; - for (const auto& asset : assets) { + for (const auto &asset : assets) { assetObj->uris_.push_back(asset.uri); } if (objectAssetsSendListener_ == nullptr) { objectAssetsSendListener_ = new ObjectAssetsSendListener(); } int userId = std::atoi(GetCurrentUser().c_str()); - auto status = ObjectAssetLoader::GetInstance()->PushAsset(userId, assetObj, objectAssetsSendListener_); + auto status = ObjectAssetLoader::GetInstance()->PushAsset(userId, assetObj, objectAssetsSendListener_); return status; } @@ -258,8 +260,7 @@ int32_t ObjectStoreManager::Retrieve(const std::string &bundleName, const std::s if (assets.empty() || data.find(ObjectStore::FIELDS_PREFIX + ObjectStore::DEVICEID_KEY) == data.end()) { allReady = true; } else { - auto objectKey = bundleName + sessionId; - restoreStatus_.ComputeIfPresent(objectKey, [&allReady](const auto &key, auto &value) { + restoreStatus_.ComputeIfPresent(bundleName + sessionId, [&allReady](const auto &key, auto &value) { if (value == RestoreStatus::ALL_READY) { allReady = true; return false; @@ -605,8 +606,8 @@ void ObjectStoreManager::NotifyDataChanged(const std::map& data) +void ObjectStoreManager::WaitAssets( + const std::string &objectKey, const SaveInfo &saveInfo, const std::map &data) { auto taskId = executors_->Schedule(std::chrono::seconds(WAIT_TIME), [this, objectKey, data, saveInfo]() { ZLOGE("wait assets finisehd timeout, try pull assets, objectKey:%{public}s", objectKey.c_str()); @@ -614,11 +615,7 @@ int32_t ObjectStoreManager::WaitAssets(const std::string& objectKey, const SaveI DoNotifyWaitAssetTimeout(objectKey); }); - objectTimer_.ComputeIfAbsent( - objectKey, [taskId](const std::string& key) -> auto { - return taskId; - }); - return OBJECT_SUCCESS; + objectTimer_.ComputeIfAbsent(objectKey, [taskId](const std::string &key) -> auto { return taskId; }); } void ObjectStoreManager::PullAssets(const std::map& data, const SaveInfo& saveInfo) @@ -645,18 +642,17 @@ void ObjectStoreManager::NotifyAssetsRecvProgress(const std::string &objectKey, assetsRecvProgress_.InsertOrAssign(objectKey, progress); std::list> observers; bool flag = false; - processCallbacks_.ForEach( - [&objectKey, &observers, &flag](uint32_t tokenId, const ProgressCallbackInfo &value) { - if (value.observers_.empty()) { - flag = true; - return false; - } - auto it = value.observers_.find(objectKey); - if (it != value.observers_.end()) { - observers.push_back(it->second); - } + processCallbacks_.ForEach([&objectKey, &observers, &flag](uint32_t tokenId, const ProgressCallbackInfo &value) { + if (value.observers_.empty()) { + flag = true; return false; - }); + } + auto it = value.observers_.find(objectKey); + if (it != value.observers_.end()) { + observers.push_back(it->second); + } + return false; + }); if (flag) { std::lock_guard lock(progressMutex_); progressInfo_.insert_or_assign(objectKey, progress); @@ -678,16 +674,14 @@ void ObjectStoreManager::NotifyAssetsReady( const std::string &objectKey, const std::string &bundleName, const std::string &srcNetworkId) { restoreStatus_.ComputeIfAbsent( - objectKey, [](const std::string& key) -> auto { - return RestoreStatus::NONE; - }); - restoreStatus_.Compute(objectKey, [this, &bundleName] (const auto &key, auto &value) { + objectKey, [](const std::string &key) -> auto { return RestoreStatus::NONE; }); + restoreStatus_.Compute(objectKey, [this, &bundleName](const auto &key, auto &value) { if (value == RestoreStatus::DATA_NOTIFIED) { value = RestoreStatus::ALL_READY; ObjectStore::RadarReporter::ReportStage(std::string(__FUNCTION__), ObjectStore::DATA_RESTORE, ObjectStore::ASSETS_RECV, ObjectStore::RADAR_SUCCESS); - callbacks_.ForEach([this, key](uint32_t tokenId, const CallbackInfo& value) { - DoNotifyAssetsReady(tokenId, value, key, true); + callbacks_.ForEach([this, key](uint32_t tokenId, const CallbackInfo &value) { + DoNotifyAssetsReady(value, key, true); return false; }); } else if (value == RestoreStatus::DATA_READY) { @@ -775,7 +769,7 @@ Assets ObjectStoreManager::GetAssetsFromDBRecords(const ObjectRecord& result) void ObjectStoreManager::DoNotify( const CallbackInfo &value, const std::map &data, bool allReady) { - for (const auto& observer : value.observers_) { + for (const auto &observer : value.observers_) { auto it = data.find(observer.first); if (it == data.end()) { continue; @@ -795,8 +789,7 @@ void ObjectStoreManager::DoNotify( } } -void ObjectStoreManager::DoNotifyAssetsReady(uint32_t tokenId, const CallbackInfo& value, - const std::string& objectKey, bool allReady) +void ObjectStoreManager::DoNotifyAssetsReady(const CallbackInfo &value, const std::string &objectKey, bool allReady) { for (const auto& observer : value.observers_) { if (objectKey != observer.first) { @@ -948,7 +941,7 @@ void ObjectStoreManager::FlushClosedStore() void ObjectStoreManager::ProcessOldEntry(const std::string &appId) { - std::vector entries; + std::vector entries{}; { std::shared_lock lock(rwMutex_); if (delegate_ == nullptr) { @@ -1445,6 +1438,7 @@ void ObjectStoreManager::DeleteSnapshot(const std::string& bundleName, const std ZLOGD("Not find snapshot, don't need delete"); return; } + // todo 这里的snapshot传递给了RDB,object修改share_ptr,rdb读取它,并发存在问题 bindSnapshots_.ForEach([snapshot](auto& key, auto& value) { for (auto pos = value->begin(); pos != value->end();) { if (pos->second == snapshot) { diff --git a/services/distributeddataservice/service/object/src/object_service_impl.cpp b/services/distributeddataservice/service/object/src/object_service_impl.cpp index cb1dfe7e2..2919c67e5 100644 --- a/services/distributeddataservice/service/object/src/object_service_impl.cpp +++ b/services/distributeddataservice/service/object/src/object_service_impl.cpp @@ -397,6 +397,7 @@ ObjectServiceImpl::ObjectServiceImpl() meta.bundleName = eventInfo.bundleName; meta.user = std::to_string(eventInfo.user); meta.deviceId = DmAdapter::GetInstance().GetLocalDevice().uuid; + // todo 这里获取不到联系人的元数据,需要孟瑶修改处理 if (!MetaDataManager::GetInstance().LoadMeta(meta.GetKeyWithoutPath(), meta)) { ZLOGE("meta empty, bundleName:%{public}s, storeId:%{public}s", meta.bundleName.c_str(), meta.GetStoreAlias().c_str()); diff --git a/services/distributeddataservice/service/object/src/object_service_stub.cpp b/services/distributeddataservice/service/object/src/object_service_stub.cpp index a5d521a39..9893d14b9 100644 --- a/services/distributeddataservice/service/object/src/object_service_stub.cpp +++ b/services/distributeddataservice/service/object/src/object_service_stub.cpp @@ -241,7 +241,7 @@ bool ObjectServiceStub::CheckInterfaceToken(MessageParcel& data) int ObjectServiceStub::OnRemoteRequest(uint32_t code, MessageParcel& data, MessageParcel& reply) { - ZLOGD("code:%{public}u, callingPid:%{public}d", code, IPCSkeleton::GetCallingPid()); + ZLOGI("code:%{public}u, callingPid:%{public}d", code, IPCSkeleton::GetCallingPid()); if (!CheckInterfaceToken(data)) { return -1; } diff --git a/services/distributeddataservice/service/object/src/object_snapshot.cpp b/services/distributeddataservice/service/object/src/object_snapshot.cpp index 14775b730..8159e5002 100644 --- a/services/distributeddataservice/service/object/src/object_snapshot.cpp +++ b/services/distributeddataservice/service/object/src/object_snapshot.cpp @@ -34,10 +34,7 @@ int32_t ObjectSnapshot::Upload(Asset& asset) bool ObjectSnapshot::IsBoundAsset(const Asset& asset) { auto it = changedAssets_.find(asset.uri); - if (it != changedAssets_.end()) { - return true; - } - return false; + return it != changedAssets_.end(); } int32_t ObjectSnapshot::Download(Asset& asset) @@ -88,6 +85,7 @@ int32_t ObjectSnapshot::BindAsset(const Asset& asset, const DistributedData::Ass ZLOGD("Asset is bound. asset.uri:%{public}s :", Anonymous::Change(asset.uri).c_str()); return E_OK; } + // todo 加锁保护 changedAssets_[asset.uri] = ChangedAssetInfo(asset, bindInfo, storeInfo); return E_OK; } -- Gitee