diff --git a/services/distributeddataservice/service/kvdb/kvdb_general_store.cpp b/services/distributeddataservice/service/kvdb/kvdb_general_store.cpp index 64964d040b1777b93692ecd2e32d1b1f75278d55..3ee9f9ae7013bd0b199aa8a973664135c2ef8b4d 100644 --- a/services/distributeddataservice/service/kvdb/kvdb_general_store.cpp +++ b/services/distributeddataservice/service/kvdb/kvdb_general_store.cpp @@ -101,7 +101,7 @@ static DBSchema GetDBSchema(const Database &database) KVDBGeneralStore::DBPassword KVDBGeneralStore::GetDBPassword(const StoreMetaData &data) { - DBPassword dbPassword; + DBPassword dbPassword{}; if (!data.isEncrypt) { return dbPassword; } @@ -139,7 +139,7 @@ KVDBGeneralStore::DBSecurity KVDBGeneralStore::GetDBSecurity(int32_t secLevel) KVDBGeneralStore::DBOption KVDBGeneralStore::GetDBOption(const StoreMetaData &data, const DBPassword &password) { - DBOption dbOption; + DBOption dbOption{}; dbOption.createIfNecessary = false; dbOption.isMemoryDb = false; dbOption.isEncryptedDb = data.isEncrypt; @@ -243,7 +243,7 @@ int32_t KVDBGeneralStore::Bind( ZLOGW("No cloudDB!"); return GeneralError::E_OK; } - std::map schemas; + std::map schemas{}; std::map> dbClouds{}; auto dbSchema = GetDBSchema(database); { @@ -258,7 +258,7 @@ int32_t KVDBGeneralStore::Bind( schemas.insert({ std::to_string(userId), dbSchema }); } } - DistributedDB::CloudSyncConfig dbConfig; + DistributedDB::CloudSyncConfig dbConfig{}; dbConfig.maxUploadCount = config.maxNumber; dbConfig.maxUploadSize = config.maxSize; dbConfig.maxRetryConflictTimes = config.maxRetryConflictTimes; @@ -372,7 +372,7 @@ KVDBGeneralStore::DBSyncCallback KVDBGeneralStore::GetDBSyncCompleteCB(DetailAsy DBStatus KVDBGeneralStore::CloudSync(const Devices &devices, DistributedDB::SyncMode cloudSyncMode, DetailAsync async, int64_t wait, const std::string &prepareTraceId) { - DistributedDB::CloudSyncOption syncOption; + DistributedDB::CloudSyncOption syncOption{}; syncOption.devices = devices; syncOption.mode = cloudSyncMode; syncOption.waitTime = wait; diff --git a/services/distributeddataservice/service/object/include/object_asset_loader.h b/services/distributeddataservice/service/object/include/object_asset_loader.h index 160539fafb77d9cbdf4aa21ddc7c6c2470b82d14..4d8e7d88eed1befab1dfb39a74df682b001c0c68 100644 --- a/services/distributeddataservice/service/object/include/object_asset_loader.h +++ b/services/distributeddataservice/service/object/include/object_asset_loader.h @@ -43,9 +43,9 @@ class ObjectAssetLoader { public: static ObjectAssetLoader &GetInstance(); void SetThreadPool(std::shared_ptr executors); - bool Transfer(const int32_t userId, const std::string& bundleName, const std::string& deviceId, + bool Transfer(int32_t userId, const std::string& bundleName, const std::string& deviceId, const DistributedData::Asset& asset); - void TransferAssetsAsync(const int32_t userId, const std::string& bundleName, const std::string& deviceId, + void TransferAssetsAsync(int32_t userId, const std::string& bundleName, const std::string& deviceId, const std::vector& assets, const TransferFunc& callback); int32_t PushAsset(int32_t userId, const sptr &assetObj, const sptr &sendCallback); diff --git a/services/distributeddataservice/service/object/include/object_callback_proxy.h b/services/distributeddataservice/service/object/include/object_callback_proxy.h index fabd1494186af829b9f377b09fe6fa4c10e7e358..d602c3e4f389142aeb0760b21494a34694b9191f 100644 --- a/services/distributeddataservice/service/object/include/object_callback_proxy.h +++ b/services/distributeddataservice/service/object/include/object_callback_proxy.h @@ -51,21 +51,6 @@ private: static inline BrokerDelegator delegator_; }; -class ObjectRetrieveCallbackProxyBroker : public IObjectRetrieveCallback, public IRemoteBroker { -public: - DECLARE_INTERFACE_DESCRIPTOR(u"OHOS.DistributedObject.IObjectRetrieveCallback"); -}; - -class ObjectRetrieveCallbackProxy : public IRemoteProxy { -public: - explicit ObjectRetrieveCallbackProxy(const sptr &impl); - ~ObjectRetrieveCallbackProxy() = default; - void Completed(const std::map> &results, bool allReady) override; - -private: - static inline BrokerDelegator delegator_; -}; - class ObjectChangeCallbackProxyBroker : public IObjectChangeCallback, public IRemoteBroker { public: DECLARE_INTERFACE_DESCRIPTOR(u"OHOS.DistributedObject.IObjectChangeCallback"); diff --git a/services/distributeddataservice/service/object/include/object_common.h b/services/distributeddataservice/service/object/include/object_common.h index 759a48a5c57ac2bf5b477f9df2ca64948d1c3cfc..32410cc06396b154587e99899eb07052b443871b 100644 --- a/services/distributeddataservice/service/object/include/object_common.h +++ b/services/distributeddataservice/service/object/include/object_common.h @@ -24,7 +24,7 @@ enum ObjectDistributedType : int32_t { enum Status : int32_t { OBJECT_SUCCESS, - OBJECT_DBSTATUS_ERROR, + OBJECT_DB_ERROR, OBJECT_INNER_ERROR, OBJECT_PERMISSION_DENIED, OBJECT_STORE_NOT_FOUND diff --git a/services/distributeddataservice/service/object/include/object_data_listener.h b/services/distributeddataservice/service/object/include/object_data_listener.h index 7c3140c7b0f33f3e8c307a09416d3146b8ebdd45..462d1be3b609fb6852f0e49e3402c63d97d6dbd0 100644 --- a/services/distributeddataservice/service/object/include/object_data_listener.h +++ b/services/distributeddataservice/service/object/include/object_data_listener.h @@ -16,8 +16,8 @@ #ifndef DISTRIBUTEDDATAMGR_OBJECT_DATA_LISTENER_H #define DISTRIBUTEDDATAMGR_OBJECT_DATA_LISTENER_H -#include "kv_store_observer.h" #include "asset/asset_recv_callback_stub.h" +#include "kv_store_observer.h" namespace OHOS { namespace DistributedObject { using AssetObj = Storage::DistributedFile::AssetObj; @@ -31,17 +31,14 @@ public: class ObjectAssetsRecvListener : public Storage::DistributedFile::AssetRecvCallbackStub { public: - ObjectAssetsRecvListener() =default; - ~ObjectAssetsRecvListener() =default; - int32_t OnStart(const std::string &srcNetworkId, - const std::string &dstNetworkId, const std::string &sessionId, - const std::string &dstBundleName) override; - int32_t OnFinished(const std::string &srcNetworkId, - const sptr &assetObj, int32_t result) override; - int32_t OnRecvProgress(const std::string &srcNetworkId, - const sptr &assetObj, - uint64_t totalBytes, uint64_t processBytes) override; + ObjectAssetsRecvListener() = default; + ~ObjectAssetsRecvListener() = default; + int32_t OnStart(const std::string &srcNetworkId, const std::string &dstNetworkId, const std::string &sessionId, + const std::string &dstBundleName) override; + int32_t OnFinished(const std::string &srcNetworkId, const sptr &assetObj, int32_t result) override; + int32_t OnRecvProgress(const std::string &srcNetworkId, const sptr &assetObj, uint64_t totalBytes, + uint64_t processBytes) override; }; -} // namespace DistributedObject -} // namespace OHOS +} // namespace DistributedObject +} // namespace OHOS #endif // DISTRIBUTEDDATAMGR_OBJECT_DATA_LISTENER_H diff --git a/services/distributeddataservice/service/object/include/object_dms_handler.h b/services/distributeddataservice/service/object/include/object_dms_handler.h index ee4208a2798901d63f84f781595f62cbee99a74b..d29db29865986291ef6917d61ad7e058ee188102 100644 --- a/services/distributeddataservice/service/object/include/object_dms_handler.h +++ b/services/distributeddataservice/service/object/include/object_dms_handler.h @@ -18,8 +18,8 @@ #include -#include "dms_listener_stub.h" #include "distributed_sched_types.h" +#include "dms_listener_stub.h" namespace OHOS::DistributedObject { class DmsEventListener : public DistributedSchedule::DSchedEventListenerStub { diff --git a/services/distributeddataservice/service/object/include/object_manager.h b/services/distributeddataservice/service/object/include/object_manager.h index 85dd81349dd67e928395a2e6977a696f35543d45..d31711490f726e1eeadcbeb3b181522935ddd857 100644 --- a/services/distributeddataservice/service/object/include/object_manager.h +++ b/services/distributeddataservice/service/object/include/object_manager.h @@ -66,6 +66,7 @@ public: using DmAdaper = OHOS::DistributedData::DeviceManagerAdapter; using UriToSnapshot = std::shared_ptr>>; using StoreMetaData = OHOS::DistributedData::StoreMetaData; + using DBStatus = DistributedDB::DBStatus; enum RestoreStatus : int32_t { NONE = 0, @@ -88,7 +89,7 @@ public: int32_t RevokeSave( const std::string &appId, const std::string &sessionId, sptr callback); int32_t Retrieve(const std::string &bundleName, const std::string &sessionId, - sptr callback, uint32_t tokenId); + std::map> &data, bool &allReady); void SetData(const std::string &dataDir, const std::string &userId); int32_t Clear(); int32_t InitUserMeta(); @@ -159,11 +160,31 @@ private: bool Unmarshal(const json &node) override; std::string ToPropertyPrefix(); }; + + class StoreGuard { + public: + StoreGuard() : opened_(false) {} + int32_t Open() + { + int32_t result = ObjectStoreManager::GetInstance()->Open(); + opened_ = (result == OBJECT_SUCCESS); + return result; + } + + ~StoreGuard() + { + if (opened_) { + ObjectStoreManager::GetInstance()->Close(); + } + } + + private: + bool opened_; + }; DistributedDB::KvStoreNbDelegate *OpenObjectKvStore(); void FlushClosedStore(); void Close(); void ForceClose(); - int32_t SetSyncStatus(bool status); int32_t SaveToStore(const std::string &appId, const std::string &sessionId, const std::string &toDeviceId, const ObjectRecord &data); int32_t SyncOnStore(const std::string &prefix, const std::vector &deviceList, SyncCallBack &callback); @@ -178,11 +199,9 @@ private: bool IsNeedMetaSync(const StoreMetaData &meta, const std::vector &networkIds); int32_t DoSync(const std::string &prefix, const std::vector &deviceList, uint64_t sequenceId); std::string GetCurrentUser(); - void DoNotify(uint32_t tokenId, const CallbackInfo& value, const std::map& data, - bool allReady); - void DoNotifyAssetsReady(uint32_t tokenId, const CallbackInfo& value, const std::string& objectKey, bool allReady); + void DoNotify(const CallbackInfo &value, const std::map &data, bool allReady); + void DoNotifyAssetsReady(const CallbackInfo& value, const std::string& objectKey, bool allReady); void DoNotifyWaitAssetTimeout(const std::string &objectKey); - std::map> GetAssetsFromStore(const ObjectRecord& changedData); static bool IsAssetKey(const std::string& key); static bool IsAssetComplete(const ObjectRecord& result, const std::string& assetPrefix); Assets GetAssetsFromDBRecords(const ObjectRecord& result); @@ -192,7 +211,7 @@ private: void NotifyDataChanged(const std::map& data, const SaveInfo& saveInfo); int32_t PushAssets(const std::string &srcBundleName, const std::string &dstBundleName, const std::string &sessionId, const ObjectRecord &data, const std::string &deviceId); - int32_t WaitAssets(const std::string& objectKey, const SaveInfo& saveInfo, + void WaitAssets(const std::string& objectKey, const SaveInfo& saveInfo, const std::map& data); void PullAssets(const std::map& data, const SaveInfo& saveInfo); std::map GetObjectData(const ObjectRecord& changedData, SaveInfo& saveInfo, @@ -219,7 +238,8 @@ private: + DmAdaper::GetInstance().GetLocalDevice().udid; }; mutable std::shared_timed_mutex rwMutex_; - std::shared_ptr kvStoreDelegateManager_ = nullptr; + DistributedDB::KvStoreDelegateManager kvStoreDelegateManager_; + std::atomic hasInitMananger_{ false }; DistributedDB::KvStoreNbDelegate *delegate_ = nullptr; ObjectDataListener objectDataListener_; sptr objectAssetsRecvListener_ = nullptr; diff --git a/services/distributeddataservice/service/object/include/object_service_impl.h b/services/distributeddataservice/service/object/include/object_service_impl.h index 98f0fcd616966ccde0ad575a327a4d0fbf01c540..5faf4e00ccaf7250a3ffd602417619efbcb0cfe2 100644 --- a/services/distributeddataservice/service/object/include/object_service_impl.h +++ b/services/distributeddataservice/service/object/include/object_service_impl.h @@ -31,8 +31,8 @@ public: int32_t ObjectStoreSave(const std::string &bundleName, const std::string &sessionId, const std::string &deviceId, const std::map> &data, sptr callback) override; - int32_t ObjectStoreRetrieve( - const std::string &bundleName, const std::string &sessionId, sptr callback) override; + int32_t ObjectStoreRetrieve(const std::string &bundleName, const std::string &sessionId, + std::map> &data, bool &allReady) override; int32_t ObjectStoreRevokeSave(const std::string &bundleName, const std::string &sessionId, sptr callback) override; int32_t RegisterDataObserver(const std::string &bundleName, const std::string &sessionId, @@ -59,7 +59,7 @@ private: using StaticActs = DistributedData::StaticActs; class ObjectStatic : public StaticActs { public: - ~ObjectStatic() override {}; + ~ObjectStatic() override{}; int32_t OnAppUninstall(const std::string &bundleName, int32_t user, int32_t index) override; }; class Factory { @@ -71,8 +71,7 @@ private: }; void RegisterObjectServiceInfo(); void RegisterHandler(); - int32_t SaveMetaData(StoreMetaData& saveMeta); - void UpdateMetaData(); + int32_t SaveMetaData(StoreMetaData &saveMeta, const std::string &user); static Factory factory_; std::shared_ptr executors_; diff --git a/services/distributeddataservice/service/object/include/object_service_stub.h b/services/distributeddataservice/service/object/include/object_service_stub.h index e254a2bd1584eff386c236388ad3c9173f4e22a8..d2a56f1b3c53cbb1bde0d0ccab40d24f466224e1 100644 --- a/services/distributeddataservice/service/object/include/object_service_stub.h +++ b/services/distributeddataservice/service/object/include/object_service_stub.h @@ -17,8 +17,9 @@ #define DISTRIBUTED_OBJECT_SERVICE_STUB_H #include -#include "iobject_service.h" + #include "feature/feature_system.h" +#include "iobject_service.h" namespace OHOS::DistributedObject { using ObjectCode = ObjectStoreService::ObjectServiceInterfaceCode; @@ -27,7 +28,7 @@ public: int OnRemoteRequest(uint32_t code, MessageParcel &data, MessageParcel &reply) override; private: - bool CheckInterfaceToken(MessageParcel& data); + bool CheckInterfaceToken(MessageParcel &data); int32_t ObjectStoreSaveOnRemote(MessageParcel &data, MessageParcel &reply); int32_t ObjectStoreRevokeSaveOnRemote(MessageParcel &data, MessageParcel &reply); int32_t ObjectStoreRetrieveOnRemote(MessageParcel &data, MessageParcel &reply); @@ -54,5 +55,5 @@ private: &ObjectServiceStub::OnUnsubscribeProgress, }; }; -} // namespace OHOS::DistributedRdb +} // namespace OHOS::DistributedObject #endif diff --git a/services/distributeddataservice/service/object/src/object_asset_loader.cpp b/services/distributeddataservice/service/object/src/object_asset_loader.cpp index efabc95ba69400754d3fd6ab6e30816568d720d8..0f7428d87c213e458a8606b89a0a0aa2241c0169 100644 --- a/services/distributeddataservice/service/object/src/object_asset_loader.cpp +++ b/services/distributeddataservice/service/object/src/object_asset_loader.cpp @@ -36,7 +36,7 @@ void ObjectAssetLoader::SetThreadPool(std::shared_ptr executors) executors_ = executors; } -bool ObjectAssetLoader::Transfer(const int32_t userId, const std::string& bundleName, const std::string& deviceId, +bool ObjectAssetLoader::Transfer(int32_t userId, const std::string& bundleName, const std::string& deviceId, const DistributedData::Asset& asset) { AssetInfo assetInfo; diff --git a/services/distributeddataservice/service/object/src/object_callback_proxy.cpp b/services/distributeddataservice/service/object/src/object_callback_proxy.cpp index 32fdda71506e171c72ef55766507d71a730d0d69..954b548c7466f7bfbd692b87d0b26c68578efaa4 100644 --- a/services/distributeddataservice/service/object/src/object_callback_proxy.cpp +++ b/services/distributeddataservice/service/object/src/object_callback_proxy.cpp @@ -32,11 +32,6 @@ ObjectRevokeSaveCallbackProxy::ObjectRevokeSaveCallbackProxy(const sptr &impl) - : IRemoteProxy(impl) -{ -} - ObjectChangeCallbackProxy::ObjectChangeCallbackProxy(const sptr &impl) : IRemoteProxy(impl) { @@ -59,7 +54,7 @@ void ObjectSaveCallbackProxy::Completed(const std::map &re ZLOGE("Marshalling failed"); return; } - MessageOption mo { MessageOption::TF_SYNC }; + MessageOption mo{ MessageOption::TF_SYNC }; int error = Remote()->SendRequest(COMPLETED, data, reply, mo); if (error != 0) { ZLOGW("SendRequest failed, error %{public}d", error); @@ -78,26 +73,7 @@ void ObjectRevokeSaveCallbackProxy::Completed(int32_t status) ZLOGE("Marshalling failed, status:%{public}d", status); return; } - MessageOption mo { MessageOption::TF_SYNC }; - int error = Remote()->SendRequest(COMPLETED, data, reply, mo); - if (error != 0) { - ZLOGW("SendRequest failed, error %{public}d", error); - } -} - -void ObjectRetrieveCallbackProxy::Completed(const std::map> &results, bool allReady) -{ - MessageParcel data; - MessageParcel reply; - if (!data.WriteInterfaceToken(GetDescriptor())) { - ZLOGE("write descriptor failed"); - return; - } - if (!ITypesUtil::Marshal(data, results, allReady)) { - ZLOGE("Marshalling failed, allReady:%{public}d", allReady); - return; - } - MessageOption mo { MessageOption::TF_SYNC }; + MessageOption mo{ MessageOption::TF_SYNC }; int error = Remote()->SendRequest(COMPLETED, data, reply, mo); if (error != 0) { ZLOGW("SendRequest failed, error %{public}d", error); @@ -116,7 +92,7 @@ void ObjectChangeCallbackProxy::Completed(const std::mapSendRequest(COMPLETED, data, reply, mo); if (error != 0) { ZLOGW("SendRequest failed, error %{public}d", error); @@ -135,11 +111,11 @@ void ObjectProgressCallbackProxy::Completed(int32_t progress) ZLOGE("Marshal failed"); return; } - MessageOption mo { MessageOption::TF_ASYNC }; + MessageOption mo{ MessageOption::TF_ASYNC }; int error = Remote()->SendRequest(COMPLETED, data, reply, mo); if (error != 0) { ZLOGW("SendRequest failed, error %{public}d", error); } } -} // namespace DistributedObject -} // namespace OHOS \ No newline at end of file +} // namespace DistributedObject +} // namespace OHOS \ No newline at end of file diff --git a/services/distributeddataservice/service/object/src/object_data_listener.cpp b/services/distributeddataservice/service/object/src/object_data_listener.cpp index ad10e0391707a9f11c5a9bb34206d42b7aaec146..344c530177c649fc406977f0eeec0deb8ede21ad 100644 --- a/services/distributeddataservice/service/object/src/object_data_listener.cpp +++ b/services/distributeddataservice/service/object/src/object_data_listener.cpp @@ -16,6 +16,7 @@ #define LOG_TAG "ObjectDataListener" #include "object_data_listener.h" + #include "log_print.h" #include "object_manager.h" #include "object_radar_reporter.h" @@ -24,19 +25,15 @@ namespace OHOS { namespace DistributedObject { constexpr int32_t PROGRESS_MAX = 100; constexpr int32_t PROGRESS_INVALID = -1; -ObjectDataListener::ObjectDataListener() -{ -} +ObjectDataListener::ObjectDataListener() {} -ObjectDataListener::~ObjectDataListener() -{ -} +ObjectDataListener::~ObjectDataListener() {} void ObjectDataListener::OnChange(const DistributedDB::KvStoreChangedData &data) { const auto &insertedDatas = data.GetEntriesInserted(); const auto &updatedDatas = data.GetEntriesUpdated(); - std::map> changedData {}; + std::map> changedData{}; for (const auto &entry : insertedDatas) { std::string key(entry.key.begin(), entry.key.end()); changedData.insert_or_assign(std::move(key), entry.value); @@ -95,9 +92,9 @@ int32_t ObjectAssetsRecvListener::OnRecvProgress( DistributedData::Anonymous::Change(srcNetworkId).c_str(), DistributedData::Anonymous::Change(objectKey).c_str(), totalBytes, processBytes); - int32_t progress = static_cast((processBytes * 100.0 / totalBytes) * 0.9); + auto progress = static_cast((processBytes * 100.0 / totalBytes) * 0.9); ObjectStoreManager::GetInstance().NotifyAssetsRecvProgress(objectKey, progress); return OBJECT_SUCCESS; } -} // namespace DistributedObject -} // namespace OHOS +} // namespace DistributedObject +} // namespace OHOS diff --git a/services/distributeddataservice/service/object/src/object_dms_handler.cpp b/services/distributeddataservice/service/object/src/object_dms_handler.cpp index 7ec88aca2eaa6542b5697a2f68c3d72fc803fe15..50788d149b3829a20174408e89050d70148f5abb 100644 --- a/services/distributeddataservice/service/object/src/object_dms_handler.cpp +++ b/services/distributeddataservice/service/object/src/object_dms_handler.cpp @@ -45,10 +45,10 @@ void ObjectDmsHandler::ReceiveDmsEvent(DistributedSchedule::EventNotify &event) bool ObjectDmsHandler::IsContinue(const std::string &bundleName) { - std::lock_guard lock(mutex_); auto validityTime = std::chrono::steady_clock::now() - std::chrono::seconds(VALIDITY); DistributedHardware::DmDeviceInfo localDeviceInfo; DistributedHardware::DeviceManager::GetInstance().GetLocalDeviceInfo(PKG_NAME, localDeviceInfo); + std::lock_guard lock(mutex_); for (auto it = dmsEvents_.rbegin(); it != dmsEvents_.rend(); ++it) { if (it->second < validityTime) { continue; @@ -74,10 +74,10 @@ bool ObjectDmsHandler::IsContinue(const std::string &bundleName) std::string ObjectDmsHandler::GetDstBundleName(const std::string &srcBundleName, const std::string &dstNetworkId) { - std::lock_guard lock(mutex_); auto validityTime = std::chrono::steady_clock::now() - std::chrono::seconds(VALIDITY); DistributedHardware::DmDeviceInfo localDeviceInfo; DistributedHardware::DeviceManager::GetInstance().GetLocalDeviceInfo(PKG_NAME, localDeviceInfo); + std::lock_guard lock(mutex_); for (auto it = dmsEvents_.rbegin(); it != dmsEvents_.rend(); ++it) { if (it->second < validityTime) { continue; diff --git a/services/distributeddataservice/service/object/src/object_manager.cpp b/services/distributeddataservice/service/object/src/object_manager.cpp index 8abb69bbd2916d8c02ad8d850a82601e879fa026..671a233b6c1310f6ef443e38cc386313cbe920e7 100644 --- a/services/distributeddataservice/service/object/src/object_manager.cpp +++ b/services/distributeddataservice/service/object/src/object_manager.cpp @@ -61,19 +61,27 @@ ObjectStoreManager::ObjectStoreManager() ObjectStoreManager::~ObjectStoreManager() { + ZLOGI("ObjectStoreManager destroy"); + if (objectAssetsRecvListener_ != nullptr) { + auto status = DistributedFileDaemonManager::GetInstance().UnRegisterAssetCallback(objectAssetsRecvListener_); + ZLOGI("UnRegister assetsRecvListener, status:%{public}d", status); + } } DistributedDB::KvStoreNbDelegate *ObjectStoreManager::OpenObjectKvStore() { + if (!hasInitMananger_.load()) { + ZLOGE("Kv store delegate manager not has been inited"); + return nullptr; + } DistributedDB::KvStoreNbDelegate *store = nullptr; - DistributedDB::KvStoreNbDelegate::Option option; + DistributedDB::KvStoreNbDelegate::Option option{}; option.createDirByStoreIdOnly = true; option.syncDualTupleMode = true; option.secOption = { DistributedDB::S1, DistributedDB::ECE }; - ZLOGD("start GetKvStore"); - kvStoreDelegateManager_->GetKvStore(ObjectCommon::OBJECTSTORE_DB_STOREID, option, - [&store, this](DistributedDB::DBStatus dbStatus, DistributedDB::KvStoreNbDelegate *kvStoreNbDelegate) { - if (dbStatus != DistributedDB::DBStatus::OK) { + kvStoreDelegateManager_.GetKvStore( + ObjectCommon::OBJECTSTORE_DB_STOREID, option, [&store, this](auto dbStatus, auto *kvStoreNbDelegate) { + if (dbStatus != DBStatus::OK) { ZLOGE("GetKvStore fail %{public}d", dbStatus); return; } @@ -83,11 +91,8 @@ DistributedDB::KvStoreNbDelegate *ObjectStoreManager::OpenObjectKvStore() } ZLOGI("GetKvStore successsfully"); store = kvStoreNbDelegate; - std::vector tmpKey; - DistributedDB::DBStatus status = store->RegisterObserver(tmpKey, - DistributedDB::ObserverMode::OBSERVER_CHANGES_FOREIGN, - &objectDataListener_); - if (status != DistributedDB::DBStatus::OK) { + auto status = store->RegisterObserver({}, DistributedDB::OBSERVER_CHANGES_FOREIGN, &objectDataListener_); + if (status != DBStatus::OK) { ZLOGE("RegisterObserver err %{public}d", status); } }); @@ -103,7 +108,7 @@ bool ObjectStoreManager::RegisterAssetsLister() objectAssetsRecvListener_ = new ObjectAssetsRecvListener(); } auto status = DistributedFileDaemonManager::GetInstance().RegisterAssetCallback(objectAssetsRecvListener_); - if (status != DistributedDB::DBStatus::OK) { + if (status != OBJECT_SUCCESS) { ZLOGE("Register assetsRecvListener err %{public}d", status); return false; } @@ -129,7 +134,8 @@ void ObjectStoreManager::ProcessSyncCallback(const std::mapCompleted(std::map()); return INVALID_ARGUMENT; } - int32_t result = Open(); + auto proxy = iface_cast(callback); + StoreGuard storeGuard; + int32_t result = storeGuard.Open(); if (result != OBJECT_SUCCESS) { ZLOGE("Open object kvstore failed, result: %{public}d", result); ObjectStore::RadarReporter::ReportStateError(std::string(__FUNCTION__), ObjectStore::SAVE, ObjectStore::SAVE_TO_STORE, ObjectStore::RADAR_FAILED, ObjectStore::GETKV_FAILED, ObjectStore::FINISHED); - proxy->Completed(std::map()); return STORE_NOT_OPEN; } SaveUserToMeta(); @@ -172,8 +177,6 @@ int32_t ObjectStoreManager::Save(const std::string &appId, const std::string &se ZLOGE("Save to store failed, result: %{public}d", result); ObjectStore::RadarReporter::ReportStateError(std::string(__FUNCTION__), ObjectStore::SAVE, ObjectStore::SAVE_TO_STORE, ObjectStore::RADAR_FAILED, result, ObjectStore::FINISHED); - Close(); - proxy->Completed(std::map()); return result; } ZLOGI("Sync data, bundleName: %{public}s, sessionId: %{public}s, deviceId: %{public}s", dstBundleName.c_str(), @@ -181,18 +184,17 @@ int32_t ObjectStoreManager::Save(const std::string &appId, const std::string &se SyncCallBack syncCallback = [proxy, dstBundleName, sessionId, deviceId, this](const std::map &results) { ProcessSyncCallback(results, dstBundleName, sessionId, deviceId); - proxy->Completed(results); + if (proxy != nullptr) { + proxy->Completed(results); + } }; result = SyncOnStore(GetPropertyPrefix(dstBundleName, sessionId, deviceId), { deviceId }, syncCallback); if (result != OBJECT_SUCCESS) { ZLOGE("Sync data failed, result: %{public}d", result); ObjectStore::RadarReporter::ReportStateError(std::string(__FUNCTION__), ObjectStore::SAVE, ObjectStore::SYNC_DATA, ObjectStore::RADAR_FAILED, result, ObjectStore::FINISHED); - Close(); - proxy->Completed(std::map()); return result; } - Close(); return PushAssets(appId, dstBundleName, sessionId, data, deviceId); } @@ -212,7 +214,7 @@ int32_t ObjectStoreManager::PushAssets(const std::string &srcBundleName, const s assetObj->srcBundleName_ = srcBundleName; assetObj->dstNetworkId_ = deviceId; assetObj->sessionId_ = sessionId; - for (const auto& asset : assets) { + for (const auto &asset : assets) { assetObj->uris_.push_back(asset.uri); } if (objectAssetsSendListener_ == nullptr) { @@ -223,6 +225,7 @@ int32_t ObjectStoreManager::PushAssets(const std::string &srcBundleName, const s return status; } +// 该接口JsRevokeSave对外呈现的错误码,统一抛出为INNER_ERROR int32_t ObjectStoreManager::RevokeSave( const std::string &appId, const std::string &sessionId, sptr callback) { @@ -232,66 +235,61 @@ int32_t ObjectStoreManager::RevokeSave( (callback == nullptr) ? "nullptr" : "not null", appId.c_str(), Anonymous::Change(sessionId).c_str()); return INVALID_ARGUMENT; } - int32_t result = Open(); + StoreGuard storeGuard; + int32_t result = storeGuard.Open(); if (result != OBJECT_SUCCESS) { ZLOGE("Open failed, errCode = %{public}d", result); - proxy->Completed(STORE_NOT_OPEN); return STORE_NOT_OPEN; } result = RevokeSaveToStore(GetPrefixWithoutDeviceId(appId, sessionId)); if (result != OBJECT_SUCCESS) { ZLOGE("RevokeSave failed, errCode = %{public}d", result); - Close(); - proxy->Completed(result); return result; } std::vector deviceList; auto deviceInfos = DmAdaper::GetInstance().GetRemoteDevices(); std::for_each(deviceInfos.begin(), deviceInfos.end(), [&deviceList](AppDistributedKv::DeviceInfo info) { deviceList.emplace_back(info.networkId); }); - if (!deviceList.empty()) { - SyncCallBack tmp = [proxy](const std::map &results) { - ZLOGI("revoke save finished"); - proxy->Completed(OBJECT_SUCCESS); - }; - result = SyncOnStore(GetPropertyPrefix(appId, sessionId), deviceList, tmp); - if (result != OBJECT_SUCCESS) { - ZLOGE("sync failed, errCode = %{public}d", result); - proxy->Completed(result); - } - } else { + if (deviceList.empty()) { + ZLOGI("no remote device, no need to sync."); + proxy->Completed(OBJECT_SUCCESS); + return OBJECT_SUCCESS; + } + SyncCallBack tmp = [proxy](const std::map &results) { + ZLOGI("revoke save finished"); proxy->Completed(OBJECT_SUCCESS); }; - Close(); + result = SyncOnStore(GetPropertyPrefix(appId, sessionId), deviceList, tmp); + if (result != OBJECT_SUCCESS) { + ZLOGE("sync failed, errCode = %{public}d", result); + } return result; } -int32_t ObjectStoreManager::Retrieve( - const std::string &bundleName, const std::string &sessionId, sptr callback, uint32_t tokenId) +// todo 同步接口,没有耗时操作,不需要注册回调,直接把结果results通过IPC传过去就好了 +// todo 这里的callback参数应该被results代替 +int32_t ObjectStoreManager::Retrieve(const std::string &bundleName, const std::string &sessionId, + std::map> &data, bool &allReady) { auto proxy = iface_cast(callback); if (proxy == nullptr) { ZLOGE("proxy is nullptr, callback is %{public}s.", (callback == nullptr) ? "nullptr" : "not null"); return INVALID_ARGUMENT; } - int32_t result = Open(); + StoreGuard storeGuard; + int32_t result = storeGuard.Open(); if (result != OBJECT_SUCCESS) { ZLOGE("Open object kvstore failed, result: %{public}d", result); - proxy->Completed(ObjectRecord(), false); return ObjectStore::GETKV_FAILED; } - ObjectRecord results{}; - int32_t status = RetrieveFromStore(bundleName, sessionId, results); + int32_t status = RetrieveFromStore(bundleName, sessionId, data); if (status != OBJECT_SUCCESS) { - ZLOGE("Retrieve from store failed, status: %{public}d, close after one minute.", status); - CloseAfterMinute(); - proxy->Completed(ObjectRecord(), false); + ZLOGI("Retrieve from store failed, status: %{public}d, close after one minute.", status); return status; } - bool allReady = false; - Assets assets = GetAssetsFromDBRecords(results); - if (assets.empty() || results.find(ObjectStore::FIELDS_PREFIX + ObjectStore::DEVICEID_KEY) == results.end()) { + Assets assets = GetAssetsFromDBRecords(data); + if (assets.empty() || data.find(ObjectStore::FIELDS_PREFIX + ObjectStore::DEVICEID_KEY) == data.end()) { allReady = true; } else { restoreStatus_.ComputeIfPresent(bundleName + sessionId, [&allReady](const auto &key, auto &value) { @@ -309,10 +307,8 @@ int32_t ObjectStoreManager::Retrieve( Close(); if (status != OBJECT_SUCCESS) { ZLOGE("Revoke save failed, status: %{public}d", status); - proxy->Completed(ObjectRecord(), false); return status; } - proxy->Completed(results, allReady); if (allReady) { ObjectStore::RadarReporter::ReportStateFinished(std::string(__FUNCTION__), ObjectStore::DATA_RESTORE, ObjectStore::NOTIFY, ObjectStore::RADAR_SUCCESS, ObjectStore::FINISHED); @@ -383,7 +379,8 @@ int32_t ObjectStoreManager::InitUserMeta() int32_t ObjectStoreManager::DeleteByAppId(const std::string &appId, int32_t user) { - int32_t result = Open(); + StoreGuard storeGuard; + int32_t result = storeGuard.Open(); if (result != OBJECT_SUCCESS) { ZLOGE("Open store failed, result: %{public}d, appId: %{public}s, user: %{public}d", result, appId.c_str(), user); @@ -394,13 +391,11 @@ int32_t ObjectStoreManager::DeleteByAppId(const std::string &appId, int32_t user ZLOGE("Revoke save failed, result: %{public}d, appId: %{public}s, user: %{public}d", result, appId.c_str(), user); } - Close(); return result; } -void ObjectStoreManager::RegisterRemoteCallback(const std::string &bundleName, const std::string &sessionId, - pid_t pid, uint32_t tokenId, - sptr callback) +void ObjectStoreManager::RegisterRemoteCallback(const std::string &bundleName, const std::string &sessionId, pid_t pid, + uint32_t tokenId, sptr callback) { if (bundleName.empty() || sessionId.empty()) { ZLOGD("ObjectStoreManager::RegisterRemoteCallback empty"); @@ -418,7 +413,7 @@ void ObjectStoreManager::RegisterRemoteCallback(const std::string &bundleName, c std::string prefix = bundleName + sessionId; callbacks_.Compute(tokenId, ([pid, &proxy, &prefix](const uint32_t key, CallbackInfo &value) { if (value.pid != pid) { - value = CallbackInfo { pid }; + value = CallbackInfo{ pid }; } value.observers_.insert_or_assign(prefix, proxy); return !value.observers_.empty(); @@ -454,7 +449,7 @@ void ObjectStoreManager::UnregisterRemoteCallback( void ObjectStoreManager::RegisterProgressObserverCallback(const std::string &bundleName, const std::string &sessionId, pid_t pid, uint32_t tokenId, sptr callback) { - if (bundleName.empty() || sessionId.empty()) { + if (bundleName.empty() || sessionId.empty() || callback == nullptr) { ZLOGD("ObjectStoreManager::RegisterProgressObserverCallback empty bundleName = %{public}s, sessionId = " "%{public}s", bundleName.c_str(), DistributedData::Anonymous::Change(sessionId).c_str()); @@ -528,6 +523,9 @@ void ObjectStoreManager::UnregisterProgressObserverCallback( void ObjectStoreManager::NotifyChange(const ObjectRecord &changedData) { ZLOGI("OnChange start, size:%{public}zu", changedData.size()); + if (changedData.size() == 0) { + return; + } bool hasAsset = false; SaveInfo saveInfo; for (const auto &[key, value] : changedData) { @@ -541,7 +539,7 @@ void ObjectStoreManager::NotifyChange(const ObjectRecord &changedData) ObjectStore::RadarReporter::ReportStateStart(std::string(__FUNCTION__), ObjectStore::DATA_RESTORE, ObjectStore::DATA_RECV, ObjectStore::RADAR_SUCCESS, ObjectStore::START, saveInfo.bundleName); callbacks_.ForEach([this, &data](uint32_t tokenId, const CallbackInfo& value) { - DoNotify(tokenId, value, data, true); // no asset, data ready means all ready + DoNotify(value, data, true); // no asset, data ready means all ready return false; }); return; @@ -599,7 +597,7 @@ void ObjectStoreManager::ComputeStatus(const std::string& objectKey, const SaveI ObjectStore::RadarReporter::ReportStage(std::string(__FUNCTION__), ObjectStore::DATA_RESTORE, ObjectStore::DATA_RECV, ObjectStore::RADAR_SUCCESS); callbacks_.ForEach([this, &data](uint32_t tokenId, const CallbackInfo& value) { - DoNotify(tokenId, value, data, true); + DoNotify(value, data, true); return false; }); } else { @@ -607,7 +605,7 @@ void ObjectStoreManager::ComputeStatus(const std::string& objectKey, const SaveI ObjectStore::RadarReporter::ReportStateStart(std::string(__FUNCTION__), ObjectStore::DATA_RESTORE, ObjectStore::DATA_RECV, ObjectStore::RADAR_SUCCESS, ObjectStore::START, saveInfo.bundleName); callbacks_.ForEach([this, &data](uint32_t tokenId, const CallbackInfo& value) { - DoNotify(tokenId, value, data, false); + DoNotify(value, data, false); return false; }); WaitAssets(key, saveInfo, data); @@ -659,7 +657,7 @@ int32_t ObjectStoreManager::WaitAssets(const std::string& objectKey, const SaveI const std::map& data) { if (executors_ == nullptr) { - ZLOGE("executors_ is null"); + ZLOGI("executors_ is nullptr"); return OBJECT_INNER_ERROR; } auto taskId = executors_->Schedule(std::chrono::seconds(WAIT_TIME), [this, objectKey, data, saveInfo]() { @@ -668,11 +666,8 @@ int32_t ObjectStoreManager::WaitAssets(const std::string& objectKey, const SaveI DoNotifyWaitAssetTimeout(objectKey); }); - objectTimer_.ComputeIfAbsent( - objectKey, [taskId](const std::string& key) -> auto { - return taskId; - }); - return OBJECT_SUCCESS; + objectTimer_.ComputeIfAbsent(objectKey, [taskId](const std::string &key) -> auto { return taskId; }); + return OBJECT_SUCCESS; } void ObjectStoreManager::PullAssets(const std::map& data, const SaveInfo& saveInfo) @@ -699,18 +694,17 @@ void ObjectStoreManager::NotifyAssetsRecvProgress(const std::string &objectKey, assetsRecvProgress_.InsertOrAssign(objectKey, progress); std::list> observers; bool flag = false; - processCallbacks_.ForEach( - [&objectKey, &observers, &flag](uint32_t tokenId, const ProgressCallbackInfo &value) { - if (value.observers_.empty()) { - flag = true; - return false; - } - auto it = value.observers_.find(objectKey); - if (it != value.observers_.end()) { - observers.push_back(it->second); - } + processCallbacks_.ForEach([&objectKey, &observers, &flag](uint32_t tokenId, const ProgressCallbackInfo &value) { + if (value.observers_.empty()) { + flag = true; return false; - }); + } + auto it = value.observers_.find(objectKey); + if (it != value.observers_.end()) { + observers.push_back(it->second); + } + return false; + }); if (flag) { std::lock_guard lock(progressMutex_); progressInfo_.insert_or_assign(objectKey, progress); @@ -736,27 +730,28 @@ void ObjectStoreManager::NotifyAssetsReady( return; } restoreStatus_.ComputeIfAbsent( - objectKey, [](const std::string& key) -> auto { - return RestoreStatus::NONE; - }); - restoreStatus_.Compute(objectKey, [this, &bundleName] (const auto &key, auto &value) { + objectKey, [](const std::string &key) -> auto { return RestoreStatus::NONE; }); + restoreStatus_.Compute(objectKey, [this, &bundleName](const auto &key, auto &value) { if (value == RestoreStatus::DATA_NOTIFIED) { value = RestoreStatus::ALL_READY; ObjectStore::RadarReporter::ReportStage(std::string(__FUNCTION__), ObjectStore::DATA_RESTORE, ObjectStore::ASSETS_RECV, ObjectStore::RADAR_SUCCESS); - callbacks_.ForEach([this, key](uint32_t tokenId, const CallbackInfo& value) { - DoNotifyAssetsReady(tokenId, value, key, true); + callbacks_.ForEach([this, key](uint32_t tokenId, const CallbackInfo &value) { + DoNotifyAssetsReady(value, key, true); return false; }); } else if (value == RestoreStatus::DATA_READY) { value = RestoreStatus::ALL_READY; ObjectStore::RadarReporter::ReportStage(std::string(__FUNCTION__), ObjectStore::DATA_RESTORE, ObjectStore::ASSETS_RECV, ObjectStore::RADAR_SUCCESS); - auto [has, taskId] = objectTimer_.Find(key); - if (has) { - executors_->Remove(taskId); - objectTimer_.Erase(key); - } + objectTimer_.ComputeIfPresent(key, [executors = executors_](const auto &key, auto &value) { + if (executors == nullptr) { + ZLOGI("executors_ is nullptr"); + return; + } + executors->Remove(value); + return false; + }); } else { value = RestoreStatus::ASSETS_READY; ObjectStore::RadarReporter::ReportStateStart(std::string(__FUNCTION__), ObjectStore::DATA_RESTORE, @@ -781,15 +776,12 @@ bool ObjectStoreManager::IsAssetKey(const std::string& key) bool ObjectStoreManager::IsAssetComplete(const ObjectRecord& result, const std::string& assetPrefix) { - if (result.find(assetPrefix + ObjectStore::NAME_SUFFIX) == result.end() || - result.find(assetPrefix + ObjectStore::URI_SUFFIX) == result.end() || - result.find(assetPrefix + ObjectStore::PATH_SUFFIX) == result.end() || - result.find(assetPrefix + ObjectStore::CREATE_TIME_SUFFIX) == result.end() || - result.find(assetPrefix + ObjectStore::MODIFY_TIME_SUFFIX) == result.end() || - result.find(assetPrefix + ObjectStore::SIZE_SUFFIX) == result.end()) { - return false; - } - return true; + return !(result.find(assetPrefix + ObjectStore::NAME_SUFFIX) == result.end() || + result.find(assetPrefix + ObjectStore::URI_SUFFIX) == result.end() || + result.find(assetPrefix + ObjectStore::PATH_SUFFIX) == result.end() || + result.find(assetPrefix + ObjectStore::CREATE_TIME_SUFFIX) == result.end() || + result.find(assetPrefix + ObjectStore::MODIFY_TIME_SUFFIX) == result.end() || + result.find(assetPrefix + ObjectStore::SIZE_SUFFIX) == result.end()); } Assets ObjectStoreManager::GetAssetsFromDBRecords(const ObjectRecord& result) @@ -842,21 +834,21 @@ void ObjectStoreManager::DoNotify(uint32_t tokenId, const CallbackInfo& value, continue; } observer.second->Completed((*it).second, allReady); + restoreStatus_.ComputeIfPresent([allReady](auto &key, auto &value) { + if (allReady) { + return false; + } + value = RestoreStatus::DATA_NOTIFIED; + return true; + }); if (allReady) { - restoreStatus_.Erase(observer.first); ObjectStore::RadarReporter::ReportStateFinished(std::string(__FUNCTION__), ObjectStore::DATA_RESTORE, ObjectStore::NOTIFY, ObjectStore::RADAR_SUCCESS, ObjectStore::FINISHED); - } else { - restoreStatus_.ComputeIfPresent(observer.first, [](const auto &key, auto &value) { - value = RestoreStatus::DATA_NOTIFIED; - return true; - }); } } } -void ObjectStoreManager::DoNotifyAssetsReady(uint32_t tokenId, const CallbackInfo& value, - const std::string& objectKey, bool allReady) +void ObjectStoreManager::DoNotifyAssetsReady(const CallbackInfo& value, const std::string& objectKey, bool allReady) { if (executors_ == nullptr) { ZLOGE("executors_ is nullptr"); @@ -872,11 +864,14 @@ void ObjectStoreManager::DoNotifyAssetsReady(uint32_t tokenId, const CallbackInf ObjectStore::RadarReporter::ReportStateFinished(std::string(__FUNCTION__), ObjectStore::DATA_RESTORE, ObjectStore::NOTIFY, ObjectStore::RADAR_SUCCESS, ObjectStore::FINISHED); } - auto [has, taskId] = objectTimer_.Find(objectKey); - if (has) { - executors_->Remove(taskId); - objectTimer_.Erase(objectKey); - } + objectTimer_.ComputeIfPresent(objectKey, [executors = executors_](const auto &key, auto &value) { + if (executors == nullptr) { + ZLOGI("executors_ is nullptr"); + return; + } + executors->Remove(value); + return false; + }); } } @@ -895,11 +890,14 @@ void ObjectStoreManager::DoNotifyWaitAssetTimeout(const std::string &objectKey) } observer.second->Completed(ObjectRecord(), true); restoreStatus_.Erase(objectKey); - auto [has, taskId] = objectTimer_.Find(objectKey); - if (has) { - executors_->Remove(taskId); - objectTimer_.Erase(objectKey); - } + objectTimer_.ComputeIfPresent(objectKey, [executors = executors_](const auto &key, auto &value) { + if (executors == nullptr) { + ZLOGI("executors_ is nullptr"); + return; + } + executors->Remove(value); + return false; + }); ObjectStore::RadarReporter::ReportStateError(std::string(__FUNCTION__), ObjectStore::DATA_RESTORE, ObjectStore::NOTIFY, ObjectStore::RADAR_FAILED, ObjectStore::TIMEOUT, ObjectStore::FINISHED); } @@ -910,10 +908,9 @@ void ObjectStoreManager::DoNotifyWaitAssetTimeout(const std::string &objectKey) void ObjectStoreManager::SetData(const std::string &dataDir, const std::string &userId) { ZLOGI("enter, user: %{public}s", userId.c_str()); - kvStoreDelegateManager_ = std::make_shared - (DistributedData::Bootstrap::GetInstance().GetProcessLabel(), userId); - DistributedDB::KvStoreConfig kvStoreConfig { dataDir }; - auto status = kvStoreDelegateManager_->SetKvStoreConfig(kvStoreConfig); + kvStoreDelegateManager_(DistributedData::Bootstrap::GetInstance().GetProcessLabel(), userId); + hasInitMananger_ = true; + auto status = kvStoreDelegateManager_.SetKvStoreConfig({ dataDir }); if (status != DistributedDB::OK) { ZLOGE("Set kvstore config failed, status: %{public}d", status); return; @@ -923,23 +920,19 @@ void ObjectStoreManager::SetData(const std::string &dataDir, const std::string & int32_t ObjectStoreManager::Open() { - if (kvStoreDelegateManager_ == nullptr) { - ZLOGE("Kvstore delegate manager not init"); - return OBJECT_INNER_ERROR; - } std::unique_lock lock(rwMutex_); - if (delegate_ == nullptr) { - delegate_ = OpenObjectKvStore(); - if (delegate_ == nullptr) { - ZLOGE("Open object kvstore failed"); - return OBJECT_DBSTATUS_ERROR; - } - syncCount_ = 1; - ZLOGI("Open object kvstore success"); - } else { + if (delegate_ != nullptr) { syncCount_++; - ZLOGI("Object kvstore syncCount: %{public}d", syncCount_.load()); + ZLOGI("Object kv store syncCount: %{public}d", syncCount_.load()); + return OBJECT_SUCCESS; + } + delegate_ = OpenObjectKvStore(); + if (delegate_ == nullptr) { + ZLOGE("Open object kv store failed"); + return OBJECT_DB_ERROR; } + syncCount_ = 1; + ZLOGI("Open object kv store success"); return OBJECT_SUCCESS; } @@ -949,8 +942,9 @@ void ObjectStoreManager::ForceClose() if (delegate_ == nullptr) { return; } - auto status = kvStoreDelegateManager_->CloseKvStore(delegate_); - if (status != DistributedDB::DBStatus::OK) { + delegate_->UnRegisterObserver(&objectDataListener_); + auto status = kvStoreDelegateManager_.CloseKvStore(delegate_); + if (status != DBStatus::OK) { ZLOGE("CloseKvStore fail %{public}d", status); return; } @@ -979,17 +973,17 @@ void ObjectStoreManager::Close() FlushClosedStore(); } -void ObjectStoreManager::SyncCompleted( - const std::map &results, uint64_t sequenceId) +void ObjectStoreManager::SyncCompleted(const std::map &results, uint64_t sequenceId) { + // todo 对results往外传的时候,需要对高斯的错误码做转换 std::string userId; SequenceSyncManager::Result result = SequenceSyncManager::GetInstance()->Process(sequenceId, results, userId); if (result == SequenceSyncManager::SUCCESS_USER_HAS_FINISHED && userId == userId_) { - SetSyncStatus(false); + isSyncing_ = false; FlushClosedStore(); } for (const auto &item : results) { - if (item.second == DistributedDB::DBStatus::OK) { + if (item.second == DBStatus::OK) { ZLOGI("Sync data success, sequenceId: 0x%{public}" PRIx64 "", sequenceId); ObjectStore::RadarReporter::ReportStateFinished(std::string(__FUNCTION__), ObjectStore::SAVE, ObjectStore::SYNC_DATA, ObjectStore::RADAR_SUCCESS, ObjectStore::FINISHED); @@ -1010,9 +1004,14 @@ void ObjectStoreManager::FlushClosedStore() std::unique_lock lock(rwMutex_); if (!isSyncing_ && syncCount_ == 0 && delegate_ != nullptr) { ZLOGD("close store"); - auto status = kvStoreDelegateManager_->CloseKvStore(delegate_); - if (status != DistributedDB::DBStatus::OK) { + delegate_->UnRegisterObserver(&objectDataListener_); + auto status = kvStoreDelegateManager_.CloseKvStore(delegate_); + if (status != DBStatus::OK) { int timeOut = 1000; + if (executors == nullptr) { + ZLOGI("executors_ is nullptr"); + return; + } executors_->Schedule(std::chrono::milliseconds(timeOut), [this]() { FlushClosedStore(); }); @@ -1025,7 +1024,7 @@ void ObjectStoreManager::FlushClosedStore() void ObjectStoreManager::ProcessOldEntry(const std::string &appId) { - std::vector entries; + std::vector entries{}; { std::shared_lock lock(rwMutex_); if (delegate_ == nullptr) { @@ -1033,7 +1032,7 @@ void ObjectStoreManager::ProcessOldEntry(const std::string &appId) return; } auto status = delegate_->GetEntries(std::vector(appId.begin(), appId.end()), entries); - if (status != DistributedDB::DBStatus::OK) { + if (status != DBStatus::OK) { ZLOGE("Get old entries failed, bundleName: %{public}s, status %{public}d", appId.c_str(), status); return; } @@ -1095,10 +1094,10 @@ int32_t ObjectStoreManager::SaveToStore(const std::string &appId, const std::str std::shared_lock lock(rwMutex_); if (delegate_ == nullptr) { ZLOGE("delegate is nullptr."); - return E_DB_ERROR; + return OBJECT_DB_ERROR; } auto status = delegate_->PutBatch(entries); - if (status != DistributedDB::DBStatus::OK) { + if (status != DBStatus::OK) { ZLOGE("PutBatch failed, bundleName: %{public}s, sessionId: %{public}s, dstNetworkId: %{public}s, " "status: %{public}d", appId.c_str(), Anonymous::Change(sessionId).c_str(), Anonymous::Change(toDeviceId).c_str(), status); @@ -1114,20 +1113,26 @@ int32_t ObjectStoreManager::SaveToStore(const std::string &appId, const std::str int32_t ObjectStoreManager::SyncOnStore( const std::string &prefix, const std::vector &deviceList, SyncCallBack &callback) { + if (callback == nullptr || deviceList.empty()) { + ZLOGE("do not need sync, prefix: %{public}s", Anonymous::Change(prefix).c_str()); + return INVALID_ARGUMENT; + } std::vector syncDevices; for (auto const &device : deviceList) { if (device == LOCAL_DEVICE) { ZLOGI("Save to local, do not need sync, prefix: %{public}s", Anonymous::Change(prefix).c_str()); - callback({{LOCAL_DEVICE, OBJECT_SUCCESS}}); + callback({ { LOCAL_DEVICE, OBJECT_SUCCESS } }); return OBJECT_SUCCESS; } syncDevices.emplace_back(DmAdaper::GetInstance().GetUuidByNetworkId(device)); } if (syncDevices.empty()) { ZLOGI("Device list is empty, prefix: %{public}s", Anonymous::Change(prefix).c_str()); - callback(std::map()); - return OBJECT_SUCCESS; + callback({}); + return OBJECT_SUCCESS; // todo 这部分不能放在客户端吗?? } + // todo SequenceSyncManager这个东西十分没必要,不应该在Sync开始之前就去注册,得在高斯的Sync之前注册 + // todo 可以一步完成的东西没必要经过一个中间的存储过程 uint64_t sequenceId = SequenceSyncManager::GetInstance()->AddNotifier(userId_, callback); int32_t id = AccountDelegate::GetInstance()->GetUserByToken(IPCSkeleton::GetCallingFullTokenID()); @@ -1152,34 +1157,28 @@ int32_t ObjectStoreManager::DoSync(const std::string &prefix, const std::vector< std::shared_lock lock(rwMutex_); if (delegate_ == nullptr) { ZLOGE("db store was closed."); - return E_DB_ERROR; + return OBJECT_DB_ERROR; } DistributedDB::Query dbQuery = DistributedDB::Query::Select(); dbQuery.PrefixKey(std::vector(prefix.begin(), prefix.end())); ZLOGI("Start sync data, sequenceId: 0x%{public}" PRIx64 "", sequenceId); auto status = delegate_->Sync(deviceList, DistributedDB::SyncMode::SYNC_MODE_PUSH_ONLY, - [this, sequenceId](const std::map &devicesMap) { + [this, sequenceId](const std::map &devicesMap) { ZLOGI("Sync data finished, sequenceId: 0x%{public}" PRIx64 "", sequenceId); - std::map result; + std::map result; for (auto &item : devicesMap) { result[DmAdaper::GetInstance().ToNetworkID(item.first)] = item.second; } SyncCompleted(result, sequenceId); }, dbQuery, false); - if (status != DistributedDB::DBStatus::OK) { + if (status != DBStatus::OK) { ZLOGE("Sync data failed, prefix: %{public}s, sequenceId: 0x%{public}" PRIx64 ", status: %{public}d", Anonymous::Change(prefix).c_str(), sequenceId, status); std::string tmp; SequenceSyncManager::GetInstance()->DeleteNotifier(sequenceId, tmp); return status; } - SetSyncStatus(true); - return OBJECT_SUCCESS; -} - -int32_t ObjectStoreManager::SetSyncStatus(bool status) -{ - isSyncing_ = status; + isSyncing_ = true; return OBJECT_SUCCESS; } @@ -1189,16 +1188,16 @@ int32_t ObjectStoreManager::RevokeSaveToStore(const std::string &prefix) std::shared_lock lock(rwMutex_); if (delegate_ == nullptr) { ZLOGE("db store was closed."); - return E_DB_ERROR; + return OBJECT_DB_ERROR; } auto status = delegate_->GetEntries(std::vector(prefix.begin(), prefix.end()), entries); - if (status == DistributedDB::DBStatus::NOT_FOUND) { + if (status == DBStatus::NOT_FOUND) { ZLOGI("Get entries empty, prefix: %{public}s", Anonymous::Change(prefix).c_str()); return OBJECT_SUCCESS; } - if (status != DistributedDB::DBStatus::OK) { + if (status != DBStatus::OK) { ZLOGE("Get entries failed, prefix: %{public}s, status: %{public}d", Anonymous::Change(prefix).c_str(), status); - return DB_ERROR; + return OBJECT_DB_ERROR; } std::vector> keys; std::for_each(entries.begin(), entries.end(), [&keys](const DistributedDB::Entry &entry) { @@ -1208,10 +1207,10 @@ int32_t ObjectStoreManager::RevokeSaveToStore(const std::string &prefix) return OBJECT_SUCCESS; } status = delegate_->DeleteBatch(keys); - if (status != DistributedDB::DBStatus::OK) { + if (status != DBStatus::OK) { ZLOGE("Delete entries failed, prefix: %{public}s, status: %{public}d", Anonymous::Change(prefix).c_str(), status); - return DB_ERROR; + return OBJECT_DB_ERROR; } ZLOGI("Delete entries success, prefix: %{public}s, count: %{public}zu", Anonymous::Change(prefix).c_str(), keys.size()); @@ -1226,16 +1225,13 @@ int32_t ObjectStoreManager::RetrieveFromStore(const std::string &appId, const st std::shared_lock lock(rwMutex_); if (delegate_ == nullptr) { ZLOGE("db store was closed."); - return E_DB_ERROR; + return OBJECT_DB_ERROR; } auto status = delegate_->GetEntries(std::vector(prefix.begin(), prefix.end()), entries); - if (status == DistributedDB::DBStatus::NOT_FOUND) { - ZLOGI("Get entries empty, prefix: %{public}s, status: %{public}d", Anonymous::Change(prefix).c_str(), status); - return KEY_NOT_FOUND; - } - if (status != DistributedDB::DBStatus::OK) { + // 服务侧不关心该错误码具体是什么,该错误码不对外 + if (status != DBStatus::OK) { ZLOGE("Get entries failed, prefix: %{public}s, status: %{public}d", Anonymous::Change(prefix).c_str(), status); - return DB_ERROR; + return OBJECT_DB_ERROR; } ZLOGI("GetEntries success,prefix:%{public}s,count:%{public}zu", Anonymous::Change(prefix).c_str(), entries.size()); for (const auto &entry : entries) { @@ -1387,7 +1383,7 @@ uint64_t SequenceSyncManager::AddNotifier(const std::string &userId, SyncCallBac } SequenceSyncManager::Result SequenceSyncManager::Process( - uint64_t sequenceId, const std::map &results, std::string &userId) + uint64_t sequenceId, const std::map &results, std::string &userId) { std::lock_guard lock(notifierLock_); if (seqIdCallbackRelations_.count(sequenceId) == 0) { @@ -1396,7 +1392,7 @@ SequenceSyncManager::Result SequenceSyncManager::Process( } std::map syncResults; for (const auto &item : results) { - syncResults[item.first] = item.second == DistributedDB::DBStatus::OK ? 0 : -1; + syncResults[item.first] = item.second == DBStatus::OK ? 0 : -1; } seqIdCallbackRelations_[sequenceId](syncResults); ZLOGD("end complete"); @@ -1450,8 +1446,8 @@ int32_t ObjectStoreManager::BindAsset(const uint32_t tokenId, const std::string& return std::make_shared>>(); }); auto snapshots = snapshots_.Find(snapshotKey).second; - bindSnapshots_.Compute(storeKey, [this, &asset, snapshots] (const auto &key, auto &value) { - value->emplace(std::pair{asset.uri, snapshots}); + bindSnapshots_.Compute(storeKey, [this, &asset, snapshots](const auto &key, auto &value) { + value->emplace(std::pair{ asset.uri, snapshots }); return true; }); @@ -1468,7 +1464,7 @@ int32_t ObjectStoreManager::BindAsset(const uint32_t tokenId, const std::string& storeInfo.user = tokenInfo.userID; storeInfo.storeName = bindInfo.storeName; - snapshots_.Compute(snapshotKey, [this, &asset, &bindInfo, &storeInfo] (const auto &key, auto &value) { + snapshots_.Compute(snapshotKey, [this, &asset, &bindInfo, &storeInfo](const auto &key, auto &value) { value->BindAsset(ValueProxy::Convert(std::move(asset)), ConvertBindInfo(bindInfo), storeInfo); return true; }); @@ -1489,23 +1485,16 @@ DistributedData::AssetBindInfo ObjectStoreManager::ConvertBindInfo(ObjectStore:: int32_t ObjectStoreManager::OnAssetChanged(const uint32_t tokenId, const std::string& appId, const std::string& sessionId, const std::string& deviceId, const ObjectStore::Asset& asset) { - const int32_t userId = AccountDelegate::GetInstance()->GetUserByToken(tokenId); auto objectAsset = asset; - Asset dataAsset = ValueProxy::Convert(std::move(objectAsset)); + Asset dataAsset = ValueProxy::Convert(std::move(objectAsset)); auto snapshotKey = appId + SEPERATOR + sessionId; - int32_t res = OBJECT_SUCCESS; - bool exist = snapshots_.ComputeIfPresent(snapshotKey, - [&res, &dataAsset, &deviceId](const std::string &key, std::shared_ptr snapshot) { - if (snapshot != nullptr) { - res = snapshot->OnDataChanged(dataAsset, deviceId); // needChange - } - return snapshot != nullptr; - }); - if (exist) { - return res; + auto [exist, snapshot] = snapshots_.Find(snapshotKey); + if (exist && snapshot != nullptr) { + return snapshot->OnDataChanged(dataAsset, deviceId); // needChange } - auto block = std::make_shared>>(WAIT_TIME, std::tuple{ true, true }); + const int32_t userId = AccountDelegate::GetInstance()->GetUserByToken(tokenId); + auto block = std::make_shared>>(WAIT_TIME, std::tuple{ true, false }); ObjectAssetLoader::GetInstance().TransferAssetsAsync(userId, appId, deviceId, { dataAsset }, [block](bool ret) { block->SetValue({ false, ret }); }); @@ -1537,6 +1526,7 @@ void ObjectStoreManager::DeleteSnapshot(const std::string& bundleName, const std ZLOGD("Not find snapshot, don't need delete"); return; } + // todo 这里的snapshot传递给了RDB,object修改share_ptr,rdb读取它,并发存在问题 bindSnapshots_.ForEach([snapshot](auto& key, auto& value) { for (auto pos = value->begin(); pos != value->end();) { if (pos->second == snapshot) { diff --git a/services/distributeddataservice/service/object/src/object_service_impl.cpp b/services/distributeddataservice/service/object/src/object_service_impl.cpp index 31d8232e0360295c74a5a0f3a9b418648a0c3e7e..7cc05f409b059c9a224fcbb6f87aad78cd696817 100644 --- a/services/distributeddataservice/service/object/src/object_service_impl.cpp +++ b/services/distributeddataservice/service/object/src/object_service_impl.cpp @@ -142,7 +142,9 @@ int32_t ObjectServiceImpl::OnInitialize() } executors_->Schedule(std::chrono::seconds(WAIT_ACCOUNT_SERVICE), [this]() { StoreMetaData saveMeta; - SaveMetaData(saveMeta); + int foregroundUserId = 0; + DistributedData::AccountDelegate::GetInstance()->QueryForegroundUserId(foregroundUserId); + SaveMetaData(saveMeta, std::to_string(foregroundUserId)); ObjectStoreManager::GetInstance().SetData(saveMeta.dataDir, saveMeta.user); ObjectStoreManager::GetInstance().InitUserMeta(); RegisterObjectServiceInfo(); @@ -153,7 +155,7 @@ int32_t ObjectServiceImpl::OnInitialize() return OBJECT_SUCCESS; } -int32_t ObjectServiceImpl::SaveMetaData(StoreMetaData &saveMeta) +int32_t ObjectServiceImpl::SaveMetaData(StoreMetaData &saveMeta, const std::string &user) { auto localDeviceId = DmAdapter::GetInstance().GetLocalDevice().uuid; if (localDeviceId.empty()) { @@ -176,9 +178,7 @@ int32_t ObjectServiceImpl::SaveMetaData(StoreMetaData &saveMeta) saveMeta.storeType = ObjectDistributedType::OBJECT_SINGLE_VERSION; saveMeta.dataType = DistributedKv::DataType::TYPE_DYNAMICAL; saveMeta.authType = DistributedKv::AuthType::IDENTICAL_ACCOUNT; - int foregroundUserId = 0; - DistributedData::AccountDelegate::GetInstance()->QueryForegroundUserId(foregroundUserId); - saveMeta.user = std::to_string(foregroundUserId); + saveMeta.user = user; saveMeta.dataDir = METADATA_STORE_PATH; if (!DistributedData::DirectoryManager::GetInstance().CreateDirectory(saveMeta.dataDir)) { ZLOGE("Create directory error, dataDir: %{public}s.", Anonymous::Change(saveMeta.dataDir).c_str()); @@ -187,7 +187,7 @@ int32_t ObjectServiceImpl::SaveMetaData(StoreMetaData &saveMeta) bool isSaved = DistributedData::MetaDataManager::GetInstance().SaveMeta(saveMeta.GetKeyWithoutPath(), saveMeta) && DistributedData::MetaDataManager::GetInstance().SaveMeta(saveMeta.GetKey(), saveMeta, true); if (!isSaved) { - ZLOGE("SaveMeta failed"); + ZLOGE("SaveMeta failed, store:%{public}s", Anonymous::Change(saveMeta.GetStoreAlias()).c_str()); return OBJECT_INNER_ERROR; } DistributedData::AppIDMetaData appIdMeta; @@ -196,6 +196,7 @@ int32_t ObjectServiceImpl::SaveMetaData(StoreMetaData &saveMeta) isSaved = DistributedData::MetaDataManager::GetInstance().SaveMeta(appIdMeta.GetKey(), appIdMeta, true); if (!isSaved) { ZLOGE("Save appIdMeta failed"); + return OBJECT_INNER_ERROR; } ZLOGI("SaveMeta success appId %{public}s, storeId %{public}s", saveMeta.appId.c_str(), saveMeta.GetStoreAlias().c_str()); @@ -204,13 +205,15 @@ int32_t ObjectServiceImpl::SaveMetaData(StoreMetaData &saveMeta) int32_t ObjectServiceImpl::OnUserChange(uint32_t code, const std::string &user, const std::string &account) { + ZLOGI("code:%{public}d, user:%{public}s, account:%{public}s", code, user.c_str(), + Anonymous::Change(account).c_str()); if (code == static_cast(AccountStatus::DEVICE_ACCOUNT_SWITCHED)) { int32_t status = ObjectStoreManager::GetInstance().Clear(); if (status != OBJECT_SUCCESS) { ZLOGE("Clear fail user:%{public}s, status: %{public}d", user.c_str(), status); } StoreMetaData saveMeta; - SaveMetaData(saveMeta); + SaveMetaData(saveMeta, user); ObjectStoreManager::GetInstance().SetData(saveMeta.dataDir, saveMeta.user); } return Feature::OnUserChange(code, user, account); @@ -233,8 +236,8 @@ int32_t ObjectServiceImpl::ObjectStoreRevokeSave( return OBJECT_SUCCESS; } -int32_t ObjectServiceImpl::ObjectStoreRetrieve( - const std::string &bundleName, const std::string &sessionId, sptr callback) +int32_t ObjectServiceImpl::ObjectStoreRetrieve(const std::string &bundleName, const std::string &sessionId, + std::map> &data, bool &allReady) { ZLOGI("begin."); uint32_t tokenId = IPCSkeleton::GetCallingTokenID(); @@ -242,7 +245,7 @@ int32_t ObjectServiceImpl::ObjectStoreRetrieve( if (status != OBJECT_SUCCESS) { return status; } - status = ObjectStoreManager::GetInstance().Retrieve(bundleName, sessionId, callback, tokenId); + status = ObjectStoreManager::GetInstance().Retrieve(bundleName, sessionId, data, allReady); if (status != OBJECT_SUCCESS) { ZLOGE("retrieve fail %{public}d", status); return status; @@ -394,6 +397,7 @@ ObjectServiceImpl::ObjectServiceImpl() meta.bundleName = eventInfo.bundleName; meta.user = std::to_string(eventInfo.user); meta.deviceId = DmAdapter::GetInstance().GetLocalDevice().uuid; + // todo 这里获取不到联系人的元数据,需要孟瑶修改处理 if (!MetaDataManager::GetInstance().LoadMeta(meta.GetKeyWithoutPath(), meta)) { ZLOGE("meta empty, bundleName:%{public}s, storeId:%{public}s", meta.bundleName.c_str(), meta.GetStoreAlias().c_str()); diff --git a/services/distributeddataservice/service/object/src/object_service_stub.cpp b/services/distributeddataservice/service/object/src/object_service_stub.cpp index a5d521a390fe711e4a51577e87adf1876d4eea47..9893d14b919a5a0f7bbf38d931dc8501e1ae2ecf 100644 --- a/services/distributeddataservice/service/object/src/object_service_stub.cpp +++ b/services/distributeddataservice/service/object/src/object_service_stub.cpp @@ -241,7 +241,7 @@ bool ObjectServiceStub::CheckInterfaceToken(MessageParcel& data) int ObjectServiceStub::OnRemoteRequest(uint32_t code, MessageParcel& data, MessageParcel& reply) { - ZLOGD("code:%{public}u, callingPid:%{public}d", code, IPCSkeleton::GetCallingPid()); + ZLOGI("code:%{public}u, callingPid:%{public}d", code, IPCSkeleton::GetCallingPid()); if (!CheckInterfaceToken(data)) { return -1; } diff --git a/services/distributeddataservice/service/object/src/object_snapshot.cpp b/services/distributeddataservice/service/object/src/object_snapshot.cpp index 14775b73024ecf4f9c78c89c7e57ef2ab62a4393..8159e5002eb6c020192e007589f438b6b38bd40d 100644 --- a/services/distributeddataservice/service/object/src/object_snapshot.cpp +++ b/services/distributeddataservice/service/object/src/object_snapshot.cpp @@ -34,10 +34,7 @@ int32_t ObjectSnapshot::Upload(Asset& asset) bool ObjectSnapshot::IsBoundAsset(const Asset& asset) { auto it = changedAssets_.find(asset.uri); - if (it != changedAssets_.end()) { - return true; - } - return false; + return it != changedAssets_.end(); } int32_t ObjectSnapshot::Download(Asset& asset) @@ -88,6 +85,7 @@ int32_t ObjectSnapshot::BindAsset(const Asset& asset, const DistributedData::Ass ZLOGD("Asset is bound. asset.uri:%{public}s :", Anonymous::Change(asset.uri).c_str()); return E_OK; } + // todo 加锁保护 changedAssets_[asset.uri] = ChangedAssetInfo(asset, bindInfo, storeInfo); return E_OK; } diff --git a/services/distributeddataservice/service/test/object_manager_test.cpp b/services/distributeddataservice/service/test/object_manager_test.cpp index c0d4b94cf410c238547ca4d648a685338268bc02..a3d53224d10aa8293b06ffe8e899d82f33ca8fe0 100644 --- a/services/distributeddataservice/service/test/object_manager_test.cpp +++ b/services/distributeddataservice/service/test/object_manager_test.cpp @@ -1123,7 +1123,7 @@ HWTEST_F(ObjectManagerTest, BindAsset001, TestSize.Level0) std::string bundleName = "BindAsset"; uint32_t tokenId = IPCSkeleton::GetCallingTokenID(); auto result = manager.BindAsset(tokenId, bundleName, sessionId_, assetValue_, assetBindInfo_); - ASSERT_EQ(result, DistributedObject::OBJECT_DBSTATUS_ERROR); + ASSERT_EQ(result, DistributedObject::OBJECT_DB_ERROR); } /**