From 3eeaff02c83a7521e5f4749e73ea020104cff7d7 Mon Sep 17 00:00:00 2001 From: luqing Date: Fri, 16 May 2025 15:50:12 +0800 Subject: [PATCH 1/2] =?UTF-8?q?=E6=8E=A5=E7=BB=AD=E6=95=B4=E6=94=B9=20Sign?= =?UTF-8?q?ed-off-by:=20luqing=20?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../object/include/object_callback_proxy.h | 10 ++- .../object/include/object_data_listener.h | 4 +- .../service/object/include/object_manager.h | 19 ++++++ .../object/include/object_service_impl.h | 3 + .../object/include/object_service_stub.h | 4 ++ .../object/src/object_callback_proxy.cpp | 19 ++++++ .../object/src/object_data_listener.cpp | 21 ++++++ .../service/object/src/object_manager.cpp | 67 +++++++++++++++++++ .../object/src/object_service_impl.cpp | 27 ++++++++ .../object/src/object_service_stub.cpp | 35 ++++++++++ 10 files changed, 206 insertions(+), 3 deletions(-) diff --git a/services/distributeddataservice/service/object/include/object_callback_proxy.h b/services/distributeddataservice/service/object/include/object_callback_proxy.h index e29908493..7e6d8cf18 100644 --- a/services/distributeddataservice/service/object/include/object_callback_proxy.h +++ b/services/distributeddataservice/service/object/include/object_callback_proxy.h @@ -76,10 +76,18 @@ public: explicit ObjectChangeCallbackProxy(const sptr &impl); ~ObjectChangeCallbackProxy() = default; void Completed(const std::map> &results, bool allReady) override; - private: static inline BrokerDelegator delegator_; }; + +class ObjectProgressCallbackProxy : public IRemoteProxy { +public: + explicit ObjectProgressCallbackProxy(const sptr &impl); + ~ObjectProgressCallbackProxy() = default; + void Completed(int process) override; +private: + static inline BrokerDelegator delegator_; +}; } // namespace DistributedObject } // namespace OHOS #endif // DISTRIBUTEDDATAMGR_OBJECT_CALLBACK_PROXY_H diff --git a/services/distributeddataservice/service/object/include/object_data_listener.h b/services/distributeddataservice/service/object/include/object_data_listener.h index 7c3140c7b..f46f67400 100644 --- a/services/distributeddataservice/service/object/include/object_data_listener.h +++ b/services/distributeddataservice/service/object/include/object_data_listener.h @@ -39,8 +39,8 @@ public: int32_t OnFinished(const std::string &srcNetworkId, const sptr &assetObj, int32_t result) override; int32_t OnRecvProgress(const std::string &srcNetworkId, - const sptr &assetObj, - uint64_t totalBytes, uint64_t processBytes) override; + const sptr &assetObj, + uint64_t totalBytes, uint64_t processBytes) override; }; } // namespace DistributedObject } // namespace OHOS diff --git a/services/distributeddataservice/service/object/include/object_manager.h b/services/distributeddataservice/service/object/include/object_manager.h index 158b5c50a..efcc60f82 100644 --- a/services/distributeddataservice/service/object/include/object_manager.h +++ b/services/distributeddataservice/service/object/include/object_manager.h @@ -96,6 +96,11 @@ public: sptr callback); void UnregisterRemoteCallback(const std::string &bundleName, pid_t pid, uint32_t tokenId, const std::string &sessionId = ""); + void RegisterProgressObserver(const std::string &bundleName, const std::string &sessionId, + pid_t pid, uint32_t tokenId, + sptr callback); + void UnregisterProgressObserver(const std::string &bundleName, pid_t pid, uint32_t tokenId, + const std::string &sessionId = ""); void NotifyChange(const ObjectRecord &changedData); void NotifyAssetsReady(const std::string& objectKey, const std::string& bundleName, const std::string& srcNetworkId = ""); @@ -135,6 +140,17 @@ private: return false; } }; + struct ProgressCallbackInfo { + pid_t pid; + std::map> observers_; + bool operator<(const ProgressCallbackInfo &it_) const + { + if (pid < it_.pid) { + return true; + } + return false; + } + }; struct SaveInfo : DistributedData::Serializable { std::string bundleName; std::string sessionId; @@ -167,6 +183,7 @@ private: void DoNotify(uint32_t tokenId, const CallbackInfo& value, const std::map& data, bool allReady); void DoNotifyAssetsReady(uint32_t tokenId, const CallbackInfo& value, const std::string& objectKey, bool allReady); + void DoNotifyRecvProcess(uint32_t tokenId, const CallbackInfo& value, const std::string& objectKey, int process); void DoNotifyWaitAssetTimeout(const std::string &objectKey); std::map> GetAssetsFromStore(const ObjectRecord& changedData); static bool IsAssetKey(const std::string& key); @@ -212,12 +229,14 @@ private: std::string userId_; std::atomic isSyncing_ = false; ConcurrentMap callbacks_; + ConcurrentMap process_callbacks; std::shared_ptr executors_; DistributedData::AssetBindInfo ConvertBindInfo(ObjectStore::AssetBindInfo& bindInfo); ConcurrentMap> snapshots_; // key:bundleName_sessionId ConcurrentMap bindSnapshots_; // key:bundleName_storeName ConcurrentMap restoreStatus_; // key:bundleName+sessionId ConcurrentMap objectTimer_; // key:bundleName+sessionId + ConcurrentMap assetsRecvProgress_; // key:bundleName+sessionId }; } // namespace DistributedObject } // namespace OHOS diff --git a/services/distributeddataservice/service/object/include/object_service_impl.h b/services/distributeddataservice/service/object/include/object_service_impl.h index e398edfac..4dd5f1ca6 100644 --- a/services/distributeddataservice/service/object/include/object_service_impl.h +++ b/services/distributeddataservice/service/object/include/object_service_impl.h @@ -38,6 +38,9 @@ public: int32_t RegisterDataObserver(const std::string &bundleName, const std::string &sessionId, sptr callback) override; int32_t UnregisterDataChangeObserver(const std::string &bundleName, const std::string &sessionId) override; + int32_t RegisterProgressObserver(const std::string &bundleName, const std::string &sessionId, + sptr callback) override; + int32_t UnregisterProgressObserver(const std::string &bundleName, const std::string &sessionId) override; int32_t DeleteSnapshot(const std::string &bundleName, const std::string &sessionId) override; int32_t IsBundleNameEqualTokenId( const std::string &bundleName, const std::string &sessionId, const uint32_t &tokenId); diff --git a/services/distributeddataservice/service/object/include/object_service_stub.h b/services/distributeddataservice/service/object/include/object_service_stub.h index fef5c7eca..58ea19f60 100644 --- a/services/distributeddataservice/service/object/include/object_service_stub.h +++ b/services/distributeddataservice/service/object/include/object_service_stub.h @@ -33,6 +33,8 @@ private: int32_t ObjectStoreRetrieveOnRemote(MessageParcel &data, MessageParcel &reply); int32_t OnSubscribeRequest(MessageParcel &data, MessageParcel &reply); int32_t OnUnsubscribeRequest(MessageParcel &data, MessageParcel &reply); + int32_t OnSubscribeProgress(MessageParcel &data, MessageParcel &reply); + int32_t OnUnsubscribeProgress(MessageParcel &data, MessageParcel &reply); int32_t OnAssetChangedOnRemote(MessageParcel &data, MessageParcel &reply); int32_t ObjectStoreBindAssetOnRemote(MessageParcel &data, MessageParcel &reply); int32_t OnDeleteSnapshot(MessageParcel &data, MessageParcel &reply); @@ -48,6 +50,8 @@ private: &ObjectServiceStub::ObjectStoreBindAssetOnRemote, &ObjectServiceStub::OnDeleteSnapshot, &ObjectServiceStub::OnIsContinue, + &ObjectServiceStub::OnSubscribeProgress, + &ObjectServiceStub::OnUnsubscribeProgress, }; }; } // namespace OHOS::DistributedRdb diff --git a/services/distributeddataservice/service/object/src/object_callback_proxy.cpp b/services/distributeddataservice/service/object/src/object_callback_proxy.cpp index 7e4c649a5..b9908f922 100644 --- a/services/distributeddataservice/service/object/src/object_callback_proxy.cpp +++ b/services/distributeddataservice/service/object/src/object_callback_proxy.cpp @@ -117,5 +117,24 @@ void ObjectChangeCallbackProxy::Completed(const std::mapSendRequest(COMPLETED, data, reply, mo); + if (error != 0) { + ZLOGW("SendRequest failed, error %d", error); + } +} } // namespace DistributedObject } // namespace OHOS \ No newline at end of file diff --git a/services/distributeddataservice/service/object/src/object_data_listener.cpp b/services/distributeddataservice/service/object/src/object_data_listener.cpp index 8451fa30d..8a0f18bac 100644 --- a/services/distributeddataservice/service/object/src/object_data_listener.cpp +++ b/services/distributeddataservice/service/object/src/object_data_listener.cpp @@ -52,6 +52,7 @@ int32_t ObjectAssetsRecvListener::OnStart(const std::string &srcNetworkId, const auto objectKey = dstBundleName + sessionId; ZLOGI("OnStart, objectKey:%{public}s", objectKey.c_str()); ObjectStoreManager::GetInstance()->NotifyAssetsStart(objectKey, srcNetworkId); + ObjectStoreManager::GetInstance()->NotifyAssetsRecvProgress(objectKey, 0); return OBJECT_SUCCESS; } @@ -69,6 +70,26 @@ int32_t ObjectAssetsRecvListener::OnFinished(const std::string &srcNetworkId, co ZLOGI("OnFinished, status:%{public}d objectKey:%{public}s, asset size:%{public}zu", result, objectKey.c_str(), assetObj->uris_.size()); ObjectStoreManager::GetInstance()->NotifyAssetsReady(objectKey, assetObj->dstBundleName_, srcNetworkId); + ObjectStoreManager::GetInstance()->NotifyAssetsRecvProgress(objectKey, 100); + return OBJECT_SUCCESS; +} + +int32_t ObjectAssetsRecvListener::OnRecvProgress(const std::string &srcNetworkId, const sptr &assetObj, + uint64_t totalBytes, uint64_t processBytes) +{ + if (assetObj == nullptr || totalBytes == 0) { + ZLOGE("OnRecvProgress error! srcNetworkId:%{public}s, totalBytes: %{public}llu", + DistributedData::Anonymous::Change(srcNetworkId).c_str(), totalBytes); + return OBJECT_INNER_ERROR; + } + + auto objectKey = assetObj->dstBundleName_ + assetObj->sessionId_; + ZLOGI("srcNetworkId: %{public}s, objectKey:%{public}s, totalBytes: %{public}llu," + "processBytes: %{public}llu.", + DistributedData::Anonymous::Change(srcNetworkId).c_str(), objectKey.c_str(), totalBytes, processBytes); + + int32_t progress = static_cast((processedBytes * 100.0 / totalBytes) * 0.9); + ObjectStoreManager::GetInstance()->NotifyAssetsRecvProgress(objectKey, progress); return OBJECT_SUCCESS; } diff --git a/services/distributeddataservice/service/object/src/object_manager.cpp b/services/distributeddataservice/service/object/src/object_manager.cpp index 859de7085..344e44630 100644 --- a/services/distributeddataservice/service/object/src/object_manager.cpp +++ b/services/distributeddataservice/service/object/src/object_manager.cpp @@ -412,6 +412,52 @@ void ObjectStoreManager::UnregisterRemoteCallback(const std::string &bundleName, })); } +void ObjectStoreManager::RegisterProgressObserver(const std::string &bundleName, const std::string &sessionId, + pid_t pid, uint32_t tokenId, + sptr callback) +{ + if (bundleName.empty() || sessionId.empty()) { + ZLOGD("ObjectStoreManager::RegisterProgressObserver empty"); + return; + } + ZLOGD("ObjectStoreManager::RegisterProgressObserver start"); + auto proxy = iface_cast(callback); + std::string prefix = bundleName + sessionId; + process_callbacks.Compute(tokenId, ([pid, &proxy, &prefix](const uint32_t key, ProgressCallbackInfo &value) { + if (value.pid != pid) { + value = ProgressCallbackInfo { pid }; + } + value.observers_.insert_or_assign(prefix, proxy); + return !value.observers_.empty(); + })); +} + +void ObjectStoreManager::UnregisterProgressObserver(const std::string &bundleName, pid_t pid, uint32_t tokenId, + const std::string &sessionId) +{ + if (bundleName.empty()) { + ZLOGD("bundleName is empty"); + return; + } + process_callbacks.Compute(tokenId, ([pid, &sessionId, &bundleName](const uint32_t key, ProgressCallbackInfo &value) { + if (value.pid != pid) { + return true; + } + if (sessionId.empty()) { + return false; + } + std::string prefix = bundleName + sessionId; + for (auto it = value.observers_.begin(); it != value.observers_.end();) { + if ((*it).first == prefix) { + it = value.observers_.erase(it); + } else { + ++it; + } + } + return true; + })); +} + void ObjectStoreManager::NotifyChange(const ObjectRecord &changedData) { ZLOGI("OnChange start, size:%{public}zu", changedData.size()); @@ -570,6 +616,16 @@ void ObjectStoreManager::PullAssets(const std::map& d } } +void ObjectStoreManager::NotifyAssetsRecvProgress(const std::string& objectKey, int32_t progress) +{ + assetsRecvProgress_[objectKey] = progress; + + callbacks_process.ForEach([this, &objectKey, progress](uint32_t tokenId, const CallbackInfo& value) { + DoNotifyRecvProcess(tokenId, value, key, progress); + return false; + }); +} + void ObjectStoreManager::NotifyAssetsReady(const std::string& objectKey, const std::string& bundleName, const std::string& srcNetworkId) { @@ -712,6 +768,17 @@ void ObjectStoreManager::DoNotifyAssetsReady(uint32_t tokenId, const CallbackInf } } +void ObjectStoreManager::DoNotifyRecvProcess(uint32_t tokenId, const CallbackInfo& value, + const std::string& objectKey, int process); +{ + for (const auto& observer : value.observers_) { + if (objectKey != observer.first) { + continue; + } + observer.second->Completed(process); + } +} + void ObjectStoreManager::DoNotifyWaitAssetTimeout(const std::string &objectKey) { ObjectStore::RadarReporter::ReportStageError(std::string(__FUNCTION__), ObjectStore::DATA_RESTORE, diff --git a/services/distributeddataservice/service/object/src/object_service_impl.cpp b/services/distributeddataservice/service/object/src/object_service_impl.cpp index 71f6195e7..0b822be32 100644 --- a/services/distributeddataservice/service/object/src/object_service_impl.cpp +++ b/services/distributeddataservice/service/object/src/object_service_impl.cpp @@ -260,6 +260,33 @@ int32_t ObjectServiceImpl::UnregisterDataChangeObserver(const std::string &bundl return OBJECT_SUCCESS; } +int32_t ObjectServiceImpl::RegisterProgressObserver(const std::string &bundleName, const std::string &sessionId, + sptr callback) +{ + ZLOGD("begin."); + uint32_t tokenId = IPCSkeleton::GetCallingTokenID(); + int32_t status = IsBundleNameEqualTokenId(bundleName, sessionId, tokenId); + if (status != OBJECT_SUCCESS) { + return status; + } + auto pid = IPCSkeleton::GetCallingPid(); + ObjectStoreManager::GetInstance()->RegisterProgressObserver(bundleName, sessionId, pid, tokenId, callback); + return OBJECT_SUCCESS; +} + +int32_t ObjectServiceImpl::UnregisterProgressObserver(const std::string &bundleName, const std::string &sessionId) +{ + ZLOGD("begin."); + uint32_t tokenId = IPCSkeleton::GetCallingTokenID(); + int32_t status = IsBundleNameEqualTokenId(bundleName, sessionId, tokenId); + if (status != OBJECT_SUCCESS) { + return status; + } + auto pid = IPCSkeleton::GetCallingPid(); + ObjectStoreManager::GetInstance()->UnregisterProgressObserver(bundleName, pid, tokenId, sessionId); + return OBJECT_SUCCESS; +} + int32_t ObjectServiceImpl::DeleteSnapshot(const std::string &bundleName, const std::string &sessionId) { uint32_t tokenId = IPCSkeleton::GetCallingTokenID(); diff --git a/services/distributeddataservice/service/object/src/object_service_stub.cpp b/services/distributeddataservice/service/object/src/object_service_stub.cpp index 20255eab4..1b94f0428 100644 --- a/services/distributeddataservice/service/object/src/object_service_stub.cpp +++ b/services/distributeddataservice/service/object/src/object_service_stub.cpp @@ -164,6 +164,41 @@ int32_t ObjectServiceStub::OnUnsubscribeRequest(MessageParcel &data, MessageParc return 0; } +int32_t ObjectServiceStub::OnSubscribeProgress(MessageParcel &data, MessageParcel &reply) +{ + std::string sessionId; + std::string bundleName; + sptr obj; + if (!ITypesUtil::Unmarshal(data, bundleName, sessionId, obj) || obj == nullptr) { + ZLOGE("Unmarshal sessionId:%{public}s bundleName:%{public}s, callback is nullptr: %{public}d", + DistributedData::Anonymous::Change(sessionId).c_str(), bundleName.c_str(), obj == nullptr); + return IPC_STUB_INVALID_DATA_ERR; + } + int32_t status = RegisterProgressObserver(bundleName, sessionId, obj); + if (!ITypesUtil::Marshal(reply, status)) { + ZLOGE("Marshal status:0x%{public}x", status); + return IPC_STUB_WRITE_PARCEL_ERR; + } + return 0; +} + +int32_t ObjectServiceStub::OnUnsubscribeProgress(MessageParcel &data, MessageParcel &reply) +{ + std::string sessionId; + std::string bundleName; + if (!ITypesUtil::Unmarshal(data, bundleName, sessionId)) { + ZLOGE("Unmarshal sessionId:%{public}s bundleName:%{public}s", + DistributedData::Anonymous::Change(sessionId).c_str(), bundleName.c_str()); + return IPC_STUB_INVALID_DATA_ERR; + } + int32_t status = UnregisterProgressObserver(bundleName, sessionId); + if (!ITypesUtil::Marshal(reply, status)) { + ZLOGE("Marshal status:0x%{public}x", status); + return IPC_STUB_WRITE_PARCEL_ERR; + } + return 0; +} + int32_t ObjectServiceStub::OnDeleteSnapshot(MessageParcel &data, MessageParcel &reply) { std::string sessionId; -- Gitee From d8460a7c3b046e285ef52ec0e98422257fc972d3 Mon Sep 17 00:00:00 2001 From: luqing Date: Fri, 23 May 2025 16:23:34 +0800 Subject: [PATCH 2/2] =?UTF-8?q?=E9=97=AE=E9=A2=98=E5=8D=95=E6=95=B4?= =?UTF-8?q?=E6=94=B9=20Signed-off-by:=20luqing=20?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../object/include/object_callback_proxy.h | 5 ++++ .../service/object/include/object_manager.h | 7 +++-- .../object/src/object_callback_proxy.cpp | 7 ++++- .../object/src/object_data_listener.cpp | 22 ++------------ .../service/object/src/object_manager.cpp | 29 +++++++++++-------- .../object/src/object_service_impl.cpp | 5 ++-- .../service/test/object_service_stub_test.cpp | 6 ++++ 7 files changed, 44 insertions(+), 37 deletions(-) diff --git a/services/distributeddataservice/service/object/include/object_callback_proxy.h b/services/distributeddataservice/service/object/include/object_callback_proxy.h index 7e6d8cf18..9d27f489e 100644 --- a/services/distributeddataservice/service/object/include/object_callback_proxy.h +++ b/services/distributeddataservice/service/object/include/object_callback_proxy.h @@ -80,6 +80,11 @@ private: static inline BrokerDelegator delegator_; }; +class ObjectProgressCallbackProxyBroker : public IObjectProgressCallback, public IRemoteBroker { +public: + DECLARE_INTERFACE_DESCRIPTOR(u"OHOS.DistributedObject.IObjectProgressCallback"); +}; + class ObjectProgressCallbackProxy : public IRemoteProxy { public: explicit ObjectProgressCallbackProxy(const sptr &impl); diff --git a/services/distributeddataservice/service/object/include/object_manager.h b/services/distributeddataservice/service/object/include/object_manager.h index efcc60f82..0e1d951e1 100644 --- a/services/distributeddataservice/service/object/include/object_manager.h +++ b/services/distributeddataservice/service/object/include/object_manager.h @@ -96,15 +96,16 @@ public: sptr callback); void UnregisterRemoteCallback(const std::string &bundleName, pid_t pid, uint32_t tokenId, const std::string &sessionId = ""); - void RegisterProgressObserver(const std::string &bundleName, const std::string &sessionId, + void RegisterProgressObserverCallback(const std::string &bundleName, const std::string &sessionId, pid_t pid, uint32_t tokenId, sptr callback); - void UnregisterProgressObserver(const std::string &bundleName, pid_t pid, uint32_t tokenId, + void UnregisterProgressObserverCallback(const std::string &bundleName, pid_t pid, uint32_t tokenId, const std::string &sessionId = ""); void NotifyChange(const ObjectRecord &changedData); void NotifyAssetsReady(const std::string& objectKey, const std::string& bundleName, const std::string& srcNetworkId = ""); void NotifyAssetsStart(const std::string& objectKey, const std::string& srcNetworkId = ""); + void NotifyAssetsRecvProgress(const std::string& objectKey, int32_t progress); void CloseAfterMinute(); int32_t Open(); void SetThreadPool(std::shared_ptr executors); @@ -183,7 +184,7 @@ private: void DoNotify(uint32_t tokenId, const CallbackInfo& value, const std::map& data, bool allReady); void DoNotifyAssetsReady(uint32_t tokenId, const CallbackInfo& value, const std::string& objectKey, bool allReady); - void DoNotifyRecvProcess(uint32_t tokenId, const CallbackInfo& value, const std::string& objectKey, int process); + void DoNotifyRecvProcess(const ProgressCallbackInfo& value, const std::string& objectKey, int32_t progress); void DoNotifyWaitAssetTimeout(const std::string &objectKey); std::map> GetAssetsFromStore(const ObjectRecord& changedData); static bool IsAssetKey(const std::string& key); diff --git a/services/distributeddataservice/service/object/src/object_callback_proxy.cpp b/services/distributeddataservice/service/object/src/object_callback_proxy.cpp index b9908f922..4f34fd207 100644 --- a/services/distributeddataservice/service/object/src/object_callback_proxy.cpp +++ b/services/distributeddataservice/service/object/src/object_callback_proxy.cpp @@ -42,6 +42,11 @@ ObjectChangeCallbackProxy::ObjectChangeCallbackProxy(const sptr & { } +ObjectProgressCallbackProxy::ObjectProgressCallbackProxy(const sptr &impl) + : IRemoteProxy(impl) +{ +} + void ObjectSaveCallbackProxy::Completed(const std::map &results) { MessageParcel data; @@ -118,7 +123,7 @@ void ObjectChangeCallbackProxy::Completed(const std::mapdstBundleName_ + assetObj->sessionId_; ZLOGI("srcNetworkId: %{public}s, objectKey:%{public}s, totalBytes: %{public}llu," - "processBytes: %{public}llu.", - DistributedData::Anonymous::Change(srcNetworkId).c_str(), objectKey.c_str(), totalBytes, processBytes); + "processBytes: %{public}llu.", DistributedData::Anonymous::Change(srcNetworkId).c_str(), + DistributedData::Anonymous::Change(objectKey).c_str(), totalBytes, processBytes); - int32_t progress = static_cast((processedBytes * 100.0 / totalBytes) * 0.9); + int32_t progress = static_cast((processBytes * 100.0 / totalBytes) * 0.9); ObjectStoreManager::GetInstance()->NotifyAssetsRecvProgress(objectKey, progress); return OBJECT_SUCCESS; } - -int32_t ObjectAssetsRecvListener::OnRecvProgress(const std::string &srcNetworkId, const sptr &assetObj, - uint64_t totalBytes, uint64_t processBytes) -{ - if (assetObj == nullptr) { - ZLOGE("OnRecvProgress error! srcNetworkId:%{public}s", - DistributedData::Anonymous::Change(srcNetworkId).c_str()); - return OBJECT_INNER_ERROR; - } - - auto objectKey = assetObj->dstBundleName_ + assetObj->sessionId_; - ZLOGI("OnRecvProgress, srcNetworkId: %{public}s, objectKey:%{public}s, totalBytes: %{public}llu," - "processBytes: %{public}llu.", - DistributedData::Anonymous::Change(srcNetworkId).c_str(), objectKey.c_str(), totalBytes, processBytes); - return OBJECT_SUCCESS; -} } // namespace DistributedObject } // namespace OHOS diff --git a/services/distributeddataservice/service/object/src/object_manager.cpp b/services/distributeddataservice/service/object/src/object_manager.cpp index 344e44630..d9778f336 100644 --- a/services/distributeddataservice/service/object/src/object_manager.cpp +++ b/services/distributeddataservice/service/object/src/object_manager.cpp @@ -412,16 +412,16 @@ void ObjectStoreManager::UnregisterRemoteCallback(const std::string &bundleName, })); } -void ObjectStoreManager::RegisterProgressObserver(const std::string &bundleName, const std::string &sessionId, +void ObjectStoreManager::RegisterProgressObserverCallback(const std::string &bundleName, const std::string &sessionId, pid_t pid, uint32_t tokenId, sptr callback) { if (bundleName.empty() || sessionId.empty()) { - ZLOGD("ObjectStoreManager::RegisterProgressObserver empty"); + ZLOGD("ObjectStoreManager::RegisterProgressObserverCallback empty"); return; } - ZLOGD("ObjectStoreManager::RegisterProgressObserver start"); - auto proxy = iface_cast(callback); + ZLOGD("ObjectStoreManager::RegisterProgressObserverCallback start"); + auto proxy = iface_cast(callback); std::string prefix = bundleName + sessionId; process_callbacks.Compute(tokenId, ([pid, &proxy, &prefix](const uint32_t key, ProgressCallbackInfo &value) { if (value.pid != pid) { @@ -432,7 +432,7 @@ void ObjectStoreManager::RegisterProgressObserver(const std::string &bundleName, })); } -void ObjectStoreManager::UnregisterProgressObserver(const std::string &bundleName, pid_t pid, uint32_t tokenId, +void ObjectStoreManager::UnregisterProgressObserverCallback(const std::string &bundleName, pid_t pid, uint32_t tokenId, const std::string &sessionId) { if (bundleName.empty()) { @@ -618,11 +618,13 @@ void ObjectStoreManager::PullAssets(const std::map& d void ObjectStoreManager::NotifyAssetsRecvProgress(const std::string& objectKey, int32_t progress) { - assetsRecvProgress_[objectKey] = progress; - - callbacks_process.ForEach([this, &objectKey, progress](uint32_t tokenId, const CallbackInfo& value) { - DoNotifyRecvProcess(tokenId, value, key, progress); - return false; + assetsRecvProgress_.Compute(objectKey, [progress](const std::string& key, auto& value) { + value = progress; + process_callbacks.ForEach([this, progress](uint32_t tokenId, const ProgressCallbackInfo& value) { + DoNotifyRecvProcess(value, objectKey, progress); + return false; + }); + return true; }); } @@ -768,14 +770,17 @@ void ObjectStoreManager::DoNotifyAssetsReady(uint32_t tokenId, const CallbackInf } } -void ObjectStoreManager::DoNotifyRecvProcess(uint32_t tokenId, const CallbackInfo& value, - const std::string& objectKey, int process); +void ObjectStoreManager::DoNotifyRecvProcess(const ProgressCallbackInfo& value, + const std::string& objectKey, int32_t progress) { for (const auto& observer : value.observers_) { if (objectKey != observer.first) { continue; } observer.second->Completed(process); + if (process == 100) { + assetsRecvProgress_.Erase(objectKey); + } } } diff --git a/services/distributeddataservice/service/object/src/object_service_impl.cpp b/services/distributeddataservice/service/object/src/object_service_impl.cpp index 0b822be32..ba6a6912b 100644 --- a/services/distributeddataservice/service/object/src/object_service_impl.cpp +++ b/services/distributeddataservice/service/object/src/object_service_impl.cpp @@ -270,7 +270,7 @@ int32_t ObjectServiceImpl::RegisterProgressObserver(const std::string &bundleNam return status; } auto pid = IPCSkeleton::GetCallingPid(); - ObjectStoreManager::GetInstance()->RegisterProgressObserver(bundleName, sessionId, pid, tokenId, callback); + ObjectStoreManager::GetInstance()->RegisterProgressObserverCallback(bundleName, sessionId, pid, tokenId, callback); return OBJECT_SUCCESS; } @@ -283,7 +283,7 @@ int32_t ObjectServiceImpl::UnregisterProgressObserver(const std::string &bundleN return status; } auto pid = IPCSkeleton::GetCallingPid(); - ObjectStoreManager::GetInstance()->UnregisterProgressObserver(bundleName, pid, tokenId, sessionId); + ObjectStoreManager::GetInstance()->UnregisterProgressObserverCallback(bundleName, pid, tokenId, sessionId); return OBJECT_SUCCESS; } @@ -369,6 +369,7 @@ int32_t ObjectServiceImpl::OnAppExit(pid_t uid, pid_t pid, uint32_t tokenId, con ZLOGI("ObjectServiceImpl::OnAppExit uid=%{public}d, pid=%{public}d, tokenId=%{public}d, bundleName=%{public}s", uid, pid, tokenId, appId.c_str()); ObjectStoreManager::GetInstance()->UnregisterRemoteCallback(appId, pid, tokenId); + ObjectStoreManager::GetInstance()->UnregisterProgressObserverCallback(appId, pid, tokenId); return FeatureSystem::STUB_SUCCESS; } diff --git a/services/distributeddataservice/service/test/object_service_stub_test.cpp b/services/distributeddataservice/service/test/object_service_stub_test.cpp index 1cbb5e1a5..b00236072 100644 --- a/services/distributeddataservice/service/test/object_service_stub_test.cpp +++ b/services/distributeddataservice/service/test/object_service_stub_test.cpp @@ -93,6 +93,12 @@ HWTEST_F(ObjectServiceStubTest, ObjectServiceTest002, TestSize.Level1) result = objectServiceStub->OnUnsubscribeRequest(request, reply); EXPECT_EQ(result, IPC_STUB_INVALID_DATA_ERR); + result = objectServiceStub->OnSubscribeProgress(request, reply); + EXPECT_EQ(result, IPC_STUB_INVALID_DATA_ERR); + + result = objectServiceStub->OnUnsubscribeProgress(request, reply); + EXPECT_EQ(result, IPC_STUB_INVALID_DATA_ERR); + result = objectServiceStub->OnAssetChangedOnRemote(request, reply); EXPECT_EQ(result, IPC_STUB_INVALID_DATA_ERR); -- Gitee