From 1231cc86feea8125d3cc5281a77b05d5771d13f3 Mon Sep 17 00:00:00 2001 From: Hollokin Date: Wed, 16 Feb 2022 17:14:35 +0800 Subject: [PATCH] kvstoreclient Signed-off-by: Hollokin --- frameworks/common/concurrent_map.h | 6 +++++- .../src/kvstore_sync_callback_client.cpp | 6 +++++- .../src/single_kvstore_client.cpp | 20 ++++++++++++------- .../src/single_kvstore_client.h | 5 ++--- .../distributeddatafwk/src/sync_observer.cpp | 9 +++++++++ .../distributeddatafwk/src/sync_observer.h | 4 +++- 6 files changed, 37 insertions(+), 13 deletions(-) diff --git a/frameworks/common/concurrent_map.h b/frameworks/common/concurrent_map.h index 7ac32370d..06f0fd9b5 100644 --- a/frameworks/common/concurrent_map.h +++ b/frameworks/common/concurrent_map.h @@ -30,7 +30,11 @@ public: using const_reference = typename std::map<_Key, _Tp>::const_reference; ConcurrentMap() = default; - ~ConcurrentMap() = default; + ~ConcurrentMap() + { + Clear(); + } + ConcurrentMap(const ConcurrentMap &other) { operator=(std::move(other)); diff --git a/frameworks/innerkitsimpl/distributeddatafwk/src/kvstore_sync_callback_client.cpp b/frameworks/innerkitsimpl/distributeddatafwk/src/kvstore_sync_callback_client.cpp index a0d112be6..028362ba5 100755 --- a/frameworks/innerkitsimpl/distributeddatafwk/src/kvstore_sync_callback_client.cpp +++ b/frameworks/innerkitsimpl/distributeddatafwk/src/kvstore_sync_callback_client.cpp @@ -22,7 +22,11 @@ namespace OHOS { namespace DistributedKv { -KvStoreSyncCallbackClient::~KvStoreSyncCallbackClient() = default; +KvStoreSyncCallbackClient::~KvStoreSyncCallbackClient() +{ + syncCallbackInfo_.Clear(); +} + void KvStoreSyncCallbackClient::SyncCompleted(const std::map &results, uint64_t sequenceId) { auto finded = syncCallbackInfo_.Find(sequenceId); diff --git a/frameworks/innerkitsimpl/distributeddatafwk/src/single_kvstore_client.cpp b/frameworks/innerkitsimpl/distributeddatafwk/src/single_kvstore_client.cpp index e8e1cbf9a..80761bd12 100755 --- a/frameworks/innerkitsimpl/distributeddatafwk/src/single_kvstore_client.cpp +++ b/frameworks/innerkitsimpl/distributeddatafwk/src/single_kvstore_client.cpp @@ -26,8 +26,14 @@ namespace OHOS::DistributedKv { SingleKvStoreClient::SingleKvStoreClient(sptr kvStoreProxy, const std::string &storeId) - :kvStoreProxy_(kvStoreProxy), storeId_(storeId), syncObserver_(std::make_shared()) + : kvStoreProxy_(kvStoreProxy), storeId_(storeId), syncCallbackClient_(new KvStoreSyncCallbackClient()), + syncObserver_(std::make_shared()) +{} + +SingleKvStoreClient::~SingleKvStoreClient() { + kvStoreProxy_->UnRegisterSyncCallback(); + syncObserver_->Clean(); } StoreId SingleKvStoreClient::GetStoreId() const @@ -178,7 +184,7 @@ Status SingleKvStoreClient::Sync(const std::vector &deviceIds, Sync return Status::INVALID_ARGUMENT; } uint64_t sequenceId = KvStoreUtils::GenerateSequenceId(); - syncCallbackClient_.AddSyncCallback(syncObserver_, sequenceId); + syncCallbackClient_->AddSyncCallback(syncObserver_, sequenceId); RegisterCallback(); return kvStoreProxy_->Sync(deviceIds, mode, allowedDelayMs, sequenceId); } @@ -321,7 +327,7 @@ Status SingleKvStoreClient::RegisterCallback() if (isRegisterSyncCallback_) { return Status::SUCCESS; } - auto status = kvStoreProxy_->RegisterSyncCallback(&syncCallbackClient_); + auto status = kvStoreProxy_->RegisterSyncCallback(syncCallbackClient_); if (status != Status::SUCCESS) { ZLOGE("RegisterSyncCallback is not success."); return status; @@ -479,10 +485,10 @@ Status SingleKvStoreClient::SyncWithCondition(const std::vector &de } uint64_t sequenceId = KvStoreUtils::GenerateSequenceId(); if (callback != nullptr) { - syncCallbackClient_.AddSyncCallback(callback, sequenceId); + syncCallbackClient_->AddSyncCallback(callback, sequenceId); RegisterCallback(); } else { - syncCallbackClient_.AddSyncCallback(syncObserver_, sequenceId); + syncCallbackClient_->AddSyncCallback(syncObserver_, sequenceId); } return kvStoreProxy_->Sync(deviceIds, mode, query.ToString(), sequenceId); } @@ -498,7 +504,7 @@ Status SingleKvStoreClient::SubscribeWithQuery(const std::vector &d return Status::INVALID_ARGUMENT; } uint64_t sequenceId = KvStoreUtils::GenerateSequenceId(); - syncCallbackClient_.AddSyncCallback(syncObserver_, sequenceId); + syncCallbackClient_->AddSyncCallback(syncObserver_, sequenceId); RegisterCallback(); return kvStoreProxy_->Subscribe(deviceIds, query.ToString(), sequenceId); } @@ -514,7 +520,7 @@ Status SingleKvStoreClient::UnsubscribeWithQuery(const std::vector return Status::INVALID_ARGUMENT; } uint64_t sequenceId = KvStoreUtils::GenerateSequenceId(); - syncCallbackClient_.AddSyncCallback(syncObserver_, sequenceId); + syncCallbackClient_->AddSyncCallback(syncObserver_, sequenceId); return kvStoreProxy_->UnSubscribe(deviceIds, query.ToString(), sequenceId); } diff --git a/frameworks/innerkitsimpl/distributeddatafwk/src/single_kvstore_client.h b/frameworks/innerkitsimpl/distributeddatafwk/src/single_kvstore_client.h index 9a6db3f04..c6794ee24 100755 --- a/frameworks/innerkitsimpl/distributeddatafwk/src/single_kvstore_client.h +++ b/frameworks/innerkitsimpl/distributeddatafwk/src/single_kvstore_client.h @@ -28,8 +28,7 @@ class SingleKvStoreClient : public SingleKvStore { public: explicit SingleKvStoreClient(sptr kvStoreProxy, const std::string &storeId); - ~SingleKvStoreClient() - {} + ~SingleKvStoreClient(); StoreId GetStoreId() const override; @@ -106,7 +105,7 @@ private: std::map> registeredObservers_; std::mutex observerMapMutex_; std::string storeId_; - KvStoreSyncCallbackClient syncCallbackClient_; + sptr syncCallbackClient_; std::shared_ptr syncObserver_; bool isRegisterSyncCallback_ = false; std::mutex registerCallbackMutex_; diff --git a/frameworks/innerkitsimpl/distributeddatafwk/src/sync_observer.cpp b/frameworks/innerkitsimpl/distributeddatafwk/src/sync_observer.cpp index 608383ff8..7ea7c1f25 100644 --- a/frameworks/innerkitsimpl/distributeddatafwk/src/sync_observer.cpp +++ b/frameworks/innerkitsimpl/distributeddatafwk/src/sync_observer.cpp @@ -20,20 +20,29 @@ SyncObserver::SyncObserver(const std::vector lock(mutex_); + callbacks_.clear(); +} + bool SyncObserver::Add(const std::shared_ptr callback) { + std::lock_guard lock(mutex_); callbacks_.push_back(callback); return true; } bool SyncObserver::Clean() { + std::lock_guard lock(mutex_); callbacks_.clear(); return true; } void SyncObserver::SyncCompleted(const std::map &results) { + std::lock_guard lock(mutex_); for (auto &callback : callbacks_) { callback->SyncCompleted(results); } diff --git a/frameworks/innerkitsimpl/distributeddatafwk/src/sync_observer.h b/frameworks/innerkitsimpl/distributeddatafwk/src/sync_observer.h index 02051b805..d077eefa2 100644 --- a/frameworks/innerkitsimpl/distributeddatafwk/src/sync_observer.h +++ b/frameworks/innerkitsimpl/distributeddatafwk/src/sync_observer.h @@ -18,6 +18,7 @@ #include #include +#include #include "kvstore_sync_callback.h" namespace OHOS::DistributedKv { @@ -25,7 +26,7 @@ class SyncObserver : public KvStoreSyncCallback { public: explicit SyncObserver(const std::vector> &callbacks); - SyncObserver() = default; + SyncObserver(); virtual ~SyncObserver() = default; @@ -36,6 +37,7 @@ public: void SyncCompleted(const std::map &results) override; private: + std::recursive_mutex mutex_; std::vector> callbacks_; }; } -- Gitee