diff --git a/frameworks/innerkitsimpl/distributeddatafwk/include/ikvstore_single.h b/frameworks/innerkitsimpl/distributeddatafwk/include/ikvstore_single.h index 996697f7d31b0a51c22936888127df938f801d8a..2d333cb63e4d03f551265c3e90c2191153824664 100755 --- a/frameworks/innerkitsimpl/distributeddatafwk/include/ikvstore_single.h +++ b/frameworks/innerkitsimpl/distributeddatafwk/include/ikvstore_single.h @@ -57,8 +57,8 @@ public: SETCAPABILITYRANGE, SETSECURITLEVEL, SYNC_WITH_CONDITION, - SUBSCRIBE_WITH_QUERY, - UNSUBSCRIBE_WITH_QUERY, + SUBSCRIBE, + UNSUBSCRIBE, SINGLE_CMD_LAST, }; DECLARE_INTERFACE_DESCRIPTOR(u"OHOS.DistributedKv.ISingleKvStore") @@ -75,7 +75,8 @@ public: std::function)> callback) = 0; virtual Status CloseResultSet(sptr resultSet) = 0; virtual Status GetCountWithQuery(const std::string &query, int &result) = 0; - virtual Status Sync(const std::vector &deviceIds, SyncMode mode, uint32_t allowedDelayMs) = 0; + virtual Status Sync(const std::vector &deviceIds, SyncMode mode, + uint32_t allowedDelayMs, uint64_t sequenceId) = 0; virtual Status RemoveDeviceData(const std::string &device) = 0; virtual Status RegisterSyncCallback(sptr callback) = 0; virtual Status UnRegisterSyncCallback() = 0; @@ -89,9 +90,12 @@ public: virtual Status SetCapabilityRange(const std::vector &localLabels, const std::vector &remoteSupportLabels) = 0; virtual Status GetSecurityLevel(SecurityLevel &securityLevel) = 0; - virtual Status Sync(const std::vector &deviceIds, SyncMode mode, const std::string &query) = 0; - virtual Status SubscribeWithQuery(const std::vector &deviceIds, const std::string &query) = 0; - virtual Status UnSubscribeWithQuery(const std::vector &deviceIds, const std::string &query) = 0; + virtual Status Sync(const std::vector &deviceIds, SyncMode mode, + const std::string &query, uint64_t sequenceId) = 0; + virtual Status Subscribe(const std::vector &deviceIds, const std::string &query, + uint64_t sequenceId) = 0; + virtual Status UnSubscribe(const std::vector &deviceIds, const std::string &query, + uint64_t sequenceId) = 0; }; class SingleKvStoreStub : public IRemoteStub { @@ -125,8 +129,8 @@ private: int OnSecurityLevelRequest(MessageParcel &data, MessageParcel &reply); int OnSyncRequest(MessageParcel &data, MessageParcel &reply); - int OnSubscribeWithQueryRequest(MessageParcel &data, MessageParcel &reply); - int OnUnSubscribeWithQueryRequest(MessageParcel &data, MessageParcel &reply); + int OnSubscribeRequest(MessageParcel &data, MessageParcel &reply); + int OnUnSubscribeRequest(MessageParcel &data, MessageParcel &reply); int WriteEntriesParcelable(MessageParcel &reply, Status status, std::vector entries, int bufferSize); int GetTotalEntriesSize(std::vector entries); @@ -157,8 +161,8 @@ private: [SETCAPABILITYRANGE] = &SingleKvStoreStub::OnCapabilityRangeRequest, [SETSECURITLEVEL] = &SingleKvStoreStub::OnSecurityLevelRequest, [SYNC_WITH_CONDITION] = &SingleKvStoreStub::OnSyncRequest, - [SUBSCRIBE_WITH_QUERY] = &SingleKvStoreStub::OnSubscribeWithQueryRequest, - [UNSUBSCRIBE_WITH_QUERY] = &SingleKvStoreStub::OnUnSubscribeWithQueryRequest, + [SUBSCRIBE] = &SingleKvStoreStub::OnSubscribeRequest, + [UNSUBSCRIBE] = &SingleKvStoreStub::OnUnSubscribeRequest, }; }; @@ -178,8 +182,10 @@ public: std::function)> callback); virtual Status CloseResultSet(sptr resultSet); virtual Status GetCountWithQuery(const std::string &query, int &result); - virtual Status Sync(const std::vector &deviceIds, SyncMode mode, uint32_t allowedDelayMs); - virtual Status Sync(const std::vector &deviceIds, SyncMode mode, const std::string &query); + virtual Status Sync(const std::vector &deviceIds, SyncMode mode, uint32_t allowedDelayMs, + uint64_t sequenceId); + virtual Status Sync(const std::vector &deviceIds, SyncMode mode, + const std::string &query, uint64_t sequenceId); virtual Status RemoveDeviceData(const std::string &device); virtual Status RegisterSyncCallback(sptr callback); virtual Status UnRegisterSyncCallback(); @@ -193,8 +199,10 @@ public: virtual Status SetCapabilityRange(const std::vector &localLabels, const std::vector &remoteSupportLabels); virtual Status GetSecurityLevel(SecurityLevel &securityLevel); - virtual Status SubscribeWithQuery(const std::vector &deviceIds, const std::string &query); - virtual Status UnSubscribeWithQuery(const std::vector &deviceIds, const std::string &query); + virtual Status Subscribe(const std::vector &deviceIds, const std::string &query, + uint64_t sequenceId); + virtual Status UnSubscribe(const std::vector &deviceIds, const std::string &query, + uint64_t sequenceId); private: static inline BrokerDelegator delegator_; }; diff --git a/frameworks/innerkitsimpl/distributeddatafwk/include/ikvstore_sync_callback.h b/frameworks/innerkitsimpl/distributeddatafwk/include/ikvstore_sync_callback.h index 28550969806859972f8d2276b2d5a535eea07155..343760a8cfab008fb9d22ccedeaa962aa2604561 100644 --- a/frameworks/innerkitsimpl/distributeddatafwk/include/ikvstore_sync_callback.h +++ b/frameworks/innerkitsimpl/distributeddatafwk/include/ikvstore_sync_callback.h @@ -28,7 +28,7 @@ namespace DistributedKv { class IKvStoreSyncCallback : public IRemoteBroker { public: DECLARE_INTERFACE_DESCRIPTOR(u"OHOS.DistributedKv.IKvStoreSyncCallback"); - virtual void SyncCompleted(const std::map &results) = 0; + virtual void SyncCompleted(const std::map &results, uint64_t sequenceId) = 0; }; class KvStoreSyncCallbackStub : public IRemoteStub { @@ -41,7 +41,7 @@ class KvStoreSyncCallbackProxy : public IRemoteProxy { public: explicit KvStoreSyncCallbackProxy(const sptr &impl); ~KvStoreSyncCallbackProxy() = default; - void SyncCompleted(const std::map &results) override; + void SyncCompleted(const std::map &results, uint64_t sequenceId) override; private: static inline BrokerDelegator delegator_; }; diff --git a/frameworks/innerkitsimpl/distributeddatafwk/src/data_query.cpp b/frameworks/innerkitsimpl/distributeddatafwk/src/data_query.cpp index 6ff0a512d2d90d4ca95e61557a76a8a51aa9992a..c8fd4ea18ff8ce071900e85c15ed6fc1b12d1a1c 100755 --- a/frameworks/innerkitsimpl/distributeddatafwk/src/data_query.cpp +++ b/frameworks/innerkitsimpl/distributeddatafwk/src/data_query.cpp @@ -56,6 +56,7 @@ const std::string DataQuery::TYPE_BOOLEAN = "BOOL"; const std::string DataQuery::VALUE_TRUE = "true"; const std::string DataQuery::VALUE_FALSE = "false"; const std::string DataQuery::SUGGEST_INDEX = "^SUGGEST_INDEX"; +const std::string DataQuery::IN_KEYS = "^IN_KEYS"; constexpr int MAX_QUERY_LENGTH = 5 * 1024; // Max query string length 5k DataQuery::DataQuery() @@ -64,6 +65,7 @@ DataQuery::DataQuery() DataQuery& DataQuery::Reset() { str_ = ""; + inkeysFlag_ = false; return *this; } @@ -537,6 +539,33 @@ DataQuery& DataQuery::SetSuggestIndex(const std::string &index) return *this; } +DataQuery& DataQuery::InKeys(const std::vector &keys) +{ + if (keys.empty()) { + ZLOGE("Invalid number param"); + return *this; + } + if (inkeysFlag_) { + ZLOGE("cannot set inkeys more than once"); + return *this; + } + inkeysFlag_ = true; + str_.append(SPACE); + str_.append(IN_KEYS); + str_.append(SPACE); + str_.append(START_IN); + str_.append(SPACE); + for (std::string key : keys) { + if (ValidateField(key)) { + EscapeSpace(key); + str_.append(key); + str_.append(SPACE); + } + } + str_.append(END_IN); + return *this; +} + std::string DataQuery::ToString() const { if (str_.length() > MAX_QUERY_LENGTH) { diff --git a/frameworks/innerkitsimpl/distributeddatafwk/src/ikvstore_single.cpp b/frameworks/innerkitsimpl/distributeddatafwk/src/ikvstore_single.cpp index a2fd8e0f555a912aca260af1f7d26c9f3200efe4..5a15c0a14515f25dcb7833759d5f0d231973f559 100755 --- a/frameworks/innerkitsimpl/distributeddatafwk/src/ikvstore_single.cpp +++ b/frameworks/innerkitsimpl/distributeddatafwk/src/ikvstore_single.cpp @@ -462,7 +462,7 @@ Status SingleKvStoreProxy::CloseResultSet(sptr resultSetPtr) } Status SingleKvStoreProxy::Sync(const std::vector &deviceIds, SyncMode mode, - uint32_t allowedDelayMs) + uint32_t allowedDelayMs, uint64_t sequenceId) { MessageParcel data; MessageParcel reply; @@ -487,7 +487,9 @@ Status SingleKvStoreProxy::Sync(const std::vector &deviceIds, SyncM } return static_cast(reply.ReadInt32()); } -Status SingleKvStoreProxy::Sync(const std::vector &deviceIds, SyncMode mode, const std::string &query) + +Status SingleKvStoreProxy::Sync(const std::vector &deviceIds, SyncMode mode, + const std::string &query, uint64_t sequenceId) { MessageParcel data; if (!data.WriteInterfaceToken(SingleKvStoreProxy::GetDescriptor())) { @@ -504,6 +506,10 @@ Status SingleKvStoreProxy::Sync(const std::vector &deviceIds, SyncM ZLOGE("write query fail"); return Status::IPC_ERROR; } + if (!data.WriteUint64(sequenceId)) { + ZLOGE("write label fail"); + return Status::IPC_ERROR; + } MessageParcel reply; int32_t error = Remote()->SendRequest(SYNC_WITH_CONDITION, data, reply, mo); if (error != 0) { @@ -848,7 +854,8 @@ Status SingleKvStoreProxy::GetSecurityLevel(SecurityLevel &securityLevel) return status; } -Status SingleKvStoreProxy::SubscribeWithQuery(const std::vector &deviceIds, const std::string &query) +Status SingleKvStoreProxy::Subscribe(const std::vector &deviceIds, const std::string &query, + uint64_t sequenceId) { MessageParcel data; if (!data.WriteInterfaceToken(SingleKvStoreProxy::GetDescriptor())) { @@ -863,9 +870,13 @@ Status SingleKvStoreProxy::SubscribeWithQuery(const std::vector &de ZLOGE("write query fail"); return Status::IPC_ERROR; } + if (!data.WriteUint64(sequenceId)) { + ZLOGE("write query fail"); + return Status::IPC_ERROR; + } MessageParcel reply; MessageOption mo { MessageOption::TF_SYNC }; - int32_t error = Remote()->SendRequest(SUBSCRIBE_WITH_QUERY, data, reply, mo); + int32_t error = Remote()->SendRequest(SUBSCRIBE, data, reply, mo); if (error != 0) { ZLOGE("SendRequest returned %d", error); return Status::IPC_ERROR; @@ -873,7 +884,8 @@ Status SingleKvStoreProxy::SubscribeWithQuery(const std::vector &de return static_cast(reply.ReadInt32()); } -Status SingleKvStoreProxy::UnSubscribeWithQuery(const std::vector &deviceIds, const std::string &query) +Status SingleKvStoreProxy::UnSubscribe(const std::vector &deviceIds, const std::string &query, + uint64_t sequenceId) { MessageParcel data; if (!data.WriteInterfaceToken(SingleKvStoreProxy::GetDescriptor())) { @@ -888,9 +900,13 @@ Status SingleKvStoreProxy::UnSubscribeWithQuery(const std::vector & ZLOGE("write query fail"); return Status::IPC_ERROR; } + if (!data.WriteUint64(sequenceId)) { + ZLOGE("write query fail"); + return Status::IPC_ERROR; + } MessageParcel reply; MessageOption mo { MessageOption::TF_SYNC }; - int32_t error = Remote()->SendRequest(UNSUBSCRIBE_WITH_QUERY, data, reply, mo); + int32_t error = Remote()->SendRequest(UNSUBSCRIBE, data, reply, mo); if (error != 0) { ZLOGE("SendRequest returned %d", error); return Status::IPC_ERROR; @@ -1218,7 +1234,8 @@ int SingleKvStoreStub::SyncOnRemote(MessageParcel &data, MessageParcel &reply) } auto mode = static_cast(data.ReadInt32()); auto allowedDelayMs = static_cast(data.ReadInt32()); - Status status = Sync(devices, mode, allowedDelayMs); + auto sequenceId = data.ReadUint64(); + Status status = Sync(devices, mode, allowedDelayMs, sequenceId); if (!reply.WriteInt32(static_cast(status))) { ZLOGW("write sync status fail"); return -1; @@ -1593,7 +1610,8 @@ int SingleKvStoreStub::OnSyncRequest(MessageParcel &data, MessageParcel &reply) } auto mode = static_cast(data.ReadInt32()); auto query = data.ReadString(); - Status status = Sync(devices, mode, query); + auto sequenceId = data.ReadUint64(); + Status status = Sync(devices, mode, query, sequenceId); if (!reply.WriteInt32(static_cast(status))) { ZLOGE("write sync status fail"); return -1; @@ -1619,7 +1637,7 @@ int SingleKvStoreStub::OnRemoteRequest(uint32_t code, MessageParcel &data, Messa } } -int SingleKvStoreStub::OnSubscribeWithQueryRequest(MessageParcel &data, MessageParcel &reply) +int SingleKvStoreStub::OnSubscribeRequest(MessageParcel &data, MessageParcel &reply) { std::vector devices; if (!data.ReadStringVector(&devices) || devices.empty()) { @@ -1631,7 +1649,8 @@ int SingleKvStoreStub::OnSubscribeWithQueryRequest(MessageParcel &data, MessageP return 0; } auto query = data.ReadString(); - Status status = SubscribeWithQuery(devices, query); + auto sequenceId = data.ReadUint64(); + Status status = Subscribe(devices, query, sequenceId); if (!reply.WriteInt32(static_cast(status))) { ZLOGE("write sync status fail"); return -1; @@ -1639,7 +1658,7 @@ int SingleKvStoreStub::OnSubscribeWithQueryRequest(MessageParcel &data, MessageP return 0; } -int SingleKvStoreStub::OnUnSubscribeWithQueryRequest(MessageParcel &data, MessageParcel &reply) +int SingleKvStoreStub::OnUnSubscribeRequest(MessageParcel &data, MessageParcel &reply) { std::vector devices; if (!data.ReadStringVector(&devices) || devices.empty()) { @@ -1651,7 +1670,8 @@ int SingleKvStoreStub::OnUnSubscribeWithQueryRequest(MessageParcel &data, Messag return 0; } auto query = data.ReadString(); - Status status = UnSubscribeWithQuery(devices, query); + auto sequenceId = data.ReadUint64(); + Status status = UnSubscribe(devices, query, sequenceId); if (!reply.WriteInt32(static_cast(status))) { ZLOGE("write sync status fail"); return -1; diff --git a/frameworks/innerkitsimpl/distributeddatafwk/src/ikvstore_sync_callback.cpp b/frameworks/innerkitsimpl/distributeddatafwk/src/ikvstore_sync_callback.cpp index c2ede3608d40b47e3bfd63a296bcf66c4ab66d3f..863547b13042a86746893254a40d3461e6b73d5a 100644 --- a/frameworks/innerkitsimpl/distributeddatafwk/src/ikvstore_sync_callback.cpp +++ b/frameworks/innerkitsimpl/distributeddatafwk/src/ikvstore_sync_callback.cpp @@ -18,8 +18,11 @@ #include "ikvstore_sync_callback.h" #include #include +#include #include "log_print.h" #include "message_parcel.h" +#include "message_option.h" +#include "types.h" namespace OHOS { namespace DistributedKv { @@ -31,7 +34,7 @@ KvStoreSyncCallbackProxy::KvStoreSyncCallbackProxy(const sptr &im : IRemoteProxy(impl) {} -void KvStoreSyncCallbackProxy::SyncCompleted(const std::map &results) +void KvStoreSyncCallbackProxy::SyncCompleted(const std::map &results, uint64_t sequenceId) { MessageParcel data; MessageParcel reply; @@ -50,6 +53,10 @@ void KvStoreSyncCallbackProxy::SyncCompleted(const std::map return; } } + if (!data.WriteUint64(sequenceId)) { + ZLOGW("write label error."); + return; + } MessageOption mo { MessageOption::TF_SYNC }; int error = Remote()->SendRequest(SYNCCOMPLETED, data, reply, mo); if (error != 0) { @@ -78,7 +85,8 @@ int32_t KvStoreSyncCallbackStub::OnRemoteRequest(uint32_t code, MessageParcel &d results.insert(std::pair(data.ReadString(), static_cast(data.ReadInt32()))); } - SyncCompleted(results); + uint64_t sequenceId = data.ReadUint64(); + SyncCompleted(results, sequenceId); return 0; } default: diff --git a/frameworks/innerkitsimpl/distributeddatafwk/src/kvstore_sync_callback_client.cpp b/frameworks/innerkitsimpl/distributeddatafwk/src/kvstore_sync_callback_client.cpp index 9800daadb4a31e3048ac3fcaf1714d1e63bb00ba..a0d112be65e546f1e7a6f17777bfdf62398012cd 100755 --- a/frameworks/innerkitsimpl/distributeddatafwk/src/kvstore_sync_callback_client.cpp +++ b/frameworks/innerkitsimpl/distributeddatafwk/src/kvstore_sync_callback_client.cpp @@ -15,22 +15,35 @@ #define LOG_TAG "KvStoreSyncCallbackClient" +#include +#include +#include "log_print.h" #include "kvstore_sync_callback_client.h" namespace OHOS { namespace DistributedKv { -KvStoreSyncCallbackClient::KvStoreSyncCallbackClient(std::shared_ptr kvStoreSyncCallback) - : kvStoreSyncCallback_(kvStoreSyncCallback) -{} - -KvStoreSyncCallbackClient::~KvStoreSyncCallbackClient() -{} +KvStoreSyncCallbackClient::~KvStoreSyncCallbackClient() = default; +void KvStoreSyncCallbackClient::SyncCompleted(const std::map &results, uint64_t sequenceId) +{ + auto finded = syncCallbackInfo_.Find(sequenceId); + if (finded.first) { + finded.second->SyncCompleted(results); + DeleteSyncCallback(sequenceId); + } +} -void KvStoreSyncCallbackClient::SyncCompleted(const std::map &results) +void KvStoreSyncCallbackClient::AddSyncCallback(const std::shared_ptr callback, + uint64_t sequenceId) { - if (kvStoreSyncCallback_ != nullptr) { - kvStoreSyncCallback_->SyncCompleted(results); + auto inserted = syncCallbackInfo_.Insert(sequenceId, callback); + if (!inserted) { + ZLOGE("The sequeuceId %{public}" PRIu64 "is repeat!", sequenceId); } } + +void KvStoreSyncCallbackClient::DeleteSyncCallback(uint64_t sequenceId) +{ + syncCallbackInfo_.Erase(sequenceId); +} } // namespace DistributedKv -} // namespace OHOS +} // namespace OHOS \ No newline at end of file diff --git a/frameworks/innerkitsimpl/distributeddatafwk/src/kvstore_sync_callback_client.h b/frameworks/innerkitsimpl/distributeddatafwk/src/kvstore_sync_callback_client.h index c8ba5fa8640e0c152fb3237683b8100ebfaf90ed..52ac38e68695ba2b55957339089298c2b99e25af 100644 --- a/frameworks/innerkitsimpl/distributeddatafwk/src/kvstore_sync_callback_client.h +++ b/frameworks/innerkitsimpl/distributeddatafwk/src/kvstore_sync_callback_client.h @@ -16,6 +16,8 @@ #ifndef KVSTORE_SYNC_CALLBACK_CLIENT_H #define KVSTORE_SYNC_CALLBACK_CLIENT_H +#include +#include "concurrent_map.h" #include "ikvstore_sync_callback.h" #include "kvstore_sync_callback.h" @@ -24,16 +26,18 @@ namespace DistributedKv { class KvStoreSyncCallbackClient : public KvStoreSyncCallbackStub { public: - explicit KvStoreSyncCallbackClient(std::shared_ptr kvStoreSyncCallback); - + KvStoreSyncCallbackClient() = default; virtual ~KvStoreSyncCallbackClient(); - void SyncCompleted(const std::map &results) override; + void SyncCompleted(const std::map &results, uint64_t sequenceId) override; + + void AddSyncCallback(const std::shared_ptr callback, + uint64_t sequenceId); + void DeleteSyncCallback(uint64_t sequenceId); private: - std::shared_ptr kvStoreSyncCallback_; + ConcurrentMap> syncCallbackInfo_; }; - } // namespace DistributedKv } // namespace OHOS diff --git a/frameworks/innerkitsimpl/distributeddatafwk/src/single_kvstore_client.cpp b/frameworks/innerkitsimpl/distributeddatafwk/src/single_kvstore_client.cpp index 6ee6c46bd36f91752613a17efe94f47048e97e2d..e8e1cbf9a9010b1a4b8850f95302fd46032ebd95 100755 --- a/frameworks/innerkitsimpl/distributeddatafwk/src/single_kvstore_client.cpp +++ b/frameworks/innerkitsimpl/distributeddatafwk/src/single_kvstore_client.cpp @@ -22,11 +22,13 @@ #include "kvstore_resultset_client.h" #include "kvstore_sync_callback_client.h" #include "log_print.h" +#include "kvstore_utils.h" namespace OHOS::DistributedKv { SingleKvStoreClient::SingleKvStoreClient(sptr kvStoreProxy, const std::string &storeId) - : kvStoreProxy_(kvStoreProxy), storeId_(storeId) -{} + :kvStoreProxy_(kvStoreProxy), storeId_(storeId), syncObserver_(std::make_shared()) +{ +} StoreId SingleKvStoreClient::GetStoreId() const { @@ -167,7 +169,6 @@ Status SingleKvStoreClient::GetCountWithQuery(const DataQuery &query, int &resul Status SingleKvStoreClient::Sync(const std::vector &deviceIds, SyncMode mode, uint32_t allowedDelayMs) { DdsTrace trace(std::string(LOG_TAG "::") + std::string(__FUNCTION__), true); - if (kvStoreProxy_ == nullptr) { ZLOGE("kvstore proxy is nullptr."); return Status::SERVER_UNAVAILABLE; @@ -176,7 +177,10 @@ Status SingleKvStoreClient::Sync(const std::vector &deviceIds, Sync ZLOGW("deviceIds is empty."); return Status::INVALID_ARGUMENT; } - return kvStoreProxy_->Sync(deviceIds, mode, allowedDelayMs); + uint64_t sequenceId = KvStoreUtils::GenerateSequenceId(); + syncCallbackClient_.AddSyncCallback(syncObserver_, sequenceId); + RegisterCallback(); + return kvStoreProxy_->Sync(deviceIds, mode, allowedDelayMs, sequenceId); } Status SingleKvStoreClient::RemoveDeviceData(const std::string &device) @@ -304,20 +308,33 @@ Status SingleKvStoreClient::RegisterSyncCallback(std::shared_ptr ipcCallback = - new (std::nothrow) KvStoreSyncCallbackClient(callback); - if (ipcCallback == nullptr) { - ZLOGW("new KvStoreSyncCallbackClient failed"); - return Status::ERROR; + syncObserver_->Add(callback); + return Status::SUCCESS; +} + +Status SingleKvStoreClient::RegisterCallback() +{ + if (isRegisterSyncCallback_) { + return Status::SUCCESS; + } + std::lock_guard lg(registerCallbackMutex_); + if (isRegisterSyncCallback_) { + return Status::SUCCESS; } - return kvStoreProxy_->RegisterSyncCallback(ipcCallback); + auto status = kvStoreProxy_->RegisterSyncCallback(&syncCallbackClient_); + if (status != Status::SUCCESS) { + ZLOGE("RegisterSyncCallback is not success."); + return status; + } + isRegisterSyncCallback_ = true; + return Status::SUCCESS; } Status SingleKvStoreClient::UnRegisterSyncCallback() { ZLOGI("begin."); - return kvStoreProxy_->UnRegisterSyncCallback(); + syncObserver_->Clean(); + return Status::SUCCESS; } Status SingleKvStoreClient::PutBatch(const std::vector &entries) @@ -450,7 +467,7 @@ Status SingleKvStoreClient::GetSecurityLevel(SecurityLevel &securityLevel) const } Status SingleKvStoreClient::SyncWithCondition(const std::vector &deviceIds, SyncMode mode, - const DataQuery &query) + const DataQuery &query, std::shared_ptr callback) { if (kvStoreProxy_ == nullptr) { ZLOGE("singleKvstore proxy is nullptr."); @@ -460,10 +477,17 @@ Status SingleKvStoreClient::SyncWithCondition(const std::vector &de ZLOGW("deviceIds is empty."); return Status::INVALID_ARGUMENT; } - return kvStoreProxy_->Sync(deviceIds, mode, query.ToString()); + uint64_t sequenceId = KvStoreUtils::GenerateSequenceId(); + if (callback != nullptr) { + syncCallbackClient_.AddSyncCallback(callback, sequenceId); + RegisterCallback(); + } else { + syncCallbackClient_.AddSyncCallback(syncObserver_, sequenceId); + } + return kvStoreProxy_->Sync(deviceIds, mode, query.ToString(), sequenceId); } -Status SingleKvStoreClient::SubscribeWithQuery(const std::vector& deviceIds, const DataQuery& query) +Status SingleKvStoreClient::SubscribeWithQuery(const std::vector &deviceIds, const DataQuery &query) { if (kvStoreProxy_ == nullptr) { ZLOGE("singleKvstore proxy is nullptr."); @@ -473,10 +497,13 @@ Status SingleKvStoreClient::SubscribeWithQuery(const std::vector& d ZLOGW("deviceIds is empty."); return Status::INVALID_ARGUMENT; } - return kvStoreProxy_->SubscribeWithQuery(deviceIds, query.ToString()); + uint64_t sequenceId = KvStoreUtils::GenerateSequenceId(); + syncCallbackClient_.AddSyncCallback(syncObserver_, sequenceId); + RegisterCallback(); + return kvStoreProxy_->Subscribe(deviceIds, query.ToString(), sequenceId); } -Status SingleKvStoreClient::UnSubscribeWithQuery(const std::vector& deviceIds, const DataQuery& query) +Status SingleKvStoreClient::UnsubscribeWithQuery(const std::vector &deviceIds, const DataQuery &query) { if (kvStoreProxy_ == nullptr) { ZLOGE("singleKvstore proxy is nullptr."); @@ -486,7 +513,9 @@ Status SingleKvStoreClient::UnSubscribeWithQuery(const std::vector& ZLOGW("deviceIds is empty."); return Status::INVALID_ARGUMENT; } - return kvStoreProxy_->UnSubscribeWithQuery(deviceIds, query.ToString()); + uint64_t sequenceId = KvStoreUtils::GenerateSequenceId(); + syncCallbackClient_.AddSyncCallback(syncObserver_, sequenceId); + return kvStoreProxy_->UnSubscribe(deviceIds, query.ToString(), sequenceId); } Status SingleKvStoreClient::GetKvStoreSnapshot(std::shared_ptr observer, diff --git a/frameworks/innerkitsimpl/distributeddatafwk/src/single_kvstore_client.h b/frameworks/innerkitsimpl/distributeddatafwk/src/single_kvstore_client.h index 2a15f3c26abbf7c1c165183fc7b1e20091cb1c58..9a6db3f0458fc76b471580c547a5efdc7f1d7fb3 100755 --- a/frameworks/innerkitsimpl/distributeddatafwk/src/single_kvstore_client.h +++ b/frameworks/innerkitsimpl/distributeddatafwk/src/single_kvstore_client.h @@ -16,9 +16,12 @@ #ifndef DISTRIBUTEDDATAMGR2_SINGLE_KVSTORE_CLIENT_H #define DISTRIBUTEDDATAMGR2_SINGLE_KVSTORE_CLIENT_H +#include #include "data_query.h" #include "ikvstore_single.h" #include "single_kvstore.h" +#include "kvstore_sync_callback_client.h" +#include "sync_observer.h" namespace OHOS::DistributedKv { class SingleKvStoreClient : public SingleKvStore { @@ -64,6 +67,8 @@ public: Status RegisterSyncCallback(std::shared_ptr callback) override; + Status RegisterCallback(); + Status UnRegisterSyncCallback() override; Status PutBatch(const std::vector &entries) override; @@ -84,11 +89,11 @@ public: const std::vector &remoteSupportLabels) const override; Status GetSecurityLevel(SecurityLevel &securityLevel) const override; - Status SyncWithCondition(const std::vector &deviceIds, SyncMode mode, - const DataQuery &query) override; + Status SyncWithCondition(const std::vector &deviceIds, SyncMode mode, const DataQuery &query, + std::shared_ptr syncCallback = nullptr) override; Status SubscribeWithQuery(const std::vector &deviceIds, const DataQuery &query) override; - Status UnSubscribeWithQuery(const std::vector &deviceIds, const DataQuery &query) override; + Status UnsubscribeWithQuery(const std::vector &deviceIds, const DataQuery &query) override; Status GetKvStoreSnapshot(std::shared_ptr observer, std::shared_ptr &snapshot) const override; Status ReleaseKvStoreSnapshot(std::shared_ptr &snapshot) override; @@ -101,6 +106,10 @@ private: std::map> registeredObservers_; std::mutex observerMapMutex_; std::string storeId_; + KvStoreSyncCallbackClient syncCallbackClient_; + std::shared_ptr syncObserver_; + bool isRegisterSyncCallback_ = false; + std::mutex registerCallbackMutex_; }; } // namespace OHOS::DistributedKv #endif // DISTRIBUTEDDATAMGR2_SINGLE_KVSTORE_CLIENT_H diff --git a/frameworks/innerkitsimpl/distributeddatafwk/src/sync_observer.cpp b/frameworks/innerkitsimpl/distributeddatafwk/src/sync_observer.cpp new file mode 100644 index 0000000000000000000000000000000000000000..608383ff870175abf64005e5c301b466ee4f55d8 --- /dev/null +++ b/frameworks/innerkitsimpl/distributeddatafwk/src/sync_observer.cpp @@ -0,0 +1,41 @@ +/* + * Copyright (c) 2022 Huawei Device Co., Ltd. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "sync_observer.h" + +namespace OHOS::DistributedKv { +SyncObserver::SyncObserver(const std::vector> &callbacks) + :callbacks_(callbacks) +{}; + +bool SyncObserver::Add(const std::shared_ptr callback) +{ + callbacks_.push_back(callback); + return true; +} + +bool SyncObserver::Clean() +{ + callbacks_.clear(); + return true; +} + +void SyncObserver::SyncCompleted(const std::map &results) +{ + for (auto &callback : callbacks_) { + callback->SyncCompleted(results); + } +} +} diff --git a/frameworks/innerkitsimpl/distributeddatafwk/src/sync_observer.h b/frameworks/innerkitsimpl/distributeddatafwk/src/sync_observer.h new file mode 100644 index 0000000000000000000000000000000000000000..02051b8050afb503931947eb3b98e5aadc91d476 --- /dev/null +++ b/frameworks/innerkitsimpl/distributeddatafwk/src/sync_observer.h @@ -0,0 +1,42 @@ +/* + * Copyright (c) 2022 Huawei Device Co., Ltd. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef DISTRIBUTEDDATAMGR_DATAMGR_SYNC_OBSERVER_H +#define DISTRIBUTEDDATAMGR_DATAMGR_SYNC_OBSERVER_H + +#include +#include +#include "kvstore_sync_callback.h" + +namespace OHOS::DistributedKv { +class SyncObserver : public KvStoreSyncCallback { +public: + explicit SyncObserver(const std::vector> &callbacks); + + SyncObserver() = default; + + virtual ~SyncObserver() = default; + + bool Add(const std::shared_ptr callback); + + bool Clean(); + + void SyncCompleted(const std::map &results) override; + +private: + std::vector> callbacks_; +}; +} +#endif // DISTRIBUTEDDATAMGR_DATAMGR_SYNC_OBSERVER_H diff --git a/frameworks/innerkitsimpl/distributeddatafwk/test/unittest/single_kvstore_client_query_test.cpp b/frameworks/innerkitsimpl/distributeddatafwk/test/unittest/single_kvstore_client_query_test.cpp index 3e4f83c94affe05dc0be92d841c8ac4608c2b722..ae6fff015f673e5b44e0741ce79ac86442520320 100755 --- a/frameworks/innerkitsimpl/distributeddatafwk/test/unittest/single_kvstore_client_query_test.cpp +++ b/frameworks/innerkitsimpl/distributeddatafwk/test/unittest/single_kvstore_client_query_test.cpp @@ -589,3 +589,19 @@ HWTEST_F(SingleKvStoreClientQueryTest, TestQueryC018, TestSize.Level1) EXPECT_TRUE(query.ToString().length() > 0); ZLOGD("TestQueryC018 end"); } + +/** +* @tc.name: TestQueryC019 +* @tc.desc: Query InKeys. +* @tc.type: FUNC +* @tc.require: AR000GOHO7 +* @tc.author: taoyuxin +*/ +HWTEST_F(SingleKvStoreClientQueryTest, TestQueryC019, TestSize.Level1) +{ + ZLOGD("TestQueryC019 start"); + DataQuery query; + query.InKeys( {"test_field_name"} ); + EXPECT_TRUE(query.ToString().length() > 0); + ZLOGD("TestQueryC019 end"); +} diff --git a/frameworks/innerkitsimpl/distributeddatafwk/test/unittest/single_kvstore_client_test.cpp b/frameworks/innerkitsimpl/distributeddatafwk/test/unittest/single_kvstore_client_test.cpp index 08a1a52e4e4831208f55548f7d780ab44e40e3e5..846d227d7076480f8b0072fc177eb992c3591b20 100755 --- a/frameworks/innerkitsimpl/distributeddatafwk/test/unittest/single_kvstore_client_test.cpp +++ b/frameworks/innerkitsimpl/distributeddatafwk/test/unittest/single_kvstore_client_test.cpp @@ -1101,7 +1101,7 @@ HWTEST_F(SingleKvStoreClientTest, UnSubscribeWithQuery001, TestSize.Level1) std::vector deviceIds = {"invalid_device_id1", "invalid_device_id2"}; DataQuery dataQuery; dataQuery.KeyPrefix("name"); - auto unSubscribeStatus = singleKvStorePtr->UnSubscribeWithQuery(deviceIds, dataQuery); + auto unSubscribeStatus = singleKvStorePtr->UnsubscribeWithQuery(deviceIds, dataQuery); EXPECT_NE(unSubscribeStatus, Status::SUCCESS) << "sync device should not return success"; } diff --git a/interfaces/innerkits/distributeddata/BUILD.gn b/interfaces/innerkits/distributeddata/BUILD.gn index 9e39436065f5018bfd63403c268dd404f2f403fa..b53e0e84edfbc339ca7c4166b8ab3761c4b1712c 100755 --- a/interfaces/innerkits/distributeddata/BUILD.gn +++ b/interfaces/innerkits/distributeddata/BUILD.gn @@ -72,6 +72,7 @@ ohos_shared_library("distributeddata_inner") { "../../../frameworks/innerkitsimpl/distributeddatafwk/src/kvstore_snapshot_client.cpp", "../../../frameworks/innerkitsimpl/distributeddatafwk/src/kvstore_sync_callback_client.cpp", "../../../frameworks/innerkitsimpl/distributeddatafwk/src/single_kvstore_client.cpp", + "../../../frameworks/innerkitsimpl/distributeddatafwk/src/sync_observer.cpp", "include/types.h", ] @@ -91,6 +92,7 @@ ohos_shared_library("distributeddata_inner") { deps = [ "//foundation/distributeddatamgr/distributeddatamgr/services/distributeddataservice/adapter:distributeddata_adapter", + "//foundation/distributeddatamgr/distributeddatamgr/services/distributeddataservice/adapter/utils:distributeddata_utils_static", "//utils/native/base:utils", ] external_deps = [ diff --git a/interfaces/innerkits/distributeddata/include/data_query.h b/interfaces/innerkits/distributeddata/include/data_query.h index dc8a36f865e285ed57fc1316c843444fdc9208cb..f99592bde9259c748eba3eb2d0de2d81e01f47f0 100755 --- a/interfaces/innerkits/distributeddata/include/data_query.h +++ b/interfaces/innerkits/distributeddata/include/data_query.h @@ -399,6 +399,13 @@ public: // This Query. KVSTORE_API DataQuery& SetSuggestIndex(const std::string &index); + // Select results with many keys. + // Parameters: + // keys: the vector of keys for query + // Return: + // This Query. + KVSTORE_API DataQuery& InKeys(const std::vector &keys); + // Get string representation // Return: // String representation of this query. @@ -511,9 +518,14 @@ public: // suggested index static const std::string SUGGEST_INDEX; + + // in keys + static const std::string IN_KEYS; private: std::string str_; + bool inkeysFlag_; + template void AppendCommon(const std::string &keyword, const std::string &fieldType, std::string &field, const T &value); diff --git a/interfaces/innerkits/distributeddata/include/single_kvstore.h b/interfaces/innerkits/distributeddata/include/single_kvstore.h index 312f7f1c86d2155c64b173d82b8476f5164e2c78..e816575c3cfad7126f8ef07ae438e9e5634e50fc 100755 --- a/interfaces/innerkits/distributeddata/include/single_kvstore.h +++ b/interfaces/innerkits/distributeddata/include/single_kvstore.h @@ -185,7 +185,7 @@ public: * Status of this Sync operation. */ KVSTORE_API virtual Status SyncWithCondition(const std::vector &deviceIds, SyncMode mode, - const DataQuery &query) = 0; + const DataQuery &query, std::shared_ptr syncCallback = nullptr) = 0; /* * Subscribe store with other devices consistently Synchronize the data which is satisfied with the condition. @@ -196,7 +196,7 @@ public: * Status of this Subscribe operation. */ KVSTORE_API virtual Status SubscribeWithQuery(const std::vector &deviceIds, - const DataQuery &query) = 0; + const DataQuery &query) = 0; /* * UnSubscribe store with other devices which is satisfied with the condition. @@ -206,8 +206,8 @@ public: * Return: * Status of this UnSubscribe operation. */ - KVSTORE_API virtual Status UnSubscribeWithQuery(const std::vector &deviceIds, - const DataQuery &query) = 0; + KVSTORE_API virtual Status UnsubscribeWithQuery(const std::vector &deviceIds, + const DataQuery &query) = 0; protected: // control this store. diff --git a/services/distributeddataservice/adapter/communicator/src/communication_provider_impl.cpp b/services/distributeddataservice/adapter/communicator/src/communication_provider_impl.cpp index 259fdb2b0185e3fc25c7f664e5b0e0fa89c2ed16..54d2fb31fa82a6600833d78a569c2dcde25b0dfa 100755 --- a/services/distributeddataservice/adapter/communicator/src/communication_provider_impl.cpp +++ b/services/distributeddataservice/adapter/communicator/src/communication_provider_impl.cpp @@ -108,7 +108,7 @@ std::vector CommunicationProviderImpl::GetRemoteNodesBasicInfo() con std::string CommunicationProviderImpl::ToNodeId(const std::string &id) const { - std::string ret = appDeviceHandler_.ToNodeID(id, ""); + std::string ret = appDeviceHandler_.ToNodeID("", id); if (ret.empty()) { ZLOGD("toNodeId failed."); } diff --git a/services/distributeddataservice/adapter/include/utils/kvstore_utils.h b/services/distributeddataservice/adapter/include/utils/kvstore_utils.h index bfa9ab9a1e2e71ead8815eb0f86d85a0c00ce6d2..65f8fb16552d4ec03f843e27956aa327a2b82d4c 100755 --- a/services/distributeddataservice/adapter/include/utils/kvstore_utils.h +++ b/services/distributeddataservice/adapter/include/utils/kvstore_utils.h @@ -17,6 +17,7 @@ #define KVSTORE_UTILS_H #include +#include #include "visibility.h" #include "communication_provider.h" @@ -30,9 +31,12 @@ public: KVSTORE_API static std::string ToBeAnonymous(const std::string &name); KVSTORE_API static AppDistributedKv::CommunicationProvider &GetProviderInstance(); + + KVSTORE_API static uint64_t GenerateSequenceId(); private: static constexpr int MAIN_USER_ID = 0; static constexpr int SYSTEM_UID = 1000; + static std::atomic sequenceId_; }; } // namespace DistributedKv } // namespace OHOS diff --git a/services/distributeddataservice/adapter/utils/src/kvstore_utils.cpp b/services/distributeddataservice/adapter/utils/src/kvstore_utils.cpp index 89dc7333e1d1b5bb9ad9279887060692dbc5c142..097aaf0433d1210ffdbf4d6701d4e6dd2cf99023 100755 --- a/services/distributeddataservice/adapter/utils/src/kvstore_utils.cpp +++ b/services/distributeddataservice/adapter/utils/src/kvstore_utils.cpp @@ -25,6 +25,7 @@ constexpr int32_t END_SIZE = 3; constexpr int32_t MIN_SIZE = HEAD_SIZE + END_SIZE + 3; constexpr const char *REPLACE_CHAIN = "***"; constexpr const char *DEFAULT_ANONYMOUS = "******"; +std::atomic KvStoreUtils::sequenceId_{ 0 }; std::string KvStoreUtils::ToBeAnonymous(const std::string &name) { if (name.length() <= HEAD_SIZE) { @@ -46,5 +47,10 @@ AppDistributedKv::CommunicationProvider &KvStoreUtils::GetProviderInstance() return *(AppDistributedKv::CommunicationProvider::MakeCommunicationProvider().get()); #endif } + +uint64_t KvStoreUtils::GenerateSequenceId() +{ + return ++sequenceId_; +} } // namespace DistributedKv } // namespace OHOS diff --git a/services/distributeddataservice/app/src/query_helper.cpp b/services/distributeddataservice/app/src/query_helper.cpp index fa04942407074cd87a50993c190ff832c1f56e8e..cb56bf8a5349bb24e346dcdc34442482d41760cd 100644 --- a/services/distributeddataservice/app/src/query_helper.cpp +++ b/services/distributeddataservice/app/src/query_helper.cpp @@ -15,25 +15,26 @@ #define LOG_TAG "QueryHelper" -#include "query_helper.h" #include #include #include #include "kvstore_utils.h" #include "data_query.h" #include "log_print.h" +#include "types.h" +#include "query_helper.h" namespace OHOS::DistributedKv { constexpr int QUERY_SKIP_SIZE = 1; constexpr int QUERY_WORD_SIZE = 2; constexpr int MAX_QUERY_LENGTH = 5 * 1024; // Max query string length 5k constexpr int MAX_QUERY_COMPLEXITY = 500; // Max query complexity 500 -bool QueryHelper::hasPrefixKey_{}; +bool QueryHelper::hasPrefixKey_{ }; std::string QueryHelper::deviceId_{}; DistributedDB::Query QueryHelper::StringToDbQuery(const std::string &query, bool &isSuccess) { - ZLOGI("query string length:%zu", query.length()); + ZLOGI("query string length:%{public}zu", query.length()); DistributedDB::Query dbQuery = DistributedDB::Query::Select(); if (query.size() == 0) { ZLOGI("Query string is empty."); @@ -127,6 +128,8 @@ void QueryHelper::HandleExtra(const std::vector &words, int &pointe HandleDeviceId(words, pointer, end, isSuccess, dbQuery); } else if (keyword == DataQuery::SUGGEST_INDEX) { HandleSetSuggestIndex(words, pointer, end, isSuccess, dbQuery); + } else if (keyword == DataQuery::IN_KEYS) { + HandleInKeys(words, pointer, end, isSuccess, dbQuery); } else { ZLOGE("Invalid keyword."); isSuccess = false; @@ -497,6 +500,33 @@ void QueryHelper::HandleKeyPrefix(const std::vector &words, int &po pointer += 2; // Pointer goes to next keyword } +void QueryHelper::HandleInKeys(const std::vector &words, int &pointer, + const int &end, bool &isSuccess, DistributedDB::Query &dbQuery) { + // pointer points at keyword "IN_KEYS", (pointer + 1) points at keyword "START_IN" + int startInOffSet = pointer + 1; + int queryLen = end - pointer; + if (queryLen < 2 || words.at(startInOffSet) != DataQuery::START_IN) { // This keyword has at least 2 params + ZLOGE("In not enough params."); + isSuccess = false; + return; + } + int inkeyOffSet = startInOffSet + 1; // inkeyOffSet points at the first inkey value + const std::vector inKeys = GetStringList(words, inkeyOffSet, end); + std::set> inDbKeys; + for (const std::string &inKey : inKeys) { + ZLOGI("inKey=%{public}s", inKey.c_str()); + std::vector dbKey; + dbKey.assign(inKey.begin(), inKey.end()); + inDbKeys.insert(dbKey); + } + int size = inDbKeys.size(); + ZLOGI("size of inKeys=%{public}d", size); + dbQuery.InKeys(inDbKeys); + isSuccess = true; + int endOffSet = inkeyOffSet; + pointer = endOffSet + 1; // endOffSet points at keyword "END", Pointer goes to next keyword +} + void QueryHelper::HandleSetSuggestIndex(const std::vector &words, int &pointer, const int &end, bool &isSuccess, DistributedDB::Query &dbQuery) { if (pointer + QUERY_SKIP_SIZE > end) { diff --git a/services/distributeddataservice/app/src/query_helper.h b/services/distributeddataservice/app/src/query_helper.h index 158b25b551a2e7a4a58c1c0f9a832c2ca0c0277f..c87bea95a26f5c22823c421ce91632f53a120f4c 100755 --- a/services/distributeddataservice/app/src/query_helper.h +++ b/services/distributeddataservice/app/src/query_helper.h @@ -16,7 +16,9 @@ #ifndef QUERY_HELPER_H #define QUERY_HELPER_H +#include #include "query.h" +#include "types.h" namespace OHOS::DistributedKv { class QueryHelper { @@ -25,6 +27,7 @@ public: private: static std::string deviceId_; static bool hasPrefixKey_; + static bool hasInKeys_; static void Handle(const std::vector &words, int &pointer, const int &end, bool &isSuccess, DistributedDB::Query &dbQuery); static void HandleExtra(const std::vector &words, int &pointer, @@ -69,6 +72,8 @@ private: const int &end, bool &isSuccess, DistributedDB::Query &dbQuery); static void HandleKeyPrefix(const std::vector &words, int &pointer, const int &end, bool &isSuccess, DistributedDB::Query &dbQuery); + static void HandleInKeys(const std::vector &words, int &pointer, + const int &end, bool &isSuccess, DistributedDB::Query &dbQuery); static void HandleSetSuggestIndex(const std::vector &words, int &pointer, const int &end, bool &isSuccess, DistributedDB::Query &dbQuery); static void HandleDeviceId(const std::vector &words, int &pointer, diff --git a/services/distributeddataservice/app/src/single_kvstore_impl.cpp b/services/distributeddataservice/app/src/single_kvstore_impl.cpp index 4389691c4443081d60434059f36ca6b03bf57e52..984fe07bccf7a11a3e3c48d106bcc121f1f70e32 100755 --- a/services/distributeddataservice/app/src/single_kvstore_impl.cpp +++ b/services/distributeddataservice/app/src/single_kvstore_impl.cpp @@ -297,19 +297,19 @@ int SingleKvStoreImpl::ConvertToDbObserverMode(const SubscribeType subscribeType return dbObserverMode; } - // Convert KvStore sync mode to DistributeDB sync mode. - DistributedDB::SyncMode SingleKvStoreImpl::ConvertToDbSyncMode(SyncMode syncMode) const - { - DistributedDB::SyncMode dbSyncMode; - if (syncMode == SyncMode::PUSH) { - dbSyncMode = DistributedDB::SyncMode::SYNC_MODE_PUSH_ONLY; - } else if (syncMode == SyncMode::PULL) { - dbSyncMode = DistributedDB::SyncMode::SYNC_MODE_PULL_ONLY; - } else { - dbSyncMode = DistributedDB::SyncMode::SYNC_MODE_PUSH_PULL; - } - return dbSyncMode; +// Convert KvStore sync mode to DistributeDB sync mode. +DistributedDB::SyncMode SingleKvStoreImpl::ConvertToDbSyncMode(SyncMode syncMode) const +{ + DistributedDB::SyncMode dbSyncMode; + if (syncMode == SyncMode::PUSH) { + dbSyncMode = DistributedDB::SyncMode::SYNC_MODE_PUSH_ONLY; + } else if (syncMode == SyncMode::PULL) { + dbSyncMode = DistributedDB::SyncMode::SYNC_MODE_PULL_ONLY; + } else { + dbSyncMode = DistributedDB::SyncMode::SYNC_MODE_PUSH_PULL; } + return dbSyncMode; +} Status SingleKvStoreImpl::UnSubscribeKvStore(const SubscribeType subscribeType, sptr observer) { @@ -457,34 +457,11 @@ Status SingleKvStoreImpl::GetEntriesWithQuery(const std::string &query, std::vec } return Status::SUCCESS; } - switch (status) { - case DistributedDB::DBStatus::BUSY: - case DistributedDB::DBStatus::DB_ERROR: { - return Status::DB_ERROR; - } - case DistributedDB::DBStatus::INVALID_ARGS: { - return Status::INVALID_ARGUMENT; - } - case DistributedDB::DBStatus::INVALID_QUERY_FORMAT: { - return Status::INVALID_QUERY_FORMAT; - } - case DistributedDB::DBStatus::INVALID_QUERY_FIELD: { - return Status::INVALID_QUERY_FIELD; - } - case DistributedDB::DBStatus::NOT_SUPPORT: { - return Status::NOT_SUPPORT; - } - case DistributedDB::DBStatus::NOT_FOUND: { - ZLOGI("DB return NOT_FOUND, no matching result. Return success with empty list."); - return Status::SUCCESS; - } - case DistributedDB::DBStatus::EKEYREVOKED_ERROR: // fallthrough - case DistributedDB::DBStatus::SECURITY_OPTION_CHECK_ERROR: - return Status::SECURITY_LEVEL_ERROR; - default: { - return Status::ERROR; - } + if (status == DistributedDB::DBStatus::NOT_FOUND) { + ZLOGI("DB return NOT_FOUND, no matching result. Return success with empty list."); + return Status::SUCCESS; } + return ConvertDbStatus(status); } void SingleKvStoreImpl::GetResultSet(const Key &prefixKey, @@ -735,7 +712,8 @@ Status SingleKvStoreImpl::RemoveDeviceData(const std::string &device) return Status::ERROR; } -Status SingleKvStoreImpl::Sync(const std::vector &deviceIds, SyncMode mode, uint32_t allowedDelayMs) +Status SingleKvStoreImpl::Sync(const std::vector &deviceIds, SyncMode mode, + uint32_t allowedDelayMs, uint64_t sequenceId) { DdsTrace trace(std::string(LOG_TAG "::") + std::string(__FUNCTION__)); ZLOGD("start."); @@ -754,10 +732,11 @@ Status SingleKvStoreImpl::Sync(const std::vector &deviceIds, SyncMo lastSyncMode_ = mode; lastSyncDelayMs_ = delayMs; } - return AddSync(deviceIds, mode, delayMs); + return AddSync(deviceIds, mode, delayMs, sequenceId); } -Status SingleKvStoreImpl::Sync(const std::vector &deviceIds, SyncMode mode, const std::string &query) +Status SingleKvStoreImpl::Sync(const std::vector &deviceIds, SyncMode mode, + const std::string &query, uint64_t sequenceId) { DdsTrace trace(std::string(LOG_TAG "::") + std::string(__FUNCTION__)); ZLOGD("start."); @@ -766,27 +745,27 @@ Status SingleKvStoreImpl::Sync(const std::vector &deviceIds, SyncMo return Status::EXCEED_MAX_ACCESS_RATE; } uint32_t delayMs = GetSyncDelayTime(0); - return AddSync(deviceIds, mode, query, delayMs); + return AddSync(deviceIds, mode, query, delayMs, sequenceId); } -Status SingleKvStoreImpl::AddSync(const std::vector &deviceIds, SyncMode mode, - uint32_t delayMs) +Status SingleKvStoreImpl::AddSync(const std::vector &deviceIds, SyncMode mode, uint32_t delayMs, + uint64_t sequenceId) { ZLOGD("start."); waitingSyncCount_++; return KvStoreSyncManager::GetInstance()->AddSyncOperation(reinterpret_cast(this), delayMs, - std::bind(&SingleKvStoreImpl::DoSync, this, deviceIds, mode, std::placeholders::_1), - std::bind(&SingleKvStoreImpl::DoSyncComplete, this, std::placeholders::_1)); + std::bind(&SingleKvStoreImpl::DoSync, this, deviceIds, mode, std::placeholders::_1, sequenceId), + std::bind(&SingleKvStoreImpl::DoSyncComplete, this, std::placeholders::_1, "", sequenceId)); } Status SingleKvStoreImpl::AddSync(const std::vector &deviceIds, SyncMode mode, - const std::string &query, uint32_t delayMs) + const std::string &query, uint32_t delayMs, uint64_t sequenceId) { ZLOGD("start."); waitingSyncCount_++; return KvStoreSyncManager::GetInstance()->AddSyncOperation(reinterpret_cast(this), delayMs, - std::bind(&SingleKvStoreImpl::DoQuerySync, this, deviceIds, mode, query, std::placeholders::_1), - std::bind(&SingleKvStoreImpl::DoSyncComplete, this, std::placeholders::_1)); + std::bind(&SingleKvStoreImpl::DoQuerySync, this, deviceIds, mode, query, std::placeholders::_1, sequenceId), + std::bind(&SingleKvStoreImpl::DoSyncComplete, this, std::placeholders::_1, query, sequenceId)); } uint32_t SingleKvStoreImpl::GetSyncDelayTime(uint32_t allowedDelayMs) const @@ -814,7 +793,8 @@ Status SingleKvStoreImpl::RemoveAllSyncOperation() return KvStoreSyncManager::GetInstance()->RemoveSyncOperation(reinterpret_cast(this)); } -void SingleKvStoreImpl::DoSyncComplete(const std::map &devicesSyncResult) +void SingleKvStoreImpl::DoSyncComplete(const std::map &devicesSyncResult, + const std::string &query, uint64_t sequenceId) { DdsTrace trace(std::string("DdsTrace " LOG_TAG "::") + std::string(__FUNCTION__)); std::map resultMap; @@ -824,12 +804,12 @@ void SingleKvStoreImpl::DoSyncComplete(const std::mapSyncCompleted(resultMap); + syncCallback_->SyncCompleted(resultMap, sequenceId); } } Status SingleKvStoreImpl::DoQuerySync(const std::vector &deviceIds, SyncMode mode, - const std::string &query, const KvStoreSyncManager::SyncEnd &syncEnd) + const std::string &query, const KvStoreSyncManager::SyncEnd &syncEnd, uint64_t sequenceId) { ZLOGD("start."); std::vector deviceUuids = MapNodeIdToUuids(deviceIds); @@ -868,8 +848,7 @@ Status SingleKvStoreImpl::DoQuerySync(const std::vector &deviceIds, if (status == DistributedDB::DBStatus::BUSY) { if (syncRetries_ < KvStoreSyncManager::SYNC_RETRY_MAX_COUNT) { syncRetries_++; - auto addStatus = AddSync(deviceIds, mode, query, - KvStoreSyncManager::SYNC_DEFAULT_DELAY_MS); + auto addStatus = AddSync(deviceIds, mode, query, KvStoreSyncManager::SYNC_DEFAULT_DELAY_MS, sequenceId); if (addStatus == Status::SUCCESS) { return addStatus; } @@ -879,7 +858,7 @@ Status SingleKvStoreImpl::DoQuerySync(const std::vector &deviceIds, } Status SingleKvStoreImpl::DoSync(const std::vector &deviceIds, SyncMode mode, - const KvStoreSyncManager::SyncEnd &syncEnd) + const KvStoreSyncManager::SyncEnd &syncEnd, uint64_t sequenceId) { ZLOGD("start."); std::vector deviceUuids = MapNodeIdToUuids(deviceIds); @@ -904,7 +883,7 @@ Status SingleKvStoreImpl::DoSync(const std::vector &deviceIds, Sync if (status == DistributedDB::DBStatus::BUSY) { if (syncRetries_ < KvStoreSyncManager::SYNC_RETRY_MAX_COUNT) { syncRetries_++; - auto addStatus = AddSync(deviceUuids, mode, KvStoreSyncManager::SYNC_DEFAULT_DELAY_MS); + auto addStatus = AddSync(deviceUuids, mode, KvStoreSyncManager::SYNC_DEFAULT_DELAY_MS, sequenceId); if (addStatus == Status::SUCCESS) { return addStatus; } @@ -925,8 +904,8 @@ std::vector SingleKvStoreImpl::MapNodeIdToUuids(const std::vector &deviceIds, - const std::string &query, const KvStoreSyncManager::SyncEnd &syncEnd) +Status SingleKvStoreImpl::DoSubscribe(const std::vector &deviceIds, + const std::string &query, const KvStoreSyncManager::SyncEnd &syncEnd) { ZLOGD("start."); std::vector deviceUuids = MapNodeIdToUuids(deviceIds); @@ -956,8 +935,8 @@ Status SingleKvStoreImpl::DoSubscribeWithQuery(const std::vector &d return ConvertDbStatus(status); } -Status SingleKvStoreImpl::DoUnSubscribeWithQuery(const std::vector &deviceIds, const std::string &query, - const KvStoreSyncManager::SyncEnd &syncEnd) +Status SingleKvStoreImpl::DoUnSubscribe(const std::vector &deviceIds, const std::string &query, + const KvStoreSyncManager::SyncEnd &syncEnd) { ZLOGD("start."); std::vector deviceUuids = MapNodeIdToUuids(deviceIds); @@ -986,26 +965,26 @@ Status SingleKvStoreImpl::DoUnSubscribeWithQuery(const std::vector return ConvertDbStatus(status); } -Status SingleKvStoreImpl::AddSubscribeWithQuery(const std::vector &deviceIds, - const std::string &query, uint32_t delayMs) +Status SingleKvStoreImpl::AddSubscribe(const std::vector &deviceIds, const std::string &query, + uint32_t delayMs, uint64_t sequenceId) { ZLOGD("start."); return KvStoreSyncManager::GetInstance()->AddSyncOperation(reinterpret_cast(this), delayMs, - std::bind(&SingleKvStoreImpl::DoSubscribeWithQuery, this, deviceIds, query, std::placeholders::_1), - std::bind(&SingleKvStoreImpl::DoSyncComplete, this, std::placeholders::_1)); + std::bind(&SingleKvStoreImpl::DoSubscribe, this, deviceIds, query, std::placeholders::_1), + std::bind(&SingleKvStoreImpl::DoSyncComplete, this, std::placeholders::_1, "", sequenceId)); } -Status SingleKvStoreImpl::AddUnSubscribeWithQuery(const std::vector &deviceIds, - const std::string &query, uint32_t delayMs) +Status SingleKvStoreImpl::AddUnSubscribe(const std::vector &deviceIds, const std::string &query, + uint32_t delayMs, uint64_t sequenceId) { ZLOGD("start."); return KvStoreSyncManager::GetInstance()->AddSyncOperation(reinterpret_cast(this), delayMs, - std::bind(&SingleKvStoreImpl::DoUnSubscribeWithQuery, this, deviceIds, query, std::placeholders::_1), - std::bind(&SingleKvStoreImpl::DoSyncComplete, this, std::placeholders::_1)); + std::bind(&SingleKvStoreImpl::DoUnSubscribe, this, deviceIds, query, std::placeholders::_1), + std::bind(&SingleKvStoreImpl::DoSyncComplete, this, std::placeholders::_1, "", sequenceId)); } -Status SingleKvStoreImpl::SubscribeWithQuery(const std::vector &deviceIds, - const std::string &query) +Status SingleKvStoreImpl::Subscribe(const std::vector &deviceIds, + const std::string &query, uint64_t sequenceId) { DdsTrace trace(std::string(LOG_TAG "::") + std::string(__FUNCTION__)); ZLOGD("start."); @@ -1014,11 +993,11 @@ Status SingleKvStoreImpl::SubscribeWithQuery(const std::vector &dev return Status::EXCEED_MAX_ACCESS_RATE; } uint32_t delayMs = GetSyncDelayTime(0); - return AddSubscribeWithQuery(deviceIds, query, delayMs); + return AddSubscribe(deviceIds, query, delayMs, sequenceId); } -Status SingleKvStoreImpl::UnSubscribeWithQuery(const std::vector &deviceIds, - const std::string &query) +Status SingleKvStoreImpl::UnSubscribe(const std::vector &deviceIds, + const std::string &query, uint64_t sequenceId) { DdsTrace trace(std::string(LOG_TAG "::") + std::string(__FUNCTION__)); ZLOGD("start."); @@ -1027,7 +1006,7 @@ Status SingleKvStoreImpl::UnSubscribeWithQuery(const std::vector &d return Status::EXCEED_MAX_ACCESS_RATE; } uint32_t delayMs = GetSyncDelayTime(0); - return AddUnSubscribeWithQuery(deviceIds, query, delayMs); + return AddUnSubscribe(deviceIds, query, delayMs, sequenceId); } InnerStatus SingleKvStoreImpl::Close(DistributedDB::KvStoreDelegateManager *kvStoreDelegateManager) diff --git a/services/distributeddataservice/app/src/single_kvstore_impl.h b/services/distributeddataservice/app/src/single_kvstore_impl.h index 2e2eaae5b8474d8a9ae51cc6c8104d3d9c71cc57..e1c2d5179572a71a1bbf0d238842924f222df14e 100755 --- a/services/distributeddataservice/app/src/single_kvstore_impl.h +++ b/services/distributeddataservice/app/src/single_kvstore_impl.h @@ -51,8 +51,10 @@ public: std::function)> callback) override; Status GetCountWithQuery(const std::string &query, int &result) override; Status CloseResultSet(sptr resultSet) override; - Status Sync(const std::vector &deviceIds, SyncMode mode, uint32_t allowedDelayMs) override; - Status Sync(const std::vector &deviceIds, SyncMode mode, const std::string &query) override; + Status Sync(const std::vector &deviceIds, SyncMode mode, uint32_t allowedDelayMs, + uint64_t sequenceId) override; + Status Sync(const std::vector &deviceIds, SyncMode mode, + const std::string &query, uint64_t sequenceId) override; Status RemoveDeviceData(const std::string &device) override; Status RegisterSyncCallback(sptr callback) override; Status UnRegisterSyncCallback() override; @@ -84,32 +86,35 @@ protected: private: Status ConvertDbStatus(DistributedDB::DBStatus dbStatus); uint32_t GetSyncDelayTime(uint32_t allowedDelayMs) const; - Status AddSync(const std::vector &deviceIds, SyncMode mode, uint32_t delayMs); + Status AddSync(const std::vector &deviceIds, SyncMode mode, uint32_t delayMs, + uint64_t sequenceId); Status AddSync(const std::vector &deviceIds, SyncMode mode, - const std::string &query, uint32_t delayMs); + const std::string &query, uint32_t delayMs, uint64_t sequenceId); Status RemoveAllSyncOperation(); - void DoSyncComplete(const std::map &devicesSyncResult); - Status DoSync(const std::vector &deviceIds, SyncMode mode, const KvStoreSyncManager::SyncEnd &syncEnd); + void DoSyncComplete(const std::map &devicesSyncResult, + const std::string &query, uint64_t sequenceId); + Status DoSync(const std::vector &deviceIds, SyncMode mode, const KvStoreSyncManager::SyncEnd &syncEnd, + uint64_t sequenceId); Status DoQuerySync(const std::vector &deviceIds, SyncMode mode, const std::string &query, - const KvStoreSyncManager::SyncEnd &syncEnd); + const KvStoreSyncManager::SyncEnd &syncEnd, uint64_t sequenceId); Status AddAutoSync(); Status DoAutoSync(const KvStoreSyncManager::SyncEnd &); Status RebuildKvStoreObserver(DistributedDB::KvStoreNbDelegate *kvStoreNbDelegate); Status RebuildKvStoreResultSet(); int ConvertToDbObserverMode(SubscribeType subscribeType) const; DistributedDB::SyncMode ConvertToDbSyncMode(SyncMode syncMode) const; - Status DoSubscribeWithQuery(const std::vector &deviceIds, - const std::string &query, const KvStoreSyncManager::SyncEnd &syncEnd); - Status AddSubscribeWithQuery(const std::vector &deviceIds, - const std::string &query, uint32_t delayMs); - Status SubscribeWithQuery(const std::vector &deviceIds, - const std::string &query) override; - Status DoUnSubscribeWithQuery(const std::vector &deviceIds, - const std::string &query, const KvStoreSyncManager::SyncEnd &syncEnd); - Status AddUnSubscribeWithQuery(const std::vector &deviceIds, - const std::string &query, uint32_t delayMs); - Status UnSubscribeWithQuery(const std::vector &deviceIds, - const std::string &query) override; + Status DoSubscribe(const std::vector &deviceIds, + const std::string &query, const KvStoreSyncManager::SyncEnd &syncEnd); + Status AddSubscribe(const std::vector &deviceIds, + const std::string &query, uint32_t delayMs, uint64_t sequenceId); + Status Subscribe(const std::vector &deviceIds, + const std::string &query, uint64_t sequenceId) override; + Status DoUnSubscribe(const std::vector &deviceIds, + const std::string &query, const KvStoreSyncManager::SyncEnd &syncEnd); + Status AddUnSubscribe(const std::vector &deviceIds, + const std::string &query, uint32_t delayMs, uint64_t sequenceId); + Status UnSubscribe(const std::vector &deviceIds, + const std::string &query, uint64_t sequenceId) override; std::vector MapNodeIdToUuids(const std::vector &deviceIds); // kvstore options.