diff --git a/frameworks/innerkitsimpl/distributeddatafwk/include/ikvstore_sync_callback.h b/frameworks/innerkitsimpl/distributeddatafwk/include/ikvstore_sync_callback.h index 28550969806859972f8d2276b2d5a535eea07155..ecd2cb61fd6638fc5cbacb543d91e502cdb91e11 100644 --- a/frameworks/innerkitsimpl/distributeddatafwk/include/ikvstore_sync_callback.h +++ b/frameworks/innerkitsimpl/distributeddatafwk/include/ikvstore_sync_callback.h @@ -28,7 +28,7 @@ namespace DistributedKv { class IKvStoreSyncCallback : public IRemoteBroker { public: DECLARE_INTERFACE_DESCRIPTOR(u"OHOS.DistributedKv.IKvStoreSyncCallback"); - virtual void SyncCompleted(const std::map &results) = 0; + virtual void SyncCompleted(const std::map &results, const std::string &label) = 0; }; class KvStoreSyncCallbackStub : public IRemoteStub { @@ -41,7 +41,7 @@ class KvStoreSyncCallbackProxy : public IRemoteProxy { public: explicit KvStoreSyncCallbackProxy(const sptr &impl); ~KvStoreSyncCallbackProxy() = default; - void SyncCompleted(const std::map &results) override; + void SyncCompleted(const std::map &results, const std::string &label) override; private: static inline BrokerDelegator delegator_; }; diff --git a/frameworks/innerkitsimpl/distributeddatafwk/src/data_query.cpp b/frameworks/innerkitsimpl/distributeddatafwk/src/data_query.cpp index 6ff0a512d2d90d4ca95e61557a76a8a51aa9992a..fad378875a92beb1e1497ae2bd8fd6564149c9b6 100755 --- a/frameworks/innerkitsimpl/distributeddatafwk/src/data_query.cpp +++ b/frameworks/innerkitsimpl/distributeddatafwk/src/data_query.cpp @@ -56,6 +56,7 @@ const std::string DataQuery::TYPE_BOOLEAN = "BOOL"; const std::string DataQuery::VALUE_TRUE = "true"; const std::string DataQuery::VALUE_FALSE = "false"; const std::string DataQuery::SUGGEST_INDEX = "^SUGGEST_INDEX"; +const std::string DataQuery::IN_KEYS = "^IN_KEYS"; constexpr int MAX_QUERY_LENGTH = 5 * 1024; // Max query string length 5k DataQuery::DataQuery() @@ -537,6 +538,24 @@ DataQuery& DataQuery::SetSuggestIndex(const std::string &index) return *this; } +DataQuery& DataQuery::InKeys(const std::vector &keys) +{ + str_.append(SPACE); + str_.append(IN_KEYS); + str_.append(SPACE); + str_.append(START_IN); + str_.append(SPACE); + for (std::string key : keys) { + if (ValidateField(key)) { + EscapeSpace(key); + str_.append(key); + str_.append(SPACE); + } + } + str_.append(END_IN); + return *this; +} + std::string DataQuery::ToString() const { if (str_.length() > MAX_QUERY_LENGTH) { diff --git a/frameworks/innerkitsimpl/distributeddatafwk/src/ikvstore_single.cpp b/frameworks/innerkitsimpl/distributeddatafwk/src/ikvstore_single.cpp index 6482d72bc543312e16c9977f948ed1ecaf251982..e7ed04cae9327da1d2f4ef57c6395f9eaad9fce6 100755 --- a/frameworks/innerkitsimpl/distributeddatafwk/src/ikvstore_single.cpp +++ b/frameworks/innerkitsimpl/distributeddatafwk/src/ikvstore_single.cpp @@ -487,6 +487,7 @@ Status SingleKvStoreProxy::Sync(const std::vector &deviceIds, SyncM } return static_cast(reply.ReadInt32()); } + Status SingleKvStoreProxy::Sync(const std::vector &deviceIds, SyncMode mode, const std::string &query) { MessageParcel data; diff --git a/frameworks/innerkitsimpl/distributeddatafwk/src/ikvstore_sync_callback.cpp b/frameworks/innerkitsimpl/distributeddatafwk/src/ikvstore_sync_callback.cpp index c2ede3608d40b47e3bfd63a296bcf66c4ab66d3f..c1c5907ee34a4204ae060190f4fd8face94da31d 100644 --- a/frameworks/innerkitsimpl/distributeddatafwk/src/ikvstore_sync_callback.cpp +++ b/frameworks/innerkitsimpl/distributeddatafwk/src/ikvstore_sync_callback.cpp @@ -18,8 +18,11 @@ #include "ikvstore_sync_callback.h" #include #include +#include #include "log_print.h" #include "message_parcel.h" +#include "message_option.h" +#include "types.h" namespace OHOS { namespace DistributedKv { @@ -31,7 +34,7 @@ KvStoreSyncCallbackProxy::KvStoreSyncCallbackProxy(const sptr &im : IRemoteProxy(impl) {} -void KvStoreSyncCallbackProxy::SyncCompleted(const std::map &results) +void KvStoreSyncCallbackProxy::SyncCompleted(const std::map &results, const std::string &label) { MessageParcel data; MessageParcel reply; @@ -50,6 +53,10 @@ void KvStoreSyncCallbackProxy::SyncCompleted(const std::map return; } } + if (!data.WriteString(label)) { + ZLOGW("write label error."); + return; + } MessageOption mo { MessageOption::TF_SYNC }; int error = Remote()->SendRequest(SYNCCOMPLETED, data, reply, mo); if (error != 0) { @@ -78,7 +85,8 @@ int32_t KvStoreSyncCallbackStub::OnRemoteRequest(uint32_t code, MessageParcel &d results.insert(std::pair(data.ReadString(), static_cast(data.ReadInt32()))); } - SyncCompleted(results); + std::string label = data.ReadString(); + SyncCompleted(results, label); return 0; } default: diff --git a/frameworks/innerkitsimpl/distributeddatafwk/src/kvstore_sync_callback_client.cpp b/frameworks/innerkitsimpl/distributeddatafwk/src/kvstore_sync_callback_client.cpp index 9800daadb4a31e3048ac3fcaf1714d1e63bb00ba..bae23a46de80a1ace038d5a8fe9c5980b18a46ec 100755 --- a/frameworks/innerkitsimpl/distributeddatafwk/src/kvstore_sync_callback_client.cpp +++ b/frameworks/innerkitsimpl/distributeddatafwk/src/kvstore_sync_callback_client.cpp @@ -15,22 +15,60 @@ #define LOG_TAG "KvStoreSyncCallbackClient" +#include +#include +#include "log_print.h" #include "kvstore_sync_callback_client.h" namespace OHOS { namespace DistributedKv { +std::map> KvStoreSyncCallbackClient::kvStoreSyncCallbackInfo_; +const std::string KvStoreSyncCallbackClient::CommonSyncCallbackLabel("CommonSyncCallbackLabel"); + +KvStoreSyncCallbackClient::KvStoreSyncCallbackClient() = default; + KvStoreSyncCallbackClient::KvStoreSyncCallbackClient(std::shared_ptr kvStoreSyncCallback) - : kvStoreSyncCallback_(kvStoreSyncCallback) -{} +{ + if (kvStoreSyncCallbackInfo_.find(CommonSyncCallbackLabel) == kvStoreSyncCallbackInfo_.end()) { + AddKvStoreSyncCallback(std::move(kvStoreSyncCallback), CommonSyncCallbackLabel); + } +} + +KvStoreSyncCallbackClient::~KvStoreSyncCallbackClient() = default; + +void KvStoreSyncCallbackClient::SyncCompleted(const std::map &results, const std::string &label) +{ + if (label.empty() && kvStoreSyncCallbackInfo_.find(CommonSyncCallbackLabel) != kvStoreSyncCallbackInfo_.end()) { + kvStoreSyncCallbackInfo_[CommonSyncCallbackLabel]->SyncCompleted(results); + } else if (kvStoreSyncCallbackInfo_.find(label) != kvStoreSyncCallbackInfo_.end()) { + ZLOGI("label = %{public}s", label.c_str()); + kvStoreSyncCallbackInfo_[label]->SyncCompleted(results); + } +} -KvStoreSyncCallbackClient::~KvStoreSyncCallbackClient() -{} +void KvStoreSyncCallbackClient::AddKvStoreSyncCallback(const std::shared_ptr kvStoreSyncCallback, + const std::string &label) +{ + std::mutex mtx; + if(kvStoreSyncCallbackInfo_.find(label) == kvStoreSyncCallbackInfo_.end()) { + mtx.lock(); + kvStoreSyncCallbackInfo_.insert({label, kvStoreSyncCallback}); + mtx.unlock(); + } +} -void KvStoreSyncCallbackClient::SyncCompleted(const std::map &results) +std::shared_ptr KvStoreSyncCallbackClient::GetCommonSyncCallback() { - if (kvStoreSyncCallback_ != nullptr) { - kvStoreSyncCallback_->SyncCompleted(results); + if (kvStoreSyncCallbackInfo_.find(CommonSyncCallbackLabel) != kvStoreSyncCallbackInfo_.end()) { + return kvStoreSyncCallbackInfo_[CommonSyncCallbackLabel]; + } else { + return nullptr; } } + +std::string KvStoreSyncCallbackClient::GetCommonSyncCallbackLabel() +{ + return CommonSyncCallbackLabel; +} } // namespace DistributedKv } // namespace OHOS diff --git a/frameworks/innerkitsimpl/distributeddatafwk/src/kvstore_sync_callback_client.h b/frameworks/innerkitsimpl/distributeddatafwk/src/kvstore_sync_callback_client.h index c8ba5fa8640e0c152fb3237683b8100ebfaf90ed..0c50bca6f69c96171ec79c53ffcff726351c2db7 100644 --- a/frameworks/innerkitsimpl/distributeddatafwk/src/kvstore_sync_callback_client.h +++ b/frameworks/innerkitsimpl/distributeddatafwk/src/kvstore_sync_callback_client.h @@ -24,16 +24,24 @@ namespace DistributedKv { class KvStoreSyncCallbackClient : public KvStoreSyncCallbackStub { public: + explicit KvStoreSyncCallbackClient(); + explicit KvStoreSyncCallbackClient(std::shared_ptr kvStoreSyncCallback); virtual ~KvStoreSyncCallbackClient(); - void SyncCompleted(const std::map &results) override; + void SyncCompleted(const std::map &results, const std::string &label) override; + + void AddKvStoreSyncCallback(const std::shared_ptr kvStoreSyncCallback, + const std::string &label); + std::shared_ptr GetCommonSyncCallback(); + + std::string GetCommonSyncCallbackLabel(); private: - std::shared_ptr kvStoreSyncCallback_; + static const std::string CommonSyncCallbackLabel; + static std::map> kvStoreSyncCallbackInfo_; }; - } // namespace DistributedKv } // namespace OHOS diff --git a/frameworks/innerkitsimpl/distributeddatafwk/src/single_kvstore_client.cpp b/frameworks/innerkitsimpl/distributeddatafwk/src/single_kvstore_client.cpp index 6ee6c46bd36f91752613a17efe94f47048e97e2d..b0c06a4bfd21059232f7e0a83b01357515c0ee93 100755 --- a/frameworks/innerkitsimpl/distributeddatafwk/src/single_kvstore_client.cpp +++ b/frameworks/innerkitsimpl/distributeddatafwk/src/single_kvstore_client.cpp @@ -449,8 +449,8 @@ Status SingleKvStoreClient::GetSecurityLevel(SecurityLevel &securityLevel) const return Status::SERVER_UNAVAILABLE; } -Status SingleKvStoreClient::SyncWithCondition(const std::vector &deviceIds, SyncMode mode, - const DataQuery &query) +Status SingleKvStoreClient::SyncWithCondition(const std::vector &deviceIds, SyncMode mode, + const DataQuery &query, std::shared_ptr syncCallback) { if (kvStoreProxy_ == nullptr) { ZLOGE("singleKvstore proxy is nullptr."); @@ -460,6 +460,26 @@ Status SingleKvStoreClient::SyncWithCondition(const std::vector &de ZLOGW("deviceIds is empty."); return Status::INVALID_ARGUMENT; } + sptr ipcCallback; + if (syncCallback == nullptr) { + ipcCallback = new KvStoreSyncCallbackClient(); + if (ipcCallback->GetCommonSyncCallback() != nullptr) { + syncCallback = ipcCallback->GetCommonSyncCallback(); + ipcCallback = new (std::nothrow) KvStoreSyncCallbackClient(syncCallback); + } + } else { + ipcCallback = new (std::nothrow) KvStoreSyncCallbackClient(syncCallback); + for (const std::string &deviceId : deviceIds) { + std::string label = deviceId + query.ToString(); + ZLOGI("label = %{public}s", label.c_str()); + ipcCallback->AddKvStoreSyncCallback(syncCallback, label); + } + } + auto status = kvStoreProxy_->RegisterSyncCallback(ipcCallback); + if (status != Status::SUCCESS) { + ZLOGE("RegisterSyncCallback is not success."); + return status; + } return kvStoreProxy_->Sync(deviceIds, mode, query.ToString()); } diff --git a/frameworks/innerkitsimpl/distributeddatafwk/src/single_kvstore_client.h b/frameworks/innerkitsimpl/distributeddatafwk/src/single_kvstore_client.h index 2a15f3c26abbf7c1c165183fc7b1e20091cb1c58..2558be2857a9cb05e758c7aa0cc9d21d1ed616d2 100755 --- a/frameworks/innerkitsimpl/distributeddatafwk/src/single_kvstore_client.h +++ b/frameworks/innerkitsimpl/distributeddatafwk/src/single_kvstore_client.h @@ -85,7 +85,7 @@ public: Status GetSecurityLevel(SecurityLevel &securityLevel) const override; Status SyncWithCondition(const std::vector &deviceIds, SyncMode mode, - const DataQuery &query) override; + const DataQuery &query, std::shared_ptr syncCallback) override; Status SubscribeWithQuery(const std::vector &deviceIds, const DataQuery &query) override; Status UnSubscribeWithQuery(const std::vector &deviceIds, const DataQuery &query) override; diff --git a/interfaces/innerkits/distributeddata/include/data_query.h b/interfaces/innerkits/distributeddata/include/data_query.h index dc8a36f865e285ed57fc1316c843444fdc9208cb..cebd86dd680536d5f6f410b0262944b3524cf57d 100755 --- a/interfaces/innerkits/distributeddata/include/data_query.h +++ b/interfaces/innerkits/distributeddata/include/data_query.h @@ -399,6 +399,13 @@ public: // This Query. KVSTORE_API DataQuery& SetSuggestIndex(const std::string &index); + // Select results with many keys. + // Parameters: + // keys: the vector of keys for query + // Return: + // This Query. + KVSTORE_API DataQuery& InKeys(const std::vector &keys); + // Get string representation // Return: // String representation of this query. @@ -511,6 +518,9 @@ public: // suggested index static const std::string SUGGEST_INDEX; + + //in keys + static const std::string IN_KEYS; private: std::string str_; diff --git a/interfaces/innerkits/distributeddata/include/single_kvstore.h b/interfaces/innerkits/distributeddata/include/single_kvstore.h index 312f7f1c86d2155c64b173d82b8476f5164e2c78..a42c16a1233d135d9f9d8a532f0eec158c21195d 100755 --- a/interfaces/innerkits/distributeddata/include/single_kvstore.h +++ b/interfaces/innerkits/distributeddata/include/single_kvstore.h @@ -185,7 +185,7 @@ public: * Status of this Sync operation. */ KVSTORE_API virtual Status SyncWithCondition(const std::vector &deviceIds, SyncMode mode, - const DataQuery &query) = 0; + const DataQuery &query, std::shared_ptr syncCallback) = 0; /* * Subscribe store with other devices consistently Synchronize the data which is satisfied with the condition. @@ -196,7 +196,7 @@ public: * Status of this Subscribe operation. */ KVSTORE_API virtual Status SubscribeWithQuery(const std::vector &deviceIds, - const DataQuery &query) = 0; + const DataQuery &query) = 0; /* * UnSubscribe store with other devices which is satisfied with the condition. diff --git a/services/distributeddataservice/app/src/query_helper.cpp b/services/distributeddataservice/app/src/query_helper.cpp index fa04942407074cd87a50993c190ff832c1f56e8e..e7ba59a1314b0817ea1d34f10e55c0bb6e0e56fb 100644 --- a/services/distributeddataservice/app/src/query_helper.cpp +++ b/services/distributeddataservice/app/src/query_helper.cpp @@ -22,6 +22,8 @@ #include "kvstore_utils.h" #include "data_query.h" #include "log_print.h" +#include +#include "types.h" namespace OHOS::DistributedKv { constexpr int QUERY_SKIP_SIZE = 1; @@ -29,11 +31,12 @@ constexpr int QUERY_WORD_SIZE = 2; constexpr int MAX_QUERY_LENGTH = 5 * 1024; // Max query string length 5k constexpr int MAX_QUERY_COMPLEXITY = 500; // Max query complexity 500 bool QueryHelper::hasPrefixKey_{}; +bool QueryHelper::hasInKeys_{}; std::string QueryHelper::deviceId_{}; DistributedDB::Query QueryHelper::StringToDbQuery(const std::string &query, bool &isSuccess) { - ZLOGI("query string length:%zu", query.length()); + ZLOGI("query string length:%{public}zu", query.length()); DistributedDB::Query dbQuery = DistributedDB::Query::Select(); if (query.size() == 0) { ZLOGI("Query string is empty."); @@ -47,6 +50,7 @@ DistributedDB::Query QueryHelper::StringToDbQuery(const std::string &query, bool } deviceId_.clear(); hasPrefixKey_ = (query.find(DataQuery::KEY_PREFIX) != std::string::npos); + hasInKeys_ = (query.find(DataQuery::IN_KEYS) != std::string::npos); size_t pos = query.find_first_not_of(DataQuery::SPACE); std::string inputTrim = (pos == std::string::npos) ? "" : query.substr(pos); std::regex regex(" "); @@ -127,6 +131,8 @@ void QueryHelper::HandleExtra(const std::vector &words, int &pointe HandleDeviceId(words, pointer, end, isSuccess, dbQuery); } else if (keyword == DataQuery::SUGGEST_INDEX) { HandleSetSuggestIndex(words, pointer, end, isSuccess, dbQuery); + } else if (keyword == DataQuery::IN_KEYS) { + HandleInKeys(words, pointer, end, isSuccess, dbQuery); } else { ZLOGE("Invalid keyword."); isSuccess = false; @@ -497,6 +503,29 @@ void QueryHelper::HandleKeyPrefix(const std::vector &words, int &po pointer += 2; // Pointer goes to next keyword } +void QueryHelper::HandleInKeys(const std::vector &words, int &pointer, + const int &end, bool &isSuccess, DistributedDB::Query &dbQuery) { + if (pointer + 2 > end || words.at(pointer + 1) != DataQuery::START_IN) { // This keyword has at least 2 params + ZLOGE("In not enough params."); + isSuccess = false; + return; + } + int elementPointer = pointer + 2; + const std::vector inKeys = GetInKeyList(words, elementPointer, end); + std::set> inDbKeys; + for(const std::string &inKey : inKeys) { + ZLOGI("inKey=%{public}s", inKey.c_str()); + std::vector dbKey; + dbKey.assign(inKey.begin(), inKey.end()); + inDbKeys.insert(dbKey); + } + int size = inDbKeys.size(); + ZLOGI("size of inKeys=%{public}d", size); + dbQuery.InKeys(inDbKeys); + isSuccess = true; + pointer = elementPointer + 1; // Pointer goes to next keyword +} + void QueryHelper::HandleSetSuggestIndex(const std::vector &words, int &pointer, const int &end, bool &isSuccess, DistributedDB::Query &dbQuery) { if (pointer + QUERY_SKIP_SIZE > end) { @@ -528,6 +557,16 @@ void QueryHelper::HandleDeviceId(const std::vector &words, int &poi } else { ZLOGD("Join deviceId with user specified prefixkey later."); } + if (!hasInKeys_) { + ZLOGD("DeviceId as the only inkey."); + std::set> inDbKeys; + std::vector dbKey; + dbKey.assign(deviceId_.begin(), deviceId_.begin()); + inDbKeys.insert(dbKey); + dbQuery.InKeys(inDbKeys); + } else { + ZLOGD("Join deviceId with user specified inkeys later."); + } isSuccess = true; pointer += 2; // Pointer goes to next keyword } @@ -667,4 +706,26 @@ std::vector QueryHelper::GetStringList(const std::vector(); } } + +std::vector QueryHelper::GetInKeyList(const std::vector &words, + int &elementPointer, const int &end) { + std::vector values; + bool isEndFound = false; + while (elementPointer <= end) { + if (words.at(elementPointer) == DataQuery::END_IN) { + isEndFound = true; + break; + } + std::string inKeyStr = deviceId_ + StringToString(words.at(elementPointer)); + values.push_back(inKeyStr); + ZLOGI("value=%{public}s", inKeyStr.c_str()); + elementPointer++; + } + if (isEndFound) { + return values; + } else { + ZLOGE("GetStringList failed."); + return std::vector(); + } +} } // namespace OHOS::DistributedKv diff --git a/services/distributeddataservice/app/src/query_helper.h b/services/distributeddataservice/app/src/query_helper.h index 158b25b551a2e7a4a58c1c0f9a832c2ca0c0277f..241b24db8f8fea8331dfc3f6205c6fb5761a1b80 100755 --- a/services/distributeddataservice/app/src/query_helper.h +++ b/services/distributeddataservice/app/src/query_helper.h @@ -17,6 +17,8 @@ #define QUERY_HELPER_H #include "query.h" +#include +#include "types.h" namespace OHOS::DistributedKv { class QueryHelper { @@ -25,6 +27,7 @@ public: private: static std::string deviceId_; static bool hasPrefixKey_; + static bool hasInKeys_; static void Handle(const std::vector &words, int &pointer, const int &end, bool &isSuccess, DistributedDB::Query &dbQuery); static void HandleExtra(const std::vector &words, int &pointer, @@ -69,6 +72,8 @@ private: const int &end, bool &isSuccess, DistributedDB::Query &dbQuery); static void HandleKeyPrefix(const std::vector &words, int &pointer, const int &end, bool &isSuccess, DistributedDB::Query &dbQuery); + static void HandleInKeys(const std::vector &words, int &pointer, + const int &end, bool &isSuccess, DistributedDB::Query &dbQuery); static void HandleSetSuggestIndex(const std::vector &words, int &pointer, const int &end, bool &isSuccess, DistributedDB::Query &dbQuery); static void HandleDeviceId(const std::vector &words, int &pointer, @@ -84,6 +89,8 @@ private: int &elementPointer, const int &end); static std::vector GetStringList(const std::vector &words, int &elementPointer, const int &end); + static std::vector GetInKeyList(const std::vector &words, + int &elementPointer, const int &end); }; } // namespace OHOS::DistributedKv #endif // QUERY_HELPER_H diff --git a/services/distributeddataservice/app/src/single_kvstore_impl.cpp b/services/distributeddataservice/app/src/single_kvstore_impl.cpp index c9ba96be8593aac1875e61c971aff04d56324997..1807dc705de30532360a50daba7e4f68781349ff 100755 --- a/services/distributeddataservice/app/src/single_kvstore_impl.cpp +++ b/services/distributeddataservice/app/src/single_kvstore_impl.cpp @@ -128,6 +128,9 @@ Status SingleKvStoreImpl::Put(const Key &key, const Value &value) Status SingleKvStoreImpl::ConvertDbStatus(DistributedDB::DBStatus status) { switch (status) { + case DistributedDB::DBStatus::BUSY: + case DistributedDB::DBStatus::DB_ERROR: + return Status::DB_ERROR; case DistributedDB::DBStatus::OK: return Status::SUCCESS; case DistributedDB::DBStatus::INVALID_ARGS: @@ -307,19 +310,19 @@ int SingleKvStoreImpl::ConvertToDbObserverMode(const SubscribeType subscribeType return dbObserverMode; } - // Convert KvStore sync mode to DistributeDB sync mode. - DistributedDB::SyncMode SingleKvStoreImpl::ConvertToDbSyncMode(SyncMode syncMode) const - { - DistributedDB::SyncMode dbSyncMode; - if (syncMode == SyncMode::PUSH) { - dbSyncMode = DistributedDB::SyncMode::SYNC_MODE_PUSH_ONLY; - } else if (syncMode == SyncMode::PULL) { - dbSyncMode = DistributedDB::SyncMode::SYNC_MODE_PULL_ONLY; - } else { - dbSyncMode = DistributedDB::SyncMode::SYNC_MODE_PUSH_PULL; - } - return dbSyncMode; +// Convert KvStore sync mode to DistributeDB sync mode. +DistributedDB::SyncMode SingleKvStoreImpl::ConvertToDbSyncMode(SyncMode syncMode) const +{ + DistributedDB::SyncMode dbSyncMode; + if (syncMode == SyncMode::PUSH) { + dbSyncMode = DistributedDB::SyncMode::SYNC_MODE_PUSH_ONLY; + } else if (syncMode == SyncMode::PULL) { + dbSyncMode = DistributedDB::SyncMode::SYNC_MODE_PULL_ONLY; + } else { + dbSyncMode = DistributedDB::SyncMode::SYNC_MODE_PUSH_PULL; } + return dbSyncMode; +} Status SingleKvStoreImpl::UnSubscribeKvStore(const SubscribeType subscribeType, sptr observer) { @@ -467,34 +470,11 @@ Status SingleKvStoreImpl::GetEntriesWithQuery(const std::string &query, std::vec } return Status::SUCCESS; } - switch (status) { - case DistributedDB::DBStatus::BUSY: - case DistributedDB::DBStatus::DB_ERROR: { - return Status::DB_ERROR; - } - case DistributedDB::DBStatus::INVALID_ARGS: { - return Status::INVALID_ARGUMENT; - } - case DistributedDB::DBStatus::INVALID_QUERY_FORMAT: { - return Status::INVALID_QUERY_FORMAT; - } - case DistributedDB::DBStatus::INVALID_QUERY_FIELD: { - return Status::INVALID_QUERY_FIELD; - } - case DistributedDB::DBStatus::NOT_SUPPORT: { - return Status::NOT_SUPPORT; - } - case DistributedDB::DBStatus::NOT_FOUND: { - ZLOGI("DB return NOT_FOUND, no matching result. Return success with empty list."); - return Status::SUCCESS; - } - case DistributedDB::DBStatus::EKEYREVOKED_ERROR: // fallthrough - case DistributedDB::DBStatus::SECURITY_OPTION_CHECK_ERROR: - return Status::SECURITY_LEVEL_ERROR; - default: { - return Status::ERROR; - } + if (status == DistributedDB::DBStatus::NOT_FOUND) { + ZLOGI("DB return NOT_FOUND, no matching result. Return success with empty list."); + return Status::SUCCESS; } + return ConvertDbStatus(status); } void SingleKvStoreImpl::GetResultSet(const Key &prefixKey, @@ -815,9 +795,10 @@ Status SingleKvStoreImpl::AddSync(const std::vector &deviceIds, Syn { ZLOGD("start."); waitingSyncCount_++; + const std::string query; return KvStoreSyncManager::GetInstance()->AddSyncOperation(reinterpret_cast(this), delayMs, std::bind(&SingleKvStoreImpl::DoSync, this, deviceIds, mode, std::placeholders::_1), - std::bind(&SingleKvStoreImpl::DoSyncComplete, this, std::placeholders::_1)); + std::bind(&SingleKvStoreImpl::DoSyncComplete, this, std::placeholders::_1, query)); } Status SingleKvStoreImpl::AddSync(const std::vector &deviceIds, SyncMode mode, @@ -827,7 +808,7 @@ Status SingleKvStoreImpl::AddSync(const std::vector &deviceIds, Syn waitingSyncCount_++; return KvStoreSyncManager::GetInstance()->AddSyncOperation(reinterpret_cast(this), delayMs, std::bind(&SingleKvStoreImpl::DoQuerySync, this, deviceIds, mode, query, std::placeholders::_1), - std::bind(&SingleKvStoreImpl::DoSyncComplete, this, std::placeholders::_1)); + std::bind(&SingleKvStoreImpl::DoSyncComplete, this, std::placeholders::_1, query)); } uint32_t SingleKvStoreImpl::GetSyncDelayTime(uint32_t allowedDelayMs) const @@ -855,7 +836,8 @@ Status SingleKvStoreImpl::RemoveAllSyncOperation() return KvStoreSyncManager::GetInstance()->RemoveSyncOperation(reinterpret_cast(this)); } -void SingleKvStoreImpl::DoSyncComplete(const std::map &devicesSyncResult) +void SingleKvStoreImpl::DoSyncComplete(const std::map &devicesSyncResult, + const std::string &query) { DdsTrace trace(std::string("DdsTrace " LOG_TAG "::") + std::string(__FUNCTION__)); std::map resultMap; @@ -865,7 +847,16 @@ void SingleKvStoreImpl::DoSyncComplete(const std::mapSyncCompleted(resultMap); + for (const auto &deviceSyncResult : devicesSyncResult) { + auto deviceId = deviceSyncResult.first; + ZLOGI("deviceId = %{public}s", deviceId.c_str()); + // map UUID to nodeId + std::string deviceNodeId = KvStoreUtils::GetProviderInstance().ToNodeId(deviceId); + ZLOGI("deviceNodeId = %{public}s", deviceNodeId.c_str()); + std::string label = deviceNodeId + query; + ZLOGI("label = %{public}s", label.c_str()); + syncCallback_->SyncCompleted(resultMap, label); + } } } @@ -909,8 +900,7 @@ Status SingleKvStoreImpl::DoQuerySync(const std::vector &deviceIds, if (status == DistributedDB::DBStatus::BUSY) { if (syncRetries_ < KvStoreSyncManager::SYNC_RETRY_MAX_COUNT) { syncRetries_++; - auto addStatus = AddSync(deviceIds, mode, query, - KvStoreSyncManager::SYNC_DEFAULT_DELAY_MS); + auto addStatus = AddSync(deviceIds, mode, query, KvStoreSyncManager::SYNC_DEFAULT_DELAY_MS); if (addStatus == Status::SUCCESS) { return addStatus; } @@ -1033,7 +1023,7 @@ Status SingleKvStoreImpl::AddSubscribeWithQuery(const std::vector & ZLOGD("start."); return KvStoreSyncManager::GetInstance()->AddSyncOperation(reinterpret_cast(this), delayMs, std::bind(&SingleKvStoreImpl::DoSubscribeWithQuery, this, deviceIds, query, std::placeholders::_1), - std::bind(&SingleKvStoreImpl::DoSyncComplete, this, std::placeholders::_1)); + std::bind(&SingleKvStoreImpl::DoSyncComplete, this, std::placeholders::_1, "")); } Status SingleKvStoreImpl::AddUnSubscribeWithQuery(const std::vector &deviceIds, @@ -1042,7 +1032,7 @@ Status SingleKvStoreImpl::AddUnSubscribeWithQuery(const std::vector ZLOGD("start."); return KvStoreSyncManager::GetInstance()->AddSyncOperation(reinterpret_cast(this), delayMs, std::bind(&SingleKvStoreImpl::DoUnSubscribeWithQuery, this, deviceIds, query, std::placeholders::_1), - std::bind(&SingleKvStoreImpl::DoSyncComplete, this, std::placeholders::_1)); + std::bind(&SingleKvStoreImpl::DoSyncComplete, this, std::placeholders::_1, "")); } Status SingleKvStoreImpl::SubscribeWithQuery(const std::vector &deviceIds, diff --git a/services/distributeddataservice/app/src/single_kvstore_impl.h b/services/distributeddataservice/app/src/single_kvstore_impl.h index 124bd7e9f74d51abc238f10bb38de237040a8f4b..ef9a442f84c1e1245a19bb2a365c572ce89c8fcb 100755 --- a/services/distributeddataservice/app/src/single_kvstore_impl.h +++ b/services/distributeddataservice/app/src/single_kvstore_impl.h @@ -87,7 +87,8 @@ private: Status AddSync(const std::vector &deviceIds, SyncMode mode, const std::string &query, uint32_t delayMs); Status RemoveAllSyncOperation(); - void DoSyncComplete(const std::map &devicesSyncResult); + void DoSyncComplete(const std::map &devicesSyncResult, + const std::string &query); Status DoSync(const std::vector &deviceIds, SyncMode mode, const KvStoreSyncManager::SyncEnd &syncEnd); Status DoQuerySync(const std::vector &deviceIds, SyncMode mode, const std::string &query, const KvStoreSyncManager::SyncEnd &syncEnd);