From 376b155cca3606dccf8fd65be7e3402e03723e3d Mon Sep 17 00:00:00 2001 From: Hollokin Date: Tue, 8 Feb 2022 11:37:57 +0800 Subject: [PATCH 1/2] add inkeys and syncCallbacks --- .../include/ikvstore_sync_callback.h | 4 +- .../distributeddatafwk/src/data_query.cpp | 19 +++++ .../src/ikvstore_single.cpp | 1 + .../src/ikvstore_sync_callback.cpp | 12 ++- .../src/kvstore_sync_callback_client.cpp | 28 ++++++- .../src/kvstore_sync_callback_client.h | 8 +- .../src/single_kvstore_client.cpp | 25 +++++- .../src/single_kvstore_client.h | 2 +- .../distributeddata/include/data_query.h | 10 +++ .../distributeddata/include/single_kvstore.h | 4 +- .../app/src/query_helper.cpp | 63 +++++++++++++- .../app/src/query_helper.h | 7 ++ .../app/src/single_kvstore_impl.cpp | 84 ++++++++----------- .../app/src/single_kvstore_impl.h | 3 +- 14 files changed, 208 insertions(+), 62 deletions(-) diff --git a/frameworks/innerkitsimpl/distributeddatafwk/include/ikvstore_sync_callback.h b/frameworks/innerkitsimpl/distributeddatafwk/include/ikvstore_sync_callback.h index 285509698..ecd2cb61f 100644 --- a/frameworks/innerkitsimpl/distributeddatafwk/include/ikvstore_sync_callback.h +++ b/frameworks/innerkitsimpl/distributeddatafwk/include/ikvstore_sync_callback.h @@ -28,7 +28,7 @@ namespace DistributedKv { class IKvStoreSyncCallback : public IRemoteBroker { public: DECLARE_INTERFACE_DESCRIPTOR(u"OHOS.DistributedKv.IKvStoreSyncCallback"); - virtual void SyncCompleted(const std::map &results) = 0; + virtual void SyncCompleted(const std::map &results, const std::string &label) = 0; }; class KvStoreSyncCallbackStub : public IRemoteStub { @@ -41,7 +41,7 @@ class KvStoreSyncCallbackProxy : public IRemoteProxy { public: explicit KvStoreSyncCallbackProxy(const sptr &impl); ~KvStoreSyncCallbackProxy() = default; - void SyncCompleted(const std::map &results) override; + void SyncCompleted(const std::map &results, const std::string &label) override; private: static inline BrokerDelegator delegator_; }; diff --git a/frameworks/innerkitsimpl/distributeddatafwk/src/data_query.cpp b/frameworks/innerkitsimpl/distributeddatafwk/src/data_query.cpp index 6ff0a512d..fad378875 100755 --- a/frameworks/innerkitsimpl/distributeddatafwk/src/data_query.cpp +++ b/frameworks/innerkitsimpl/distributeddatafwk/src/data_query.cpp @@ -56,6 +56,7 @@ const std::string DataQuery::TYPE_BOOLEAN = "BOOL"; const std::string DataQuery::VALUE_TRUE = "true"; const std::string DataQuery::VALUE_FALSE = "false"; const std::string DataQuery::SUGGEST_INDEX = "^SUGGEST_INDEX"; +const std::string DataQuery::IN_KEYS = "^IN_KEYS"; constexpr int MAX_QUERY_LENGTH = 5 * 1024; // Max query string length 5k DataQuery::DataQuery() @@ -537,6 +538,24 @@ DataQuery& DataQuery::SetSuggestIndex(const std::string &index) return *this; } +DataQuery& DataQuery::InKeys(const std::vector &keys) +{ + str_.append(SPACE); + str_.append(IN_KEYS); + str_.append(SPACE); + str_.append(START_IN); + str_.append(SPACE); + for (std::string key : keys) { + if (ValidateField(key)) { + EscapeSpace(key); + str_.append(key); + str_.append(SPACE); + } + } + str_.append(END_IN); + return *this; +} + std::string DataQuery::ToString() const { if (str_.length() > MAX_QUERY_LENGTH) { diff --git a/frameworks/innerkitsimpl/distributeddatafwk/src/ikvstore_single.cpp b/frameworks/innerkitsimpl/distributeddatafwk/src/ikvstore_single.cpp index 6482d72bc..e7ed04cae 100755 --- a/frameworks/innerkitsimpl/distributeddatafwk/src/ikvstore_single.cpp +++ b/frameworks/innerkitsimpl/distributeddatafwk/src/ikvstore_single.cpp @@ -487,6 +487,7 @@ Status SingleKvStoreProxy::Sync(const std::vector &deviceIds, SyncM } return static_cast(reply.ReadInt32()); } + Status SingleKvStoreProxy::Sync(const std::vector &deviceIds, SyncMode mode, const std::string &query) { MessageParcel data; diff --git a/frameworks/innerkitsimpl/distributeddatafwk/src/ikvstore_sync_callback.cpp b/frameworks/innerkitsimpl/distributeddatafwk/src/ikvstore_sync_callback.cpp index c2ede3608..c1c5907ee 100644 --- a/frameworks/innerkitsimpl/distributeddatafwk/src/ikvstore_sync_callback.cpp +++ b/frameworks/innerkitsimpl/distributeddatafwk/src/ikvstore_sync_callback.cpp @@ -18,8 +18,11 @@ #include "ikvstore_sync_callback.h" #include #include +#include #include "log_print.h" #include "message_parcel.h" +#include "message_option.h" +#include "types.h" namespace OHOS { namespace DistributedKv { @@ -31,7 +34,7 @@ KvStoreSyncCallbackProxy::KvStoreSyncCallbackProxy(const sptr &im : IRemoteProxy(impl) {} -void KvStoreSyncCallbackProxy::SyncCompleted(const std::map &results) +void KvStoreSyncCallbackProxy::SyncCompleted(const std::map &results, const std::string &label) { MessageParcel data; MessageParcel reply; @@ -50,6 +53,10 @@ void KvStoreSyncCallbackProxy::SyncCompleted(const std::map return; } } + if (!data.WriteString(label)) { + ZLOGW("write label error."); + return; + } MessageOption mo { MessageOption::TF_SYNC }; int error = Remote()->SendRequest(SYNCCOMPLETED, data, reply, mo); if (error != 0) { @@ -78,7 +85,8 @@ int32_t KvStoreSyncCallbackStub::OnRemoteRequest(uint32_t code, MessageParcel &d results.insert(std::pair(data.ReadString(), static_cast(data.ReadInt32()))); } - SyncCompleted(results); + std::string label = data.ReadString(); + SyncCompleted(results, label); return 0; } default: diff --git a/frameworks/innerkitsimpl/distributeddatafwk/src/kvstore_sync_callback_client.cpp b/frameworks/innerkitsimpl/distributeddatafwk/src/kvstore_sync_callback_client.cpp index 9800daadb..0e87a83ae 100755 --- a/frameworks/innerkitsimpl/distributeddatafwk/src/kvstore_sync_callback_client.cpp +++ b/frameworks/innerkitsimpl/distributeddatafwk/src/kvstore_sync_callback_client.cpp @@ -15,10 +15,14 @@ #define LOG_TAG "KvStoreSyncCallbackClient" +#include +#include +#include "log_print.h" #include "kvstore_sync_callback_client.h" namespace OHOS { namespace DistributedKv { +std::map> KvStoreSyncCallbackClient::kvStoreSyncCallbackInfo_; KvStoreSyncCallbackClient::KvStoreSyncCallbackClient(std::shared_ptr kvStoreSyncCallback) : kvStoreSyncCallback_(kvStoreSyncCallback) {} @@ -26,10 +30,30 @@ KvStoreSyncCallbackClient::KvStoreSyncCallbackClient(std::shared_ptr &results) +//void KvStoreSyncCallbackClient::SyncCompleted(const std::map &results) { +// if (kvStoreSyncCallback_ != nullptr) { +// kvStoreSyncCallback_->SyncCompleted(results); +// } +//} + +void KvStoreSyncCallbackClient::SyncCompleted(const std::map &results, const std::string &label) { - if (kvStoreSyncCallback_ != nullptr) { + if (label.empty() && kvStoreSyncCallback_ != nullptr) { kvStoreSyncCallback_->SyncCompleted(results); + } else if (kvStoreSyncCallbackInfo_.find(label) != kvStoreSyncCallbackInfo_.end()) { + ZLOGI("label = %{public}s", label.c_str()); + kvStoreSyncCallbackInfo_[label]->SyncCompleted(results); + } +} + +void KvStoreSyncCallbackClient::AddKvStoreSyncCallback(const std::shared_ptr kvStoreSyncCallback, + const std::string &label) +{ + std::mutex mtx; + if(kvStoreSyncCallbackInfo_.find(label) == kvStoreSyncCallbackInfo_.end()) { + mtx.lock(); + kvStoreSyncCallbackInfo_.insert({label, kvStoreSyncCallback}); + mtx.unlock(); } } } // namespace DistributedKv diff --git a/frameworks/innerkitsimpl/distributeddatafwk/src/kvstore_sync_callback_client.h b/frameworks/innerkitsimpl/distributeddatafwk/src/kvstore_sync_callback_client.h index c8ba5fa86..70a9c26ab 100644 --- a/frameworks/innerkitsimpl/distributeddatafwk/src/kvstore_sync_callback_client.h +++ b/frameworks/innerkitsimpl/distributeddatafwk/src/kvstore_sync_callback_client.h @@ -28,12 +28,16 @@ public: virtual ~KvStoreSyncCallbackClient(); - void SyncCompleted(const std::map &results) override; +// void SyncCompleted(const std::map &results); + void SyncCompleted(const std::map &results, const std::string &label) override; + + void AddKvStoreSyncCallback(const std::shared_ptr kvStoreSyncCallback, + const std::string &label); private: std::shared_ptr kvStoreSyncCallback_; + static std::map> kvStoreSyncCallbackInfo_; }; - } // namespace DistributedKv } // namespace OHOS diff --git a/frameworks/innerkitsimpl/distributeddatafwk/src/single_kvstore_client.cpp b/frameworks/innerkitsimpl/distributeddatafwk/src/single_kvstore_client.cpp index 6ee6c46bd..9eab30058 100755 --- a/frameworks/innerkitsimpl/distributeddatafwk/src/single_kvstore_client.cpp +++ b/frameworks/innerkitsimpl/distributeddatafwk/src/single_kvstore_client.cpp @@ -449,8 +449,8 @@ Status SingleKvStoreClient::GetSecurityLevel(SecurityLevel &securityLevel) const return Status::SERVER_UNAVAILABLE; } -Status SingleKvStoreClient::SyncWithCondition(const std::vector &deviceIds, SyncMode mode, - const DataQuery &query) +Status SingleKvStoreClient::SyncWithCondition(const std::vector &deviceIds, SyncMode mode, + const DataQuery &query, std::shared_ptr syncCallback) { if (kvStoreProxy_ == nullptr) { ZLOGE("singleKvstore proxy is nullptr."); @@ -460,6 +460,27 @@ Status SingleKvStoreClient::SyncWithCondition(const std::vector &de ZLOGW("deviceIds is empty."); return Status::INVALID_ARGUMENT; } + if (syncCallback == nullptr) { + ZLOGE("syncCallback is nullptr."); + return kvStoreProxy_->Sync(deviceIds, mode, query.ToString()); + } + // remove storeId after remove SubscribeKvStore function in manager. currently reserve for convenience. + sptr ipcCallback = + new (std::nothrow) KvStoreSyncCallbackClient(syncCallback); + if (ipcCallback == nullptr) { + ZLOGW("new KvStoreSyncCallbackClient failed"); + return Status::ERROR; + } + for (const std::string &deviceId : deviceIds) { + std::string label = deviceId + query.ToString(); + ZLOGI("label = %{public}s", label.c_str()); + ipcCallback->AddKvStoreSyncCallback(syncCallback, label); + } + auto status = kvStoreProxy_->RegisterSyncCallback(ipcCallback); + if (status != Status::SUCCESS) { + ZLOGE("RegisterSyncCallback is not success."); + return status; + } return kvStoreProxy_->Sync(deviceIds, mode, query.ToString()); } diff --git a/frameworks/innerkitsimpl/distributeddatafwk/src/single_kvstore_client.h b/frameworks/innerkitsimpl/distributeddatafwk/src/single_kvstore_client.h index 2a15f3c26..2558be285 100755 --- a/frameworks/innerkitsimpl/distributeddatafwk/src/single_kvstore_client.h +++ b/frameworks/innerkitsimpl/distributeddatafwk/src/single_kvstore_client.h @@ -85,7 +85,7 @@ public: Status GetSecurityLevel(SecurityLevel &securityLevel) const override; Status SyncWithCondition(const std::vector &deviceIds, SyncMode mode, - const DataQuery &query) override; + const DataQuery &query, std::shared_ptr syncCallback) override; Status SubscribeWithQuery(const std::vector &deviceIds, const DataQuery &query) override; Status UnSubscribeWithQuery(const std::vector &deviceIds, const DataQuery &query) override; diff --git a/interfaces/innerkits/distributeddata/include/data_query.h b/interfaces/innerkits/distributeddata/include/data_query.h index dc8a36f86..cebd86dd6 100755 --- a/interfaces/innerkits/distributeddata/include/data_query.h +++ b/interfaces/innerkits/distributeddata/include/data_query.h @@ -399,6 +399,13 @@ public: // This Query. KVSTORE_API DataQuery& SetSuggestIndex(const std::string &index); + // Select results with many keys. + // Parameters: + // keys: the vector of keys for query + // Return: + // This Query. + KVSTORE_API DataQuery& InKeys(const std::vector &keys); + // Get string representation // Return: // String representation of this query. @@ -511,6 +518,9 @@ public: // suggested index static const std::string SUGGEST_INDEX; + + //in keys + static const std::string IN_KEYS; private: std::string str_; diff --git a/interfaces/innerkits/distributeddata/include/single_kvstore.h b/interfaces/innerkits/distributeddata/include/single_kvstore.h index 312f7f1c8..a42c16a12 100755 --- a/interfaces/innerkits/distributeddata/include/single_kvstore.h +++ b/interfaces/innerkits/distributeddata/include/single_kvstore.h @@ -185,7 +185,7 @@ public: * Status of this Sync operation. */ KVSTORE_API virtual Status SyncWithCondition(const std::vector &deviceIds, SyncMode mode, - const DataQuery &query) = 0; + const DataQuery &query, std::shared_ptr syncCallback) = 0; /* * Subscribe store with other devices consistently Synchronize the data which is satisfied with the condition. @@ -196,7 +196,7 @@ public: * Status of this Subscribe operation. */ KVSTORE_API virtual Status SubscribeWithQuery(const std::vector &deviceIds, - const DataQuery &query) = 0; + const DataQuery &query) = 0; /* * UnSubscribe store with other devices which is satisfied with the condition. diff --git a/services/distributeddataservice/app/src/query_helper.cpp b/services/distributeddataservice/app/src/query_helper.cpp index fa0494240..e7ba59a13 100644 --- a/services/distributeddataservice/app/src/query_helper.cpp +++ b/services/distributeddataservice/app/src/query_helper.cpp @@ -22,6 +22,8 @@ #include "kvstore_utils.h" #include "data_query.h" #include "log_print.h" +#include +#include "types.h" namespace OHOS::DistributedKv { constexpr int QUERY_SKIP_SIZE = 1; @@ -29,11 +31,12 @@ constexpr int QUERY_WORD_SIZE = 2; constexpr int MAX_QUERY_LENGTH = 5 * 1024; // Max query string length 5k constexpr int MAX_QUERY_COMPLEXITY = 500; // Max query complexity 500 bool QueryHelper::hasPrefixKey_{}; +bool QueryHelper::hasInKeys_{}; std::string QueryHelper::deviceId_{}; DistributedDB::Query QueryHelper::StringToDbQuery(const std::string &query, bool &isSuccess) { - ZLOGI("query string length:%zu", query.length()); + ZLOGI("query string length:%{public}zu", query.length()); DistributedDB::Query dbQuery = DistributedDB::Query::Select(); if (query.size() == 0) { ZLOGI("Query string is empty."); @@ -47,6 +50,7 @@ DistributedDB::Query QueryHelper::StringToDbQuery(const std::string &query, bool } deviceId_.clear(); hasPrefixKey_ = (query.find(DataQuery::KEY_PREFIX) != std::string::npos); + hasInKeys_ = (query.find(DataQuery::IN_KEYS) != std::string::npos); size_t pos = query.find_first_not_of(DataQuery::SPACE); std::string inputTrim = (pos == std::string::npos) ? "" : query.substr(pos); std::regex regex(" "); @@ -127,6 +131,8 @@ void QueryHelper::HandleExtra(const std::vector &words, int &pointe HandleDeviceId(words, pointer, end, isSuccess, dbQuery); } else if (keyword == DataQuery::SUGGEST_INDEX) { HandleSetSuggestIndex(words, pointer, end, isSuccess, dbQuery); + } else if (keyword == DataQuery::IN_KEYS) { + HandleInKeys(words, pointer, end, isSuccess, dbQuery); } else { ZLOGE("Invalid keyword."); isSuccess = false; @@ -497,6 +503,29 @@ void QueryHelper::HandleKeyPrefix(const std::vector &words, int &po pointer += 2; // Pointer goes to next keyword } +void QueryHelper::HandleInKeys(const std::vector &words, int &pointer, + const int &end, bool &isSuccess, DistributedDB::Query &dbQuery) { + if (pointer + 2 > end || words.at(pointer + 1) != DataQuery::START_IN) { // This keyword has at least 2 params + ZLOGE("In not enough params."); + isSuccess = false; + return; + } + int elementPointer = pointer + 2; + const std::vector inKeys = GetInKeyList(words, elementPointer, end); + std::set> inDbKeys; + for(const std::string &inKey : inKeys) { + ZLOGI("inKey=%{public}s", inKey.c_str()); + std::vector dbKey; + dbKey.assign(inKey.begin(), inKey.end()); + inDbKeys.insert(dbKey); + } + int size = inDbKeys.size(); + ZLOGI("size of inKeys=%{public}d", size); + dbQuery.InKeys(inDbKeys); + isSuccess = true; + pointer = elementPointer + 1; // Pointer goes to next keyword +} + void QueryHelper::HandleSetSuggestIndex(const std::vector &words, int &pointer, const int &end, bool &isSuccess, DistributedDB::Query &dbQuery) { if (pointer + QUERY_SKIP_SIZE > end) { @@ -528,6 +557,16 @@ void QueryHelper::HandleDeviceId(const std::vector &words, int &poi } else { ZLOGD("Join deviceId with user specified prefixkey later."); } + if (!hasInKeys_) { + ZLOGD("DeviceId as the only inkey."); + std::set> inDbKeys; + std::vector dbKey; + dbKey.assign(deviceId_.begin(), deviceId_.begin()); + inDbKeys.insert(dbKey); + dbQuery.InKeys(inDbKeys); + } else { + ZLOGD("Join deviceId with user specified inkeys later."); + } isSuccess = true; pointer += 2; // Pointer goes to next keyword } @@ -667,4 +706,26 @@ std::vector QueryHelper::GetStringList(const std::vector(); } } + +std::vector QueryHelper::GetInKeyList(const std::vector &words, + int &elementPointer, const int &end) { + std::vector values; + bool isEndFound = false; + while (elementPointer <= end) { + if (words.at(elementPointer) == DataQuery::END_IN) { + isEndFound = true; + break; + } + std::string inKeyStr = deviceId_ + StringToString(words.at(elementPointer)); + values.push_back(inKeyStr); + ZLOGI("value=%{public}s", inKeyStr.c_str()); + elementPointer++; + } + if (isEndFound) { + return values; + } else { + ZLOGE("GetStringList failed."); + return std::vector(); + } +} } // namespace OHOS::DistributedKv diff --git a/services/distributeddataservice/app/src/query_helper.h b/services/distributeddataservice/app/src/query_helper.h index 158b25b55..241b24db8 100755 --- a/services/distributeddataservice/app/src/query_helper.h +++ b/services/distributeddataservice/app/src/query_helper.h @@ -17,6 +17,8 @@ #define QUERY_HELPER_H #include "query.h" +#include +#include "types.h" namespace OHOS::DistributedKv { class QueryHelper { @@ -25,6 +27,7 @@ public: private: static std::string deviceId_; static bool hasPrefixKey_; + static bool hasInKeys_; static void Handle(const std::vector &words, int &pointer, const int &end, bool &isSuccess, DistributedDB::Query &dbQuery); static void HandleExtra(const std::vector &words, int &pointer, @@ -69,6 +72,8 @@ private: const int &end, bool &isSuccess, DistributedDB::Query &dbQuery); static void HandleKeyPrefix(const std::vector &words, int &pointer, const int &end, bool &isSuccess, DistributedDB::Query &dbQuery); + static void HandleInKeys(const std::vector &words, int &pointer, + const int &end, bool &isSuccess, DistributedDB::Query &dbQuery); static void HandleSetSuggestIndex(const std::vector &words, int &pointer, const int &end, bool &isSuccess, DistributedDB::Query &dbQuery); static void HandleDeviceId(const std::vector &words, int &pointer, @@ -84,6 +89,8 @@ private: int &elementPointer, const int &end); static std::vector GetStringList(const std::vector &words, int &elementPointer, const int &end); + static std::vector GetInKeyList(const std::vector &words, + int &elementPointer, const int &end); }; } // namespace OHOS::DistributedKv #endif // QUERY_HELPER_H diff --git a/services/distributeddataservice/app/src/single_kvstore_impl.cpp b/services/distributeddataservice/app/src/single_kvstore_impl.cpp index c9ba96be8..1807dc705 100755 --- a/services/distributeddataservice/app/src/single_kvstore_impl.cpp +++ b/services/distributeddataservice/app/src/single_kvstore_impl.cpp @@ -128,6 +128,9 @@ Status SingleKvStoreImpl::Put(const Key &key, const Value &value) Status SingleKvStoreImpl::ConvertDbStatus(DistributedDB::DBStatus status) { switch (status) { + case DistributedDB::DBStatus::BUSY: + case DistributedDB::DBStatus::DB_ERROR: + return Status::DB_ERROR; case DistributedDB::DBStatus::OK: return Status::SUCCESS; case DistributedDB::DBStatus::INVALID_ARGS: @@ -307,19 +310,19 @@ int SingleKvStoreImpl::ConvertToDbObserverMode(const SubscribeType subscribeType return dbObserverMode; } - // Convert KvStore sync mode to DistributeDB sync mode. - DistributedDB::SyncMode SingleKvStoreImpl::ConvertToDbSyncMode(SyncMode syncMode) const - { - DistributedDB::SyncMode dbSyncMode; - if (syncMode == SyncMode::PUSH) { - dbSyncMode = DistributedDB::SyncMode::SYNC_MODE_PUSH_ONLY; - } else if (syncMode == SyncMode::PULL) { - dbSyncMode = DistributedDB::SyncMode::SYNC_MODE_PULL_ONLY; - } else { - dbSyncMode = DistributedDB::SyncMode::SYNC_MODE_PUSH_PULL; - } - return dbSyncMode; +// Convert KvStore sync mode to DistributeDB sync mode. +DistributedDB::SyncMode SingleKvStoreImpl::ConvertToDbSyncMode(SyncMode syncMode) const +{ + DistributedDB::SyncMode dbSyncMode; + if (syncMode == SyncMode::PUSH) { + dbSyncMode = DistributedDB::SyncMode::SYNC_MODE_PUSH_ONLY; + } else if (syncMode == SyncMode::PULL) { + dbSyncMode = DistributedDB::SyncMode::SYNC_MODE_PULL_ONLY; + } else { + dbSyncMode = DistributedDB::SyncMode::SYNC_MODE_PUSH_PULL; } + return dbSyncMode; +} Status SingleKvStoreImpl::UnSubscribeKvStore(const SubscribeType subscribeType, sptr observer) { @@ -467,34 +470,11 @@ Status SingleKvStoreImpl::GetEntriesWithQuery(const std::string &query, std::vec } return Status::SUCCESS; } - switch (status) { - case DistributedDB::DBStatus::BUSY: - case DistributedDB::DBStatus::DB_ERROR: { - return Status::DB_ERROR; - } - case DistributedDB::DBStatus::INVALID_ARGS: { - return Status::INVALID_ARGUMENT; - } - case DistributedDB::DBStatus::INVALID_QUERY_FORMAT: { - return Status::INVALID_QUERY_FORMAT; - } - case DistributedDB::DBStatus::INVALID_QUERY_FIELD: { - return Status::INVALID_QUERY_FIELD; - } - case DistributedDB::DBStatus::NOT_SUPPORT: { - return Status::NOT_SUPPORT; - } - case DistributedDB::DBStatus::NOT_FOUND: { - ZLOGI("DB return NOT_FOUND, no matching result. Return success with empty list."); - return Status::SUCCESS; - } - case DistributedDB::DBStatus::EKEYREVOKED_ERROR: // fallthrough - case DistributedDB::DBStatus::SECURITY_OPTION_CHECK_ERROR: - return Status::SECURITY_LEVEL_ERROR; - default: { - return Status::ERROR; - } + if (status == DistributedDB::DBStatus::NOT_FOUND) { + ZLOGI("DB return NOT_FOUND, no matching result. Return success with empty list."); + return Status::SUCCESS; } + return ConvertDbStatus(status); } void SingleKvStoreImpl::GetResultSet(const Key &prefixKey, @@ -815,9 +795,10 @@ Status SingleKvStoreImpl::AddSync(const std::vector &deviceIds, Syn { ZLOGD("start."); waitingSyncCount_++; + const std::string query; return KvStoreSyncManager::GetInstance()->AddSyncOperation(reinterpret_cast(this), delayMs, std::bind(&SingleKvStoreImpl::DoSync, this, deviceIds, mode, std::placeholders::_1), - std::bind(&SingleKvStoreImpl::DoSyncComplete, this, std::placeholders::_1)); + std::bind(&SingleKvStoreImpl::DoSyncComplete, this, std::placeholders::_1, query)); } Status SingleKvStoreImpl::AddSync(const std::vector &deviceIds, SyncMode mode, @@ -827,7 +808,7 @@ Status SingleKvStoreImpl::AddSync(const std::vector &deviceIds, Syn waitingSyncCount_++; return KvStoreSyncManager::GetInstance()->AddSyncOperation(reinterpret_cast(this), delayMs, std::bind(&SingleKvStoreImpl::DoQuerySync, this, deviceIds, mode, query, std::placeholders::_1), - std::bind(&SingleKvStoreImpl::DoSyncComplete, this, std::placeholders::_1)); + std::bind(&SingleKvStoreImpl::DoSyncComplete, this, std::placeholders::_1, query)); } uint32_t SingleKvStoreImpl::GetSyncDelayTime(uint32_t allowedDelayMs) const @@ -855,7 +836,8 @@ Status SingleKvStoreImpl::RemoveAllSyncOperation() return KvStoreSyncManager::GetInstance()->RemoveSyncOperation(reinterpret_cast(this)); } -void SingleKvStoreImpl::DoSyncComplete(const std::map &devicesSyncResult) +void SingleKvStoreImpl::DoSyncComplete(const std::map &devicesSyncResult, + const std::string &query) { DdsTrace trace(std::string("DdsTrace " LOG_TAG "::") + std::string(__FUNCTION__)); std::map resultMap; @@ -865,7 +847,16 @@ void SingleKvStoreImpl::DoSyncComplete(const std::mapSyncCompleted(resultMap); + for (const auto &deviceSyncResult : devicesSyncResult) { + auto deviceId = deviceSyncResult.first; + ZLOGI("deviceId = %{public}s", deviceId.c_str()); + // map UUID to nodeId + std::string deviceNodeId = KvStoreUtils::GetProviderInstance().ToNodeId(deviceId); + ZLOGI("deviceNodeId = %{public}s", deviceNodeId.c_str()); + std::string label = deviceNodeId + query; + ZLOGI("label = %{public}s", label.c_str()); + syncCallback_->SyncCompleted(resultMap, label); + } } } @@ -909,8 +900,7 @@ Status SingleKvStoreImpl::DoQuerySync(const std::vector &deviceIds, if (status == DistributedDB::DBStatus::BUSY) { if (syncRetries_ < KvStoreSyncManager::SYNC_RETRY_MAX_COUNT) { syncRetries_++; - auto addStatus = AddSync(deviceIds, mode, query, - KvStoreSyncManager::SYNC_DEFAULT_DELAY_MS); + auto addStatus = AddSync(deviceIds, mode, query, KvStoreSyncManager::SYNC_DEFAULT_DELAY_MS); if (addStatus == Status::SUCCESS) { return addStatus; } @@ -1033,7 +1023,7 @@ Status SingleKvStoreImpl::AddSubscribeWithQuery(const std::vector & ZLOGD("start."); return KvStoreSyncManager::GetInstance()->AddSyncOperation(reinterpret_cast(this), delayMs, std::bind(&SingleKvStoreImpl::DoSubscribeWithQuery, this, deviceIds, query, std::placeholders::_1), - std::bind(&SingleKvStoreImpl::DoSyncComplete, this, std::placeholders::_1)); + std::bind(&SingleKvStoreImpl::DoSyncComplete, this, std::placeholders::_1, "")); } Status SingleKvStoreImpl::AddUnSubscribeWithQuery(const std::vector &deviceIds, @@ -1042,7 +1032,7 @@ Status SingleKvStoreImpl::AddUnSubscribeWithQuery(const std::vector ZLOGD("start."); return KvStoreSyncManager::GetInstance()->AddSyncOperation(reinterpret_cast(this), delayMs, std::bind(&SingleKvStoreImpl::DoUnSubscribeWithQuery, this, deviceIds, query, std::placeholders::_1), - std::bind(&SingleKvStoreImpl::DoSyncComplete, this, std::placeholders::_1)); + std::bind(&SingleKvStoreImpl::DoSyncComplete, this, std::placeholders::_1, "")); } Status SingleKvStoreImpl::SubscribeWithQuery(const std::vector &deviceIds, diff --git a/services/distributeddataservice/app/src/single_kvstore_impl.h b/services/distributeddataservice/app/src/single_kvstore_impl.h index 124bd7e9f..ef9a442f8 100755 --- a/services/distributeddataservice/app/src/single_kvstore_impl.h +++ b/services/distributeddataservice/app/src/single_kvstore_impl.h @@ -87,7 +87,8 @@ private: Status AddSync(const std::vector &deviceIds, SyncMode mode, const std::string &query, uint32_t delayMs); Status RemoveAllSyncOperation(); - void DoSyncComplete(const std::map &devicesSyncResult); + void DoSyncComplete(const std::map &devicesSyncResult, + const std::string &query); Status DoSync(const std::vector &deviceIds, SyncMode mode, const KvStoreSyncManager::SyncEnd &syncEnd); Status DoQuerySync(const std::vector &deviceIds, SyncMode mode, const std::string &query, const KvStoreSyncManager::SyncEnd &syncEnd); -- Gitee From 2ca0fed613ff032b07b9dde96e8e6e4882438e7e Mon Sep 17 00:00:00 2001 From: Hollokin Date: Wed, 9 Feb 2022 14:56:17 +0800 Subject: [PATCH 2/2] modify registerSyncCallback in SyncWithCondition, add commonSyncCallbackLabel in KvStoreSyncCallbackClient --- .../src/kvstore_sync_callback_client.cpp | 38 +++++++++++++------ .../src/kvstore_sync_callback_client.h | 10 +++-- .../src/single_kvstore_client.cpp | 27 +++++++------ 3 files changed, 46 insertions(+), 29 deletions(-) diff --git a/frameworks/innerkitsimpl/distributeddatafwk/src/kvstore_sync_callback_client.cpp b/frameworks/innerkitsimpl/distributeddatafwk/src/kvstore_sync_callback_client.cpp index 0e87a83ae..bae23a46d 100755 --- a/frameworks/innerkitsimpl/distributeddatafwk/src/kvstore_sync_callback_client.cpp +++ b/frameworks/innerkitsimpl/distributeddatafwk/src/kvstore_sync_callback_client.cpp @@ -23,23 +23,23 @@ namespace OHOS { namespace DistributedKv { std::map> KvStoreSyncCallbackClient::kvStoreSyncCallbackInfo_; -KvStoreSyncCallbackClient::KvStoreSyncCallbackClient(std::shared_ptr kvStoreSyncCallback) - : kvStoreSyncCallback_(kvStoreSyncCallback) -{} +const std::string KvStoreSyncCallbackClient::CommonSyncCallbackLabel("CommonSyncCallbackLabel"); -KvStoreSyncCallbackClient::~KvStoreSyncCallbackClient() -{} +KvStoreSyncCallbackClient::KvStoreSyncCallbackClient() = default; -//void KvStoreSyncCallbackClient::SyncCompleted(const std::map &results) { -// if (kvStoreSyncCallback_ != nullptr) { -// kvStoreSyncCallback_->SyncCompleted(results); -// } -//} +KvStoreSyncCallbackClient::KvStoreSyncCallbackClient(std::shared_ptr kvStoreSyncCallback) +{ + if (kvStoreSyncCallbackInfo_.find(CommonSyncCallbackLabel) == kvStoreSyncCallbackInfo_.end()) { + AddKvStoreSyncCallback(std::move(kvStoreSyncCallback), CommonSyncCallbackLabel); + } +} + +KvStoreSyncCallbackClient::~KvStoreSyncCallbackClient() = default; void KvStoreSyncCallbackClient::SyncCompleted(const std::map &results, const std::string &label) { - if (label.empty() && kvStoreSyncCallback_ != nullptr) { - kvStoreSyncCallback_->SyncCompleted(results); + if (label.empty() && kvStoreSyncCallbackInfo_.find(CommonSyncCallbackLabel) != kvStoreSyncCallbackInfo_.end()) { + kvStoreSyncCallbackInfo_[CommonSyncCallbackLabel]->SyncCompleted(results); } else if (kvStoreSyncCallbackInfo_.find(label) != kvStoreSyncCallbackInfo_.end()) { ZLOGI("label = %{public}s", label.c_str()); kvStoreSyncCallbackInfo_[label]->SyncCompleted(results); @@ -56,5 +56,19 @@ void KvStoreSyncCallbackClient::AddKvStoreSyncCallback(const std::shared_ptr KvStoreSyncCallbackClient::GetCommonSyncCallback() +{ + if (kvStoreSyncCallbackInfo_.find(CommonSyncCallbackLabel) != kvStoreSyncCallbackInfo_.end()) { + return kvStoreSyncCallbackInfo_[CommonSyncCallbackLabel]; + } else { + return nullptr; + } +} + +std::string KvStoreSyncCallbackClient::GetCommonSyncCallbackLabel() +{ + return CommonSyncCallbackLabel; +} } // namespace DistributedKv } // namespace OHOS diff --git a/frameworks/innerkitsimpl/distributeddatafwk/src/kvstore_sync_callback_client.h b/frameworks/innerkitsimpl/distributeddatafwk/src/kvstore_sync_callback_client.h index 70a9c26ab..0c50bca6f 100644 --- a/frameworks/innerkitsimpl/distributeddatafwk/src/kvstore_sync_callback_client.h +++ b/frameworks/innerkitsimpl/distributeddatafwk/src/kvstore_sync_callback_client.h @@ -24,18 +24,22 @@ namespace DistributedKv { class KvStoreSyncCallbackClient : public KvStoreSyncCallbackStub { public: + explicit KvStoreSyncCallbackClient(); + explicit KvStoreSyncCallbackClient(std::shared_ptr kvStoreSyncCallback); virtual ~KvStoreSyncCallbackClient(); -// void SyncCompleted(const std::map &results); - void SyncCompleted(const std::map &results, const std::string &label) override; void AddKvStoreSyncCallback(const std::shared_ptr kvStoreSyncCallback, const std::string &label); + + std::shared_ptr GetCommonSyncCallback(); + + std::string GetCommonSyncCallbackLabel(); private: - std::shared_ptr kvStoreSyncCallback_; + static const std::string CommonSyncCallbackLabel; static std::map> kvStoreSyncCallbackInfo_; }; } // namespace DistributedKv diff --git a/frameworks/innerkitsimpl/distributeddatafwk/src/single_kvstore_client.cpp b/frameworks/innerkitsimpl/distributeddatafwk/src/single_kvstore_client.cpp index 9eab30058..b0c06a4bf 100755 --- a/frameworks/innerkitsimpl/distributeddatafwk/src/single_kvstore_client.cpp +++ b/frameworks/innerkitsimpl/distributeddatafwk/src/single_kvstore_client.cpp @@ -460,21 +460,20 @@ Status SingleKvStoreClient::SyncWithCondition(const std::vector &de ZLOGW("deviceIds is empty."); return Status::INVALID_ARGUMENT; } + sptr ipcCallback; if (syncCallback == nullptr) { - ZLOGE("syncCallback is nullptr."); - return kvStoreProxy_->Sync(deviceIds, mode, query.ToString()); - } - // remove storeId after remove SubscribeKvStore function in manager. currently reserve for convenience. - sptr ipcCallback = - new (std::nothrow) KvStoreSyncCallbackClient(syncCallback); - if (ipcCallback == nullptr) { - ZLOGW("new KvStoreSyncCallbackClient failed"); - return Status::ERROR; - } - for (const std::string &deviceId : deviceIds) { - std::string label = deviceId + query.ToString(); - ZLOGI("label = %{public}s", label.c_str()); - ipcCallback->AddKvStoreSyncCallback(syncCallback, label); + ipcCallback = new KvStoreSyncCallbackClient(); + if (ipcCallback->GetCommonSyncCallback() != nullptr) { + syncCallback = ipcCallback->GetCommonSyncCallback(); + ipcCallback = new (std::nothrow) KvStoreSyncCallbackClient(syncCallback); + } + } else { + ipcCallback = new (std::nothrow) KvStoreSyncCallbackClient(syncCallback); + for (const std::string &deviceId : deviceIds) { + std::string label = deviceId + query.ToString(); + ZLOGI("label = %{public}s", label.c_str()); + ipcCallback->AddKvStoreSyncCallback(syncCallback, label); + } } auto status = kvStoreProxy_->RegisterSyncCallback(ipcCallback); if (status != Status::SUCCESS) { -- Gitee