diff --git a/services/distributeddataservice/service/object/include/object_callback_proxy.h b/services/distributeddataservice/service/object/include/object_callback_proxy.h index e29908493e5d2e0fe57a3f33710ed8a158e64ef4..9d27f489ed1ea1fa903dae4f12e1907cceffb6ae 100644 --- a/services/distributeddataservice/service/object/include/object_callback_proxy.h +++ b/services/distributeddataservice/service/object/include/object_callback_proxy.h @@ -76,10 +76,23 @@ public: explicit ObjectChangeCallbackProxy(const sptr &impl); ~ObjectChangeCallbackProxy() = default; void Completed(const std::map> &results, bool allReady) override; - private: static inline BrokerDelegator delegator_; }; + +class ObjectProgressCallbackProxyBroker : public IObjectProgressCallback, public IRemoteBroker { +public: + DECLARE_INTERFACE_DESCRIPTOR(u"OHOS.DistributedObject.IObjectProgressCallback"); +}; + +class ObjectProgressCallbackProxy : public IRemoteProxy { +public: + explicit ObjectProgressCallbackProxy(const sptr &impl); + ~ObjectProgressCallbackProxy() = default; + void Completed(int process) override; +private: + static inline BrokerDelegator delegator_; +}; } // namespace DistributedObject } // namespace OHOS #endif // DISTRIBUTEDDATAMGR_OBJECT_CALLBACK_PROXY_H diff --git a/services/distributeddataservice/service/object/include/object_data_listener.h b/services/distributeddataservice/service/object/include/object_data_listener.h index 7c3140c7b0f33f3e8c307a09416d3146b8ebdd45..f46f67400fd862fbfa15e12383d91c87d74775f3 100644 --- a/services/distributeddataservice/service/object/include/object_data_listener.h +++ b/services/distributeddataservice/service/object/include/object_data_listener.h @@ -39,8 +39,8 @@ public: int32_t OnFinished(const std::string &srcNetworkId, const sptr &assetObj, int32_t result) override; int32_t OnRecvProgress(const std::string &srcNetworkId, - const sptr &assetObj, - uint64_t totalBytes, uint64_t processBytes) override; + const sptr &assetObj, + uint64_t totalBytes, uint64_t processBytes) override; }; } // namespace DistributedObject } // namespace OHOS diff --git a/services/distributeddataservice/service/object/include/object_manager.h b/services/distributeddataservice/service/object/include/object_manager.h index 158b5c50a01490a53088d2e1f63dc64cc64e144c..0e1d951e1920a44aeef9612ac9817770055a3644 100644 --- a/services/distributeddataservice/service/object/include/object_manager.h +++ b/services/distributeddataservice/service/object/include/object_manager.h @@ -96,10 +96,16 @@ public: sptr callback); void UnregisterRemoteCallback(const std::string &bundleName, pid_t pid, uint32_t tokenId, const std::string &sessionId = ""); + void RegisterProgressObserverCallback(const std::string &bundleName, const std::string &sessionId, + pid_t pid, uint32_t tokenId, + sptr callback); + void UnregisterProgressObserverCallback(const std::string &bundleName, pid_t pid, uint32_t tokenId, + const std::string &sessionId = ""); void NotifyChange(const ObjectRecord &changedData); void NotifyAssetsReady(const std::string& objectKey, const std::string& bundleName, const std::string& srcNetworkId = ""); void NotifyAssetsStart(const std::string& objectKey, const std::string& srcNetworkId = ""); + void NotifyAssetsRecvProgress(const std::string& objectKey, int32_t progress); void CloseAfterMinute(); int32_t Open(); void SetThreadPool(std::shared_ptr executors); @@ -135,6 +141,17 @@ private: return false; } }; + struct ProgressCallbackInfo { + pid_t pid; + std::map> observers_; + bool operator<(const ProgressCallbackInfo &it_) const + { + if (pid < it_.pid) { + return true; + } + return false; + } + }; struct SaveInfo : DistributedData::Serializable { std::string bundleName; std::string sessionId; @@ -167,6 +184,7 @@ private: void DoNotify(uint32_t tokenId, const CallbackInfo& value, const std::map& data, bool allReady); void DoNotifyAssetsReady(uint32_t tokenId, const CallbackInfo& value, const std::string& objectKey, bool allReady); + void DoNotifyRecvProcess(const ProgressCallbackInfo& value, const std::string& objectKey, int32_t progress); void DoNotifyWaitAssetTimeout(const std::string &objectKey); std::map> GetAssetsFromStore(const ObjectRecord& changedData); static bool IsAssetKey(const std::string& key); @@ -212,12 +230,14 @@ private: std::string userId_; std::atomic isSyncing_ = false; ConcurrentMap callbacks_; + ConcurrentMap process_callbacks; std::shared_ptr executors_; DistributedData::AssetBindInfo ConvertBindInfo(ObjectStore::AssetBindInfo& bindInfo); ConcurrentMap> snapshots_; // key:bundleName_sessionId ConcurrentMap bindSnapshots_; // key:bundleName_storeName ConcurrentMap restoreStatus_; // key:bundleName+sessionId ConcurrentMap objectTimer_; // key:bundleName+sessionId + ConcurrentMap assetsRecvProgress_; // key:bundleName+sessionId }; } // namespace DistributedObject } // namespace OHOS diff --git a/services/distributeddataservice/service/object/include/object_service_impl.h b/services/distributeddataservice/service/object/include/object_service_impl.h index e398edface1861cb6e1ff32a6e57eb41293fba57..4dd5f1ca69e85670efaf8718cfdbf92255eb352b 100644 --- a/services/distributeddataservice/service/object/include/object_service_impl.h +++ b/services/distributeddataservice/service/object/include/object_service_impl.h @@ -38,6 +38,9 @@ public: int32_t RegisterDataObserver(const std::string &bundleName, const std::string &sessionId, sptr callback) override; int32_t UnregisterDataChangeObserver(const std::string &bundleName, const std::string &sessionId) override; + int32_t RegisterProgressObserver(const std::string &bundleName, const std::string &sessionId, + sptr callback) override; + int32_t UnregisterProgressObserver(const std::string &bundleName, const std::string &sessionId) override; int32_t DeleteSnapshot(const std::string &bundleName, const std::string &sessionId) override; int32_t IsBundleNameEqualTokenId( const std::string &bundleName, const std::string &sessionId, const uint32_t &tokenId); diff --git a/services/distributeddataservice/service/object/include/object_service_stub.h b/services/distributeddataservice/service/object/include/object_service_stub.h index fef5c7ecaac6985175b2a45baf113ea1eaf075c4..58ea19f6070434ebe8c4fe333c42f09740415617 100644 --- a/services/distributeddataservice/service/object/include/object_service_stub.h +++ b/services/distributeddataservice/service/object/include/object_service_stub.h @@ -33,6 +33,8 @@ private: int32_t ObjectStoreRetrieveOnRemote(MessageParcel &data, MessageParcel &reply); int32_t OnSubscribeRequest(MessageParcel &data, MessageParcel &reply); int32_t OnUnsubscribeRequest(MessageParcel &data, MessageParcel &reply); + int32_t OnSubscribeProgress(MessageParcel &data, MessageParcel &reply); + int32_t OnUnsubscribeProgress(MessageParcel &data, MessageParcel &reply); int32_t OnAssetChangedOnRemote(MessageParcel &data, MessageParcel &reply); int32_t ObjectStoreBindAssetOnRemote(MessageParcel &data, MessageParcel &reply); int32_t OnDeleteSnapshot(MessageParcel &data, MessageParcel &reply); @@ -48,6 +50,8 @@ private: &ObjectServiceStub::ObjectStoreBindAssetOnRemote, &ObjectServiceStub::OnDeleteSnapshot, &ObjectServiceStub::OnIsContinue, + &ObjectServiceStub::OnSubscribeProgress, + &ObjectServiceStub::OnUnsubscribeProgress, }; }; } // namespace OHOS::DistributedRdb diff --git a/services/distributeddataservice/service/object/src/object_callback_proxy.cpp b/services/distributeddataservice/service/object/src/object_callback_proxy.cpp index 7e4c649a5cab95e7d7b400dfe3601688a38bf209..4f34fd20773372d4a778a34960f23881d66ff991 100644 --- a/services/distributeddataservice/service/object/src/object_callback_proxy.cpp +++ b/services/distributeddataservice/service/object/src/object_callback_proxy.cpp @@ -42,6 +42,11 @@ ObjectChangeCallbackProxy::ObjectChangeCallbackProxy(const sptr & { } +ObjectProgressCallbackProxy::ObjectProgressCallbackProxy(const sptr &impl) + : IRemoteProxy(impl) +{ +} + void ObjectSaveCallbackProxy::Completed(const std::map &results) { MessageParcel data; @@ -117,5 +122,24 @@ void ObjectChangeCallbackProxy::Completed(const std::mapSendRequest(COMPLETED, data, reply, mo); + if (error != 0) { + ZLOGW("SendRequest failed, error %d", error); + } +} } // namespace DistributedObject } // namespace OHOS \ No newline at end of file diff --git a/services/distributeddataservice/service/object/src/object_data_listener.cpp b/services/distributeddataservice/service/object/src/object_data_listener.cpp index 8451fa30db0c17007ad091486b11208bc8377eb9..779404daf95fcdbeca345af4311cc7155f73e81c 100644 --- a/services/distributeddataservice/service/object/src/object_data_listener.cpp +++ b/services/distributeddataservice/service/object/src/object_data_listener.cpp @@ -52,6 +52,7 @@ int32_t ObjectAssetsRecvListener::OnStart(const std::string &srcNetworkId, const auto objectKey = dstBundleName + sessionId; ZLOGI("OnStart, objectKey:%{public}s", objectKey.c_str()); ObjectStoreManager::GetInstance()->NotifyAssetsStart(objectKey, srcNetworkId); + ObjectStoreManager::GetInstance()->NotifyAssetsRecvProgress(objectKey, 0); return OBJECT_SUCCESS; } @@ -69,24 +70,28 @@ int32_t ObjectAssetsRecvListener::OnFinished(const std::string &srcNetworkId, co ZLOGI("OnFinished, status:%{public}d objectKey:%{public}s, asset size:%{public}zu", result, objectKey.c_str(), assetObj->uris_.size()); ObjectStoreManager::GetInstance()->NotifyAssetsReady(objectKey, assetObj->dstBundleName_, srcNetworkId); + ObjectStoreManager::GetInstance()->NotifyAssetsRecvProgress(objectKey, 100); return OBJECT_SUCCESS; } - int32_t ObjectAssetsRecvListener::OnRecvProgress(const std::string &srcNetworkId, const sptr &assetObj, uint64_t totalBytes, uint64_t processBytes) { - if (assetObj == nullptr) { - ZLOGE("OnRecvProgress error! srcNetworkId:%{public}s", - DistributedData::Anonymous::Change(srcNetworkId).c_str()); + if (assetObj == nullptr || totalBytes == 0) { + ZLOGE("OnRecvProgress error! srcNetworkId:%{public}s, totalBytes: %{public}llu", + DistributedData::Anonymous::Change(srcNetworkId).c_str(), totalBytes); return OBJECT_INNER_ERROR; } auto objectKey = assetObj->dstBundleName_ + assetObj->sessionId_; - ZLOGI("OnRecvProgress, srcNetworkId: %{public}s, objectKey:%{public}s, totalBytes: %{public}llu," - "processBytes: %{public}llu.", - DistributedData::Anonymous::Change(srcNetworkId).c_str(), objectKey.c_str(), totalBytes, processBytes); + ZLOGI("srcNetworkId: %{public}s, objectKey:%{public}s, totalBytes: %{public}llu," + "processBytes: %{public}llu.", DistributedData::Anonymous::Change(srcNetworkId).c_str(), + DistributedData::Anonymous::Change(objectKey).c_str(), totalBytes, processBytes); + + int32_t progress = static_cast((processBytes * 100.0 / totalBytes) * 0.9); + ObjectStoreManager::GetInstance()->NotifyAssetsRecvProgress(objectKey, progress); return OBJECT_SUCCESS; } + } // namespace DistributedObject } // namespace OHOS diff --git a/services/distributeddataservice/service/object/src/object_manager.cpp b/services/distributeddataservice/service/object/src/object_manager.cpp index 859de7085d019a26f19bc35140329b36d32a3760..d9778f3368c00f1103f1613a0f038a043cef9920 100644 --- a/services/distributeddataservice/service/object/src/object_manager.cpp +++ b/services/distributeddataservice/service/object/src/object_manager.cpp @@ -412,6 +412,52 @@ void ObjectStoreManager::UnregisterRemoteCallback(const std::string &bundleName, })); } +void ObjectStoreManager::RegisterProgressObserverCallback(const std::string &bundleName, const std::string &sessionId, + pid_t pid, uint32_t tokenId, + sptr callback) +{ + if (bundleName.empty() || sessionId.empty()) { + ZLOGD("ObjectStoreManager::RegisterProgressObserverCallback empty"); + return; + } + ZLOGD("ObjectStoreManager::RegisterProgressObserverCallback start"); + auto proxy = iface_cast(callback); + std::string prefix = bundleName + sessionId; + process_callbacks.Compute(tokenId, ([pid, &proxy, &prefix](const uint32_t key, ProgressCallbackInfo &value) { + if (value.pid != pid) { + value = ProgressCallbackInfo { pid }; + } + value.observers_.insert_or_assign(prefix, proxy); + return !value.observers_.empty(); + })); +} + +void ObjectStoreManager::UnregisterProgressObserverCallback(const std::string &bundleName, pid_t pid, uint32_t tokenId, + const std::string &sessionId) +{ + if (bundleName.empty()) { + ZLOGD("bundleName is empty"); + return; + } + process_callbacks.Compute(tokenId, ([pid, &sessionId, &bundleName](const uint32_t key, ProgressCallbackInfo &value) { + if (value.pid != pid) { + return true; + } + if (sessionId.empty()) { + return false; + } + std::string prefix = bundleName + sessionId; + for (auto it = value.observers_.begin(); it != value.observers_.end();) { + if ((*it).first == prefix) { + it = value.observers_.erase(it); + } else { + ++it; + } + } + return true; + })); +} + void ObjectStoreManager::NotifyChange(const ObjectRecord &changedData) { ZLOGI("OnChange start, size:%{public}zu", changedData.size()); @@ -570,6 +616,18 @@ void ObjectStoreManager::PullAssets(const std::map& d } } +void ObjectStoreManager::NotifyAssetsRecvProgress(const std::string& objectKey, int32_t progress) +{ + assetsRecvProgress_.Compute(objectKey, [progress](const std::string& key, auto& value) { + value = progress; + process_callbacks.ForEach([this, progress](uint32_t tokenId, const ProgressCallbackInfo& value) { + DoNotifyRecvProcess(value, objectKey, progress); + return false; + }); + return true; + }); +} + void ObjectStoreManager::NotifyAssetsReady(const std::string& objectKey, const std::string& bundleName, const std::string& srcNetworkId) { @@ -712,6 +770,20 @@ void ObjectStoreManager::DoNotifyAssetsReady(uint32_t tokenId, const CallbackInf } } +void ObjectStoreManager::DoNotifyRecvProcess(const ProgressCallbackInfo& value, + const std::string& objectKey, int32_t progress) +{ + for (const auto& observer : value.observers_) { + if (objectKey != observer.first) { + continue; + } + observer.second->Completed(process); + if (process == 100) { + assetsRecvProgress_.Erase(objectKey); + } + } +} + void ObjectStoreManager::DoNotifyWaitAssetTimeout(const std::string &objectKey) { ObjectStore::RadarReporter::ReportStageError(std::string(__FUNCTION__), ObjectStore::DATA_RESTORE, diff --git a/services/distributeddataservice/service/object/src/object_service_impl.cpp b/services/distributeddataservice/service/object/src/object_service_impl.cpp index 71f6195e7a2c4182f769693e996f633a94bc5875..ba6a6912b1dca7c910b4fc2d5048bb087ecaeb0e 100644 --- a/services/distributeddataservice/service/object/src/object_service_impl.cpp +++ b/services/distributeddataservice/service/object/src/object_service_impl.cpp @@ -260,6 +260,33 @@ int32_t ObjectServiceImpl::UnregisterDataChangeObserver(const std::string &bundl return OBJECT_SUCCESS; } +int32_t ObjectServiceImpl::RegisterProgressObserver(const std::string &bundleName, const std::string &sessionId, + sptr callback) +{ + ZLOGD("begin."); + uint32_t tokenId = IPCSkeleton::GetCallingTokenID(); + int32_t status = IsBundleNameEqualTokenId(bundleName, sessionId, tokenId); + if (status != OBJECT_SUCCESS) { + return status; + } + auto pid = IPCSkeleton::GetCallingPid(); + ObjectStoreManager::GetInstance()->RegisterProgressObserverCallback(bundleName, sessionId, pid, tokenId, callback); + return OBJECT_SUCCESS; +} + +int32_t ObjectServiceImpl::UnregisterProgressObserver(const std::string &bundleName, const std::string &sessionId) +{ + ZLOGD("begin."); + uint32_t tokenId = IPCSkeleton::GetCallingTokenID(); + int32_t status = IsBundleNameEqualTokenId(bundleName, sessionId, tokenId); + if (status != OBJECT_SUCCESS) { + return status; + } + auto pid = IPCSkeleton::GetCallingPid(); + ObjectStoreManager::GetInstance()->UnregisterProgressObserverCallback(bundleName, pid, tokenId, sessionId); + return OBJECT_SUCCESS; +} + int32_t ObjectServiceImpl::DeleteSnapshot(const std::string &bundleName, const std::string &sessionId) { uint32_t tokenId = IPCSkeleton::GetCallingTokenID(); @@ -342,6 +369,7 @@ int32_t ObjectServiceImpl::OnAppExit(pid_t uid, pid_t pid, uint32_t tokenId, con ZLOGI("ObjectServiceImpl::OnAppExit uid=%{public}d, pid=%{public}d, tokenId=%{public}d, bundleName=%{public}s", uid, pid, tokenId, appId.c_str()); ObjectStoreManager::GetInstance()->UnregisterRemoteCallback(appId, pid, tokenId); + ObjectStoreManager::GetInstance()->UnregisterProgressObserverCallback(appId, pid, tokenId); return FeatureSystem::STUB_SUCCESS; } diff --git a/services/distributeddataservice/service/object/src/object_service_stub.cpp b/services/distributeddataservice/service/object/src/object_service_stub.cpp index 20255eab45911b9c02250c8a6bc9421e70040497..1b94f0428b26bf17f7df24f4a08edfdefd090ceb 100644 --- a/services/distributeddataservice/service/object/src/object_service_stub.cpp +++ b/services/distributeddataservice/service/object/src/object_service_stub.cpp @@ -164,6 +164,41 @@ int32_t ObjectServiceStub::OnUnsubscribeRequest(MessageParcel &data, MessageParc return 0; } +int32_t ObjectServiceStub::OnSubscribeProgress(MessageParcel &data, MessageParcel &reply) +{ + std::string sessionId; + std::string bundleName; + sptr obj; + if (!ITypesUtil::Unmarshal(data, bundleName, sessionId, obj) || obj == nullptr) { + ZLOGE("Unmarshal sessionId:%{public}s bundleName:%{public}s, callback is nullptr: %{public}d", + DistributedData::Anonymous::Change(sessionId).c_str(), bundleName.c_str(), obj == nullptr); + return IPC_STUB_INVALID_DATA_ERR; + } + int32_t status = RegisterProgressObserver(bundleName, sessionId, obj); + if (!ITypesUtil::Marshal(reply, status)) { + ZLOGE("Marshal status:0x%{public}x", status); + return IPC_STUB_WRITE_PARCEL_ERR; + } + return 0; +} + +int32_t ObjectServiceStub::OnUnsubscribeProgress(MessageParcel &data, MessageParcel &reply) +{ + std::string sessionId; + std::string bundleName; + if (!ITypesUtil::Unmarshal(data, bundleName, sessionId)) { + ZLOGE("Unmarshal sessionId:%{public}s bundleName:%{public}s", + DistributedData::Anonymous::Change(sessionId).c_str(), bundleName.c_str()); + return IPC_STUB_INVALID_DATA_ERR; + } + int32_t status = UnregisterProgressObserver(bundleName, sessionId); + if (!ITypesUtil::Marshal(reply, status)) { + ZLOGE("Marshal status:0x%{public}x", status); + return IPC_STUB_WRITE_PARCEL_ERR; + } + return 0; +} + int32_t ObjectServiceStub::OnDeleteSnapshot(MessageParcel &data, MessageParcel &reply) { std::string sessionId; diff --git a/services/distributeddataservice/service/test/object_service_stub_test.cpp b/services/distributeddataservice/service/test/object_service_stub_test.cpp index 1cbb5e1a50af60a3f88510fde236c00b25c5d19a..b00236072eea54f739aed09a41ea87a20d03c0de 100644 --- a/services/distributeddataservice/service/test/object_service_stub_test.cpp +++ b/services/distributeddataservice/service/test/object_service_stub_test.cpp @@ -93,6 +93,12 @@ HWTEST_F(ObjectServiceStubTest, ObjectServiceTest002, TestSize.Level1) result = objectServiceStub->OnUnsubscribeRequest(request, reply); EXPECT_EQ(result, IPC_STUB_INVALID_DATA_ERR); + result = objectServiceStub->OnSubscribeProgress(request, reply); + EXPECT_EQ(result, IPC_STUB_INVALID_DATA_ERR); + + result = objectServiceStub->OnUnsubscribeProgress(request, reply); + EXPECT_EQ(result, IPC_STUB_INVALID_DATA_ERR); + result = objectServiceStub->OnAssetChangedOnRemote(request, reply); EXPECT_EQ(result, IPC_STUB_INVALID_DATA_ERR);