diff --git a/services/distributeddataservice/service/object/BUILD.gn b/services/distributeddataservice/service/object/BUILD.gn index 35412b7339eb950897272089f118acf3bffbec75..7ebc673313d939321486b3932fbabfe1ffd857b4 100644 --- a/services/distributeddataservice/service/object/BUILD.gn +++ b/services/distributeddataservice/service/object/BUILD.gn @@ -18,6 +18,7 @@ config("object_public_config") { include_dirs = [ "${data_service_path}/service/common", + "${data_service_path}/service/matrix/include", "${data_service_path}/adapter/include/communicator", "${data_service_path}/adapter/include/utils", ] diff --git a/services/distributeddataservice/service/object/include/object_manager.h b/services/distributeddataservice/service/object/include/object_manager.h index 7be1336b1a7f57d54f8faf9d3af98981f42cf27e..d78188035bb7a480134f94a685dc459421e58b79 100644 --- a/services/distributeddataservice/service/object/include/object_manager.h +++ b/services/distributeddataservice/service/object/include/object_manager.h @@ -21,6 +21,7 @@ #include "device_manager_adapter.h" #include "kv_store_delegate_manager.h" #include "kvstore_sync_callback.h" +#include "metadata/store_meta_data.h" #include "object_asset_loader.h" #include "object_callback.h" #include "object_callback_proxy.h" @@ -64,6 +65,7 @@ class ObjectStoreManager { public: using DmAdaper = OHOS::DistributedData::DeviceManagerAdapter; using UriToSnapshot = std::shared_ptr>>; + using StoreMetaData = OHOS::DistributedData::StoreMetaData; enum RestoreStatus : int32_t { NONE = 0, @@ -157,8 +159,8 @@ private: std::string ToPropertyPrefix(); }; DistributedDB::KvStoreNbDelegate *OpenObjectKvStore(); - void FlushClosedStore(); - void Close(); + void FlushClosedStore(bool forceClose = false); + void Close(bool forceClose = false); int32_t SetSyncStatus(bool status); int32_t SaveToStore(const std::string &appId, const std::string &sessionId, const std::string &toDeviceId, const ObjectRecord &data); @@ -171,6 +173,8 @@ private: void ProcessSyncCallback(const std::map &results, const std::string &appId, const std::string &sessionId, const std::string &deviceId); void SaveUserToMeta(); + bool IsNeedMetaSync(const StoreMetaData &meta, const std::vector &networkIds); + int32_t DoSync(const std::string &prefix, const std::vector &deviceList, uint64_t sequenceId); std::string GetCurrentUser(); void DoNotify(uint32_t tokenId, const CallbackInfo& value, const std::map& data, bool allReady); diff --git a/services/distributeddataservice/service/object/src/object_manager.cpp b/services/distributeddataservice/service/object/src/object_manager.cpp index cc7f041ae5bcac083b6142b73056753360e2848d..2abc48cb93937ca8bd4ea734e41433e3fca55428 100644 --- a/services/distributeddataservice/service/object/src/object_manager.cpp +++ b/services/distributeddataservice/service/object/src/object_manager.cpp @@ -26,8 +26,11 @@ #include "common/string_utils.h" #include "datetime_ex.h" #include "distributed_file_daemon_manager.h" +#include "device_matrix.h" +#include "ipc_skeleton.h" #include "kvstore_utils.h" #include "log_print.h" +#include "metadata/capability_meta_data.h" #include "metadata/meta_data_manager.h" #include "metadata/object_user_meta_data.h" #include "metadata/store_meta_data.h" @@ -44,6 +47,8 @@ using Account = OHOS::DistributedData::AccountDelegate; using AccessTokenKit = Security::AccessToken::AccessTokenKit; using ValueProxy = OHOS::DistributedData::ValueProxy; using DistributedFileDaemonManager = Storage::DistributedFile::DistributedFileDaemonManager; +using DmAdapter = OHOS::DistributedData::DeviceManagerAdapter; + constexpr const char *SAVE_INFO = "p_###SAVEINFO###"; constexpr int32_t PROGRESS_MAX = 100; constexpr int32_t PROGRESS_INVALID = -1; @@ -321,7 +326,7 @@ int32_t ObjectStoreManager::Clear() result = RevokeSaveToStore(""); callbacks_.Clear(); processCallbacks_.Clear(); - Close(); + Close(true); return result; } @@ -573,6 +578,35 @@ void ObjectStoreManager::ComputeStatus(const std::string& objectKey, const SaveI }); } +bool ObjectStoreManager::IsNeedMetaSync(const StoreMetaData &meta, const std::vector &uuids) +{ + using namespace OHOS::DistributedData; + bool isAfterMeta = false; + for (const auto &uuid : uuids) { + auto metaData = meta; + metaData.deviceId = uuid; + CapMetaData capMeta; + auto capKey = CapMetaRow::GetKeyFor(uuid); + bool flag = !MetaDataManager::GetInstance().LoadMeta(std::string(capKey.begin(), capKey.end()), capMeta) || + !MetaDataManager::GetInstance().LoadMeta(metaData.GetKey(), metaData); + if (flag) { + isAfterMeta = true; + break; + } + auto [exist, mask] = DeviceMatrix::GetInstance().GetRemoteMask(uuid); + if ((mask & DeviceMatrix::META_STORE_MASK) == DeviceMatrix::META_STORE_MASK) { + isAfterMeta = true; + break; + } + auto [existLocal, localMask] = DeviceMatrix::GetInstance().GetMask(uuid); + if ((localMask & DeviceMatrix::META_STORE_MASK) == DeviceMatrix::META_STORE_MASK) { + isAfterMeta = true; + break; + } + } + return isAfterMeta; +} + void ObjectStoreManager::NotifyDataChanged(const std::map& data, const SaveInfo& saveInfo) { for (auto const& [objectKey, results] : data) { @@ -854,21 +888,21 @@ int32_t ObjectStoreManager::Open() return OBJECT_SUCCESS; } -void ObjectStoreManager::Close() +void ObjectStoreManager::Close(bool forceClose) { std::lock_guard lock(kvStoreMutex_); if (delegate_ == nullptr) { return; } int32_t taskCount = delegate_->GetTaskCount(); - if (taskCount > 0 && syncCount_ == 1) { + if (taskCount > 0 && syncCount_ == 1 && !forceClose) { CloseAfterMinute(); ZLOGW("Store is busy, close after a minute, task count: %{public}d", taskCount); return; } syncCount_--; ZLOGI("closed a store, syncCount = %{public}d", syncCount_); - FlushClosedStore(); + FlushClosedStore(forceClose); } void ObjectStoreManager::SyncCompleted( @@ -894,7 +928,7 @@ void ObjectStoreManager::SyncCompleted( } } -void ObjectStoreManager::FlushClosedStore() +void ObjectStoreManager::FlushClosedStore(bool forceClose) { std::lock_guard lock(kvStoreMutex_); if (!isSyncing_ && syncCount_ == 0 && delegate_ != nullptr) { @@ -902,8 +936,8 @@ void ObjectStoreManager::FlushClosedStore() auto status = kvStoreDelegateManager_->CloseKvStore(delegate_); if (status != DistributedDB::DBStatus::OK) { int timeOut = 1000; - executors_->Schedule(std::chrono::milliseconds(timeOut), [this]() { - FlushClosedStore(); + executors_->Schedule(std::chrono::milliseconds(timeOut), [this, forceClose]() { + FlushClosedStore(forceClose); }); ZLOGE("GetEntries fail %{public}d", status); return; @@ -913,6 +947,19 @@ void ObjectStoreManager::FlushClosedStore() delete objectDataListener_; objectDataListener_ = nullptr; } + } else if (forceClose && delegate_ != nullptr) { + ZLOGW("Force closing store: isSyncing_ = %{public}s, syncCount_ = %{public}d.", + isSyncing_ ? "true" : "false", syncCount_); + auto status = kvStoreDelegateManager_->CloseKvStore(delegate_); + ZLOGW("Force close result: %{public}d.", status); + delegate_ = nullptr; + if (objectDataListener_ != nullptr) { + delete objectDataListener_; + objectDataListener_ = nullptr; + } + + isSyncing_ = false; + syncCount_ = 0; } } @@ -1010,10 +1057,39 @@ int32_t ObjectStoreManager::SyncOnStore( return OBJECT_SUCCESS; } uint64_t sequenceId = SequenceSyncManager::GetInstance()->AddNotifier(userId_, callback); + + int32_t id = AccountDelegate::GetInstance()->GetUserByToken(IPCSkeleton::GetCallingFullTokenID()); + StoreMetaData meta = StoreMetaData(std::to_string(id), Bootstrap::GetInstance().GetProcessLabel(), + DistributedObject::ObjectCommon::OBJECTSTORE_DB_STOREID); + auto uuids = DmAdapter::GetInstance().ToUUID(syncDevices); + bool isNeedMetaSync = IsNeedMetaSync(meta, uuids); + if (isNeedMetaSync) { + bool metaRes = MetaDataManager::GetInstance().Sync(uuids, [this, prefix, syncDevices, sequenceId] (auto &results) { + if (DoSync(prefix, syncDevices, sequenceId) != OBJECT_SUCCESS) { + ZLOGE("Store sync failed"); + } + }); + if (!metaRes) { + ZLOGE("prefix:%{public}s, meta sync failed", prefix.c_str()); + return E_DB_ERROR; + } else { + ZLOGI("meta sync success"); + return OBJECT_SUCCESS; + } + } + return DoSync(prefix, syncDevices, sequenceId); +} + +int32_t ObjectStoreManager::DoSync(const std::string &prefix, const std::vector &deviceList, uint64_t sequenceId) +{ DistributedDB::Query dbQuery = DistributedDB::Query::Select(); dbQuery.PrefixKey(std::vector(prefix.begin(), prefix.end())); ZLOGI("Start sync data, sequenceId: 0x%{public}" PRIx64 "", sequenceId); - auto status = delegate_->Sync(syncDevices, DistributedDB::SyncMode::SYNC_MODE_PUSH_ONLY, + if (delegate_ == nullptr) { + ZLOGE("delegate_ == nullptr"); + return E_DB_ERROR; + } + auto status = delegate_->Sync(deviceList, DistributedDB::SyncMode::SYNC_MODE_PUSH_ONLY, [this, sequenceId](const std::map &devicesMap) { ZLOGI("Sync data finished, sequenceId: 0x%{public}" PRIx64 "", sequenceId); std::map result;