diff --git a/services/distributeddataservice/service/data_share/common/db_delegate.h b/services/distributeddataservice/service/data_share/common/db_delegate.h
index f3b344fa270077d84e2842bd6746242ecf10989c..790a8a028a82f9e3b51c6202b3cc43c9cf5eb472 100644
--- a/services/distributeddataservice/service/data_share/common/db_delegate.h
+++ b/services/distributeddataservice/service/data_share/common/db_delegate.h
@@ -27,6 +27,7 @@
 #include "metadata/store_meta_data.h"
 #include "result_set.h"
 #include "serializable/serializable.h"
+#include "value_object.h"
 
 namespace OHOS::DataShare {
 class DBDelegate {
@@ -44,6 +45,7 @@ public:
     virtual std::string Query(
        const std::string &sql, const std::vector<std::string> &selectionArgs = std::vector<std::string>()) = 0;
     virtual std::shared_ptr<DataShareResultSet> QuerySql(const std::string &sql) = 0;
+    virtual std::pair<int32_t, int64_t> UpdateSql(const std::string &sql) = 0;
     virtual bool IsInvalid() = 0;
     static void SetExecutorPool(std::shared_ptr<ExecutorPool> executor);
     static void EraseStoreCache(const int32_t tokenId);
diff --git a/services/distributeddataservice/service/data_share/common/rdb_delegate.cpp b/services/distributeddataservice/service/data_share/common/rdb_delegate.cpp
index 38cd734329f1784ed4da390d40e5194a9869c592..1940f715cf1fcebedb62965c3d940e3080c89fa2 100644
--- a/services/distributeddataservice/service/data_share/common/rdb_delegate.cpp
+++ b/services/distributeddataservice/service/data_share/common/rdb_delegate.cpp
@@ -350,6 +350,22 @@ std::shared_ptr<DataShareResultSet> RdbDelegate::QuerySql(const std::string &s
     return resultSet;
 }
 
+std::pair<int32_t, int64_t> RdbDelegate::UpdateSql(const std::string &sql)
+{
+    if (store_ == nullptr) {
+        ZLOGE("store is null");
+        return std::make_pair(E_ERROR, 0);
+    }
+    auto[ret, outValue] = store_->Execute(sql);
+    if (ret != E_OK) {
+        ZLOGE("execute update sql failed, err:%{public}d", ret);
+        return std::make_pair(ret, 0);
+    }
+    int64_t rowCount = 0;
+    outValue.GetLong(rowCount);
+    return std::make_pair(ret, rowCount);
+}
+
 bool RdbDelegate::IsInvalid()
 {
     return store_ == nullptr;
diff --git a/services/distributeddataservice/service/data_share/common/rdb_delegate.h b/services/distributeddataservice/service/data_share/common/rdb_delegate.h
index 3e578aa96feca19378be79127b8ec217f385a57e..b77a7a74c667572366b69f1d7c487e077f2ab9ae 100644
--- a/services/distributeddataservice/service/data_share/common/rdb_delegate.h
+++ b/services/distributeddataservice/service/data_share/common/rdb_delegate.h
@@ -42,6 +42,7 @@ public:
         const int32_t callingPid) override;
     std::string Query(const std::string &sql, const std::vector<std::string> &selectionArgs) override;
     std::shared_ptr<DataShareResultSet> QuerySql(const std::string &sql) override;
+    std::pair<int32_t, int64_t> UpdateSql(const std::string &sql) override;
     bool IsInvalid() override;
     std::pair<int64_t, int64_t> InsertEx(const std::string &tableName,
         const DataShareValuesBucket &valuesBucket) override;
diff --git a/services/distributeddataservice/service/data_share/common/scheduler_manager.cpp b/services/distributeddataservice/service/data_share/common/scheduler_manager.cpp
index e02fec3b05182860bf7d1e712b5bdb061416df5f..739aedb795f11daa4358b46605d64e610b2ae015 100644
--- a/services/distributeddataservice/service/data_share/common/scheduler_manager.cpp
+++ b/services/distributeddataservice/service/data_share/common/scheduler_manager.cpp
@@ -65,6 +65,64 @@ void SchedulerManager::Execute(const Key &key, const int32_t userId, const std::
     ExecuteSchedulerSQL(rdbDir, userId, version, key, delegate);
 }
 
+void SchedulerManager::Start(const Key &key, int32_t userId, const DistributedData::StoreMetaData &metaData)
+{
+    {
+        std::lock_guard<std::mutex> lock(mutex_);
+        auto it = schedulerStatusCache_.find(key);
+        if (it == schedulerStatusCache_.end()) {
+            schedulerStatusCache_.emplace(key, true);
+        }
+    }
+    Execute(key, userId, metaData);
+}
+
+void SchedulerManager::Stop(const Key &key)
+{
+    std::lock_guard<std::mutex> lock(mutex_);
+    RemoveTimer(key);
+    auto it = schedulerStatusCache_.find(key);
+    if (it != schedulerStatusCache_.end()) {
+        schedulerStatusCache_.erase(it);
+    }
+}
+
+void SchedulerManager::Enable(const Key &key, int32_t userId, const DistributedData::StoreMetaData &metaData)
+{
+    Template tpl;
+    if (!TemplateManager::GetInstance().Get(key, userId, tpl) ||
+        tpl.scheduler_.empty() || tpl.scheduler_.find(REMIND_TIMER_FUNC) == std::string::npos) {
+        ZLOGE("find template scheduler failed, %{public}s, %{public}" PRId64 ", %{public}s",
+            DistributedData::Anonymous::Change(key.uri).c_str(), key.subscriberId, key.bundleName.c_str());
+        return;
+    }
+    bool isTimerStopped = false;
+    {
+        std::lock_guard<std::mutex> lock(mutex_);
+        auto it = schedulerStatusCache_.find(key);
+        if (it != schedulerStatusCache_.end()) {
+            it->second = true;
+        }
+        auto timer = timerCache_.find(key);
+        if (timer == timerCache_.end()) {
+            isTimerStopped = true;
+        }
+    }
+    if (isTimerStopped) {
+        Execute(key, userId, metaData);
+        RdbSubscriberManager::GetInstance().EmitByKey(key, userId, metaData);
+    }
+}
+
+void SchedulerManager::Disable(const Key &key)
+{
+    std::lock_guard<std::mutex> lock(mutex_);
+    auto it = schedulerStatusCache_.find(key);
+    if (it != schedulerStatusCache_.end()) {
+        it->second = false;
+    }
+}
+
 bool SchedulerManager::SetTimerTask(uint64_t &timerId, const std::function<void()> &callback,
     int64_t reminderTime)
 {
@@ -95,6 +153,29 @@ void SchedulerManager::ResetTimerTask(int64_t timerId, int64_t reminderTime)
     TimeServiceClient::GetInstance()->StartTimer(timerId, static_cast<uint64_t>(reminderTime));
 }
 
+int64_t SchedulerManager::EraseTimerTaskId(const Key &key)
+{
+    int64_t timerId = -1;
+    std::lock_guard<std::mutex> lock(mutex_);
+    auto it = timerCache_.find(key);
+    if (it != timerCache_.end()) {
+        timerId = it->second;
+        timerCache_.erase(key);
+    }
+    return timerId;
+}
+
+bool SchedulerManager::GetSchedulerStatus(const Key &key)
+{
+    bool enabled = false;
+    std::lock_guard<std::mutex> lock(mutex_);
+    auto it = schedulerStatusCache_.find(key);
+    if (it != schedulerStatusCache_.end()) {
+        enabled = it->second;
+    }
+    return enabled;
+}
+
 void SchedulerManager::SetTimer(
     const std::string &dbPath, const int32_t userId, int version, const Key &key, int64_t reminderTime)
 {
@@ -125,18 +206,12 @@
         ZLOGI("schedule notify start, uri is %{private}s, subscriberId is %{public}" PRId64 ", bundleName is "
             "%{public}s",
             DistributedData::Anonymous::Change(key.uri).c_str(), key.subscriberId, key.bundleName.c_str());
-        int64_t timerId = -1;
-        {
-            std::lock_guard<std::mutex> lock(mutex_);
-            auto it = timerCache_.find(key);
-            if (it != timerCache_.end()) {
-                timerId = it->second;
-                timerCache_.erase(key);
-            }
-        }
+        int64_t timerId = EraseTimerTaskId(key);
         DestoryTimerTask(timerId);
-        Execute(key, userId, dbPath, version);
-        RdbSubscriberManager::GetInstance().EmitByKey(key, userId, dbPath, version);
+        if (GetSchedulerStatus(key)) {
+            Execute(key, userId, metaData);
+            RdbSubscriberManager::GetInstance().EmitByKey(key, userId, metaData);
+        }
     };
     uint64_t timerId = 0;
     if (!SetTimerTask(timerId, callback, reminderTime)) {
@@ -198,7 +273,6 @@ void SchedulerManager::GenRemindTimerFuncParams(
 void SchedulerManager::RemoveTimer(const Key &key)
 {
-    std::lock_guard<std::mutex> lock(mutex_);
     if (executor_ == nullptr) {
         ZLOGE("executor_ is nullptr");
         return;
     }
diff --git a/services/distributeddataservice/service/data_share/common/scheduler_manager.h b/services/distributeddataservice/service/data_share/common/scheduler_manager.h
index c112dcb58bb6eac00ceb948727c76e226c59eed0..94947293d90eac4d6647fd3412a777f1eb97d272 100644
--- a/services/distributeddataservice/service/data_share/common/scheduler_manager.h
+++ b/services/distributeddataservice/service/data_share/common/scheduler_manager.h
@@ -34,6 +34,10 @@ public:
     void RemoveTimer(const Key &key);
     void ClearTimer();
     void SetExecutorPool(std::shared_ptr<ExecutorPool> executor);
+    void Start(const Key &key, int32_t userId, const DistributedData::StoreMetaData &metaData);
+    void Stop(const Key &key);
+    void Enable(const Key &key, int32_t userId, const DistributedData::StoreMetaData &metaData);
+    void Disable(const Key &key);
 
 private:
     static constexpr const char *REMIND_TIMER_FUNC = "remindTimer(";
@@ -47,9 +51,12 @@
     bool SetTimerTask(uint64_t &timerId, const std::function<void()> &callback, int64_t reminderTime);
     void DestoryTimerTask(int64_t timerId);
     void ResetTimerTask(int64_t timerId, int64_t reminderTime);
+    int64_t EraseTimerTaskId(const Key &key);
+    bool GetSchedulerStatus(const Key &key);
 
     std::mutex mutex_;
     std::map<Key, uint64_t> timerCache_;
+    std::map<Key, bool> schedulerStatusCache_;
     std::shared_ptr<ExecutorPool> executor_ = nullptr;
 };
 } // namespace OHOS::DataShare
diff --git a/services/distributeddataservice/service/data_share/data/template_data.cpp b/services/distributeddataservice/service/data_share/data/template_data.cpp
index f99bec09c24f851811f1371be7302f2260bb8426..baf5dc4d5dda77ae2d5773dba7ec7c9410ec25f7 100644
--- a/services/distributeddataservice/service/data_share/data/template_data.cpp
+++ b/services/distributeddataservice/service/data_share/data/template_data.cpp
@@ -18,18 +18,20 @@
 namespace OHOS::DataShare {
 bool TemplateNode::Marshal(DistributedData::Serializable::json &node) const
 {
-    bool ret = SetValue(node[GET_NAME(predicates)], predicates);
+    bool ret = SetValue(node[GET_NAME(update)], update);
+    ret = SetValue(node[GET_NAME(predicates)], predicates);
     ret = ret && SetValue(node[GET_NAME(scheduler)], scheduler);
     return ret;
 }
 
 bool TemplateNode::Unmarshal(const DistributedData::Serializable::json &node)
 {
-    bool ret = GetValue(node, GET_NAME(predicates), predicates);
+    bool ret = GetValue(node, GET_NAME(update), update);
+    ret = GetValue(node, GET_NAME(predicates), predicates);
     return ret && GetValue(node, GET_NAME(scheduler), scheduler);
 }
 
-TemplateNode::TemplateNode(const Template &tpl) : scheduler(tpl.scheduler_)
+TemplateNode::TemplateNode(const Template &tpl) : update(tpl.update_), scheduler(tpl.scheduler_)
 {
     for (auto &item:tpl.predicates_) {
         predicates.emplace_back(item.key_, item.selectSql_);
@@ -42,7 +44,7 @@ Template TemplateNode::ToTemplate() const
     for (const auto &predicate: predicates) {
         nodes.emplace_back(predicate.key, predicate.selectSql);
     }
-    return Template(nodes, scheduler);
+    return Template(update, nodes, scheduler);
 }
 
 bool TemplateRootNode::Marshal(DistributedData::Serializable::json &node) const
diff --git a/services/distributeddataservice/service/data_share/data/template_data.h b/services/distributeddataservice/service/data_share/data/template_data.h
index 859f0d2d3fe02515bc87390246e7ed48249788dd..f0fe0b30fb5ae4791c89874dba245c148c809078 100644
--- a/services/distributeddataservice/service/data_share/data/template_data.h
+++ b/services/distributeddataservice/service/data_share/data/template_data.h
@@ -36,6 +36,7 @@ struct TemplateNode final: public DistributedData::Serializable {
     bool Unmarshal(const json &node) override;
     Template ToTemplate() const;
 private:
+    std::string update;
     std::vector<PredicateTemplateNode> predicates;
     std::string scheduler;
 };
diff --git a/services/distributeddataservice/service/data_share/data_share_obs_proxy.cpp b/services/distributeddataservice/service/data_share/data_share_obs_proxy.cpp
index ce6440327d51beccbf659e386d6c39620186670e..769ea0b3e10b9d24ca35c0208f7c5c534ea34fb7 100644
--- a/services/distributeddataservice/service/data_share/data_share_obs_proxy.cpp
+++ b/services/distributeddataservice/service/data_share/data_share_obs_proxy.cpp
@@ -15,13 +15,124 @@
 #define LOG_TAG "ObserverProxy"
 #include "data_share_obs_proxy.h"
 
+#include "datashare_errno.h"
 #include "itypes_util.h"
+#include "datashare_itypes_utils.h"
 #include "log_print.h"
 
 namespace OHOS {
 namespace DataShare {
 static constexpr int REQUEST_CODE = 0;
+int RdbObserverProxy::CreateAshmem(RdbChangeNode &changeNode)
+{
+    OHOS::sptr<Ashmem> memory = Ashmem::CreateAshmem(ASHMEM_NAME, DATA_SIZE_ASHMEM_TRANSFER_LIMIT);
+    if (memory == nullptr) {
+        ZLOGE("failed to create Ashmem instance.");
+        return E_ERROR;
+    }
+    bool mapRet = memory->MapReadAndWriteAshmem();
+    if (!mapRet) {
+        ZLOGE("failed to map read and write ashmem, ret=%{public}d", mapRet);
+        memory->CloseAshmem();
+        return E_ERROR;
+    }
+    if (changeNode.memory_ != nullptr) {
+        ZLOGE(
+            "Unknown error: changeNode.memory_ should be null, but something is there %{public}p",
+            (void *)changeNode.memory_
+        );
+        return E_ERROR;
+    }
+    changeNode.memory_ = memory;
+    return E_OK;
+}
+
+int RdbObserverProxy::WriteAshmem(RdbChangeNode &changeNode, void *data, int len, int &offset)
+{
+    if (changeNode.memory_ == nullptr) {
+        ZLOGE("changeNode memory is nullptr.");
+        return E_ERROR;
+    }
+    bool writeRet = changeNode.memory_->WriteToAshmem(data, len, offset);
+    if (!writeRet) {
+        ZLOGE("failed to write into ashmem, ret=%{public}d", writeRet);
+        changeNode.memory_->UnmapAshmem();
+        changeNode.memory_->CloseAshmem();
+        changeNode.memory_ = nullptr;
+        return E_ERROR;
+    }
+    offset += len;
+    return E_OK;
+}
+
+int RdbObserverProxy::SerializeDataIntoAshmem(RdbChangeNode &changeNode)
+{
+    if (changeNode.memory_ == nullptr) {
+        ZLOGE("changeNode.memory_ is nullptr");
+        return E_ERROR;
+    }
+    // move data
+    // simple serialization: [vec_size(int32); str1_len(int32), str1; str2_len(int32), str2; ...],
+    // total byte size is recorded in changeNode.size
+    int offset = 0;
+    // 4 byte for length int
+    int intLen = 4;
+    int dataSize = changeNode.data_.size();
+    if (WriteAshmem(changeNode, (void *)&dataSize, intLen, offset) != E_OK) {
+        ZLOGE("failed to write data with len %{public}d, offset %{public}d.", intLen, offset);
+        return E_ERROR;
+    }
+    for (int i = 0; i < dataSize; i++) {
+        const char *str = changeNode.data_[i].c_str();
+        int strLen = changeNode.data_[i].length();
+        // write length int
+        if (WriteAshmem(changeNode, (void *)&strLen, intLen, offset) != E_OK) {
+            ZLOGE("failed to write data with index %{public}d, len %{public}d, offset %{public}d.", i, intLen, offset);
+            return E_ERROR;
+        }
+        // write str
+        if (WriteAshmem(changeNode, (void *)str, strLen, offset) != E_OK) {
+            ZLOGE("failed to write data with index %{public}d, len %{public}d, offset %{public}d.", i, strLen, offset);
+            return E_ERROR;
+        }
+    }
+    changeNode.size_ = offset;
+    return E_OK;
+}
+
+int RdbObserverProxy::PrepareRdbChangeNodeData(RdbChangeNode &changeNode)
+{
+    // If data size is bigger than the limit, move it to the shared memory
+    // 4 byte for length int
+    int intByteLen = 4;
+    int size = intByteLen;
+    for (int i = 0; i < changeNode.data_.size(); i++) {
+        size += intByteLen;
+        size += changeNode.data_[i].length();
+    }
+    if (size > DATA_SIZE_ASHMEM_TRANSFER_LIMIT) {
+        ZLOGE("Data to write into ashmem is %{public}d bytes, over 10M.", size);
+        return E_ERROR;
+    }
+    if (size > DATA_SIZE_IPC_TRANSFER_LIMIT) {
+        ZLOGD("Data size is over 200k, transfer it by the shared memory");
+        if (RdbObserverProxy::CreateAshmem(changeNode) != E_OK) {
+            ZLOGE("failed to create ashmem.");
+            return E_ERROR;
+        }
+        if (RdbObserverProxy::SerializeDataIntoAshmem(changeNode) != E_OK) {
+            ZLOGE("failed to serialize data into ashmem.");
+            return E_ERROR;
+        }
+        // clear original data spot
+        changeNode.data_.clear();
+        changeNode.isSharedMemory_ = true;
+        ZLOGD("Preparation done. Data size: %{public}d", changeNode.size_);
+    }
+    return E_OK;
+}
+
 void RdbObserverProxy::OnChangeFromRdb(RdbChangeNode &changeNode)
 {
     MessageParcel parcel;
@@ -29,6 +140,11 @@ void RdbObserverProxy::OnChangeFromRdb(RdbChangeNode &changeNode)
         return;
     }
 
+    if (RdbObserverProxy::PrepareRdbChangeNodeData(changeNode) != E_OK) {
+        ZLOGE("failed to prepare RdbChangeNode data.");
+        return;
+    }
+
     if (!ITypesUtil::Marshal(parcel, changeNode)) {
         ZLOGE("failed to WriteParcelable changeNode ");
         return;
diff --git a/services/distributeddataservice/service/data_share/data_share_obs_proxy.h b/services/distributeddataservice/service/data_share/data_share_obs_proxy.h
index b54e0b0ceda78e00da752a9ddf9f1588f82765b5..42a3f1065730051c965b9239ce0d66c1074cb5df 100644
--- a/services/distributeddataservice/service/data_share/data_share_obs_proxy.h
+++ b/services/distributeddataservice/service/data_share/data_share_obs_proxy.h
@@ -28,6 +28,10 @@ public:
     void OnChangeFromRdb(RdbChangeNode &changeNode) override;
 
 private:
+    int PrepareRdbChangeNodeData(RdbChangeNode &changeNode);
+    int CreateAshmem(RdbChangeNode &changeNode);
+    int WriteAshmem(RdbChangeNode &changeNode, void *data, int len, int &offset);
+    int SerializeDataIntoAshmem(RdbChangeNode &changeNode);
     static inline BrokerDelegator<RdbObserverProxy> delegator_;
 };
diff --git a/services/distributeddataservice/service/data_share/data_share_service_stub.cpp b/services/distributeddataservice/service/data_share/data_share_service_stub.cpp
index 0ffcf71257206286719ee073b7070aeb8c48f8fa..d86f3016d65ebd5bc3b6e7b05cb562129b92746f 100644
--- a/services/distributeddataservice/service/data_share/data_share_service_stub.cpp
+++ b/services/distributeddataservice/service/data_share/data_share_service_stub.cpp
@@ -168,7 +168,7 @@ int32_t DataShareServiceStub::OnAddTemplate(MessageParcel &data, MessageParcel &
     std::string uri;
     int64_t subscriberId;
     Template tpl;
-    if (!ITypesUtil::Unmarshal(data, uri, subscriberId, tpl.predicates_, tpl.scheduler_)) {
+    if (!ITypesUtil::Unmarshal(data, uri, subscriberId, tpl.update_, tpl.predicates_, tpl.scheduler_)) {
         ZLOGW("read device list failed.");
         return -1;
     }
diff --git a/services/distributeddataservice/service/data_share/data_share_types_util.cpp b/services/distributeddataservice/service/data_share/data_share_types_util.cpp
index 804a6f76dd2797b29efd531352128b806256a99d..32f850f1d0164e6559bb49eea6504edb2ca3ea1c 100644
--- a/services/distributeddataservice/service/data_share/data_share_types_util.cpp
+++ b/services/distributeddataservice/service/data_share/data_share_types_util.cpp
@@ -85,7 +85,21 @@ bool Unmarshalling(PredicateTemplateNode &predicateTemplateNode, MessageParcel &
 template<>
 bool Marshalling(const RdbChangeNode &changeNode, MessageParcel &parcel)
 {
-    return ITypesUtil::Marshal(parcel, changeNode.uri_, changeNode.templateId_, changeNode.data_);
+    bool firstPart = ITypesUtil::Marshal(
+        parcel, changeNode.uri_, changeNode.templateId_, changeNode.data_, changeNode.isSharedMemory_);
+    if (!firstPart) {
+        return false;
+    }
+    if (changeNode.isSharedMemory_) {
+        if (changeNode.memory_ == nullptr) {
+            ZLOGE("Used shared memory but ashmem is nullptr.");
+            return false;
+        }
+        if (!parcel.WriteAshmem(changeNode.memory_)) {
+            return false;
+        }
+    }
+    return ITypesUtil::Marshal(parcel, changeNode.size_);
 }
 
 template<>
diff --git a/services/distributeddataservice/service/data_share/subscriber_managers/rdb_subscriber_manager.cpp b/services/distributeddataservice/service/data_share/subscriber_managers/rdb_subscriber_manager.cpp
index 1629b88834e9082d96fcbb6f57884cdb3c890ea8..f87185bfa8882e0a17c01ec0daeab02eff4f44b9 100644
--- a/services/distributeddataservice/service/data_share/subscriber_managers/rdb_subscriber_manager.cpp
+++ b/services/distributeddataservice/service/data_share/subscriber_managers/rdb_subscriber_manager.cpp
@@ -50,7 +50,7 @@ int32_t TemplateManager::Delete(const Key &key, int32_t userId)
         ZLOGE("Delete failed, %{public}d", status);
         return E_ERROR;
     }
-    SchedulerManager::GetInstance().RemoveTimer(key);
+    SchedulerManager::GetInstance().Stop(key);
     return E_OK;
 }
@@ -120,7 +120,7 @@ int RdbSubscriberManager::Add(const Key &key, const sptr
     auto callerTokenId = IPCSkeleton::GetCallingTokenID();
     value.emplace_back(observer, context->callerTokenId, callerTokenId);
     std::vector<ObserverNode> node;
-    node.emplace_back(observer, context->callerTokenId, callerTokenId);
+    node.emplace_back(observer, context->callerTokenId, callerTokenId, callerPid);
     ExecutorPool::Task task = [key, node, context, this]() {
         LoadConfigDataInfoStrategy loadDataInfo;
         if (!loadDataInfo(context)) {
@@ -130,8 +130,7 @@
         }
         Notify(key, context->currentUserId, node, context->calledSourceDir, context->version);
         if (GetEnableObserverCount(key) == 1) {
-            SchedulerManager::GetInstance().Execute(
-                key, context->currentUserId, context->calledSourceDir, context->version);
+            SchedulerManager::GetInstance().Start(key, context->currentUserId, metaData);
         }
     };
     executorPool->Execute(task);
@@ -156,7 +155,7 @@
             }
         }
         if (value.empty()) {
-            SchedulerManager::GetInstance().RemoveTimer(key);
+            SchedulerManager::GetInstance().Stop(key);
         }
         return !value.empty();
     });
@@ -177,7 +176,7 @@ void RdbSubscriberManager::Delete(uint32_t callerTokenId)
             ZLOGI("delete timer, subId %{public}" PRId64 ", bundleName %{public}s, tokenId %{public}x, uri %{public}s.",
                 key.subscriberId, key.bundleName.c_str(), callerTokenId,
                 DistributedData::Anonymous::Change(key.uri).c_str());
-            SchedulerManager::GetInstance().RemoveTimer(key);
+            SchedulerManager::GetInstance().Stop(key);
         }
         return value.empty();
     });
@@ -185,39 +184,55 @@
 int RdbSubscriberManager::Disable(const Key &key, uint32_t firstCallerTokenId)
 {
+    bool isAllDisabled = true;
     auto result =
-        rdbCache_.ComputeIfPresent(key, [&firstCallerTokenId, this](const auto &key,
+        rdbCache_.ComputeIfPresent(key, [&firstCallerTokenId, &isAllDisabled, this](const auto &key,
             std::vector<ObserverNode> &value) {
             for (auto it = value.begin(); it != value.end(); it++) {
                 if (it->firstCallerTokenId == firstCallerTokenId) {
                     it->enabled = false;
                    it->isNotifyOnEnabled = false;
                 }
+                if (it->enabled) {
+                    isAllDisabled = false;
+                }
             }
             return true;
         });
+    if (isAllDisabled) {
+        SchedulerManager::GetInstance().Disable(key);
+    }
     return result ? E_OK : E_SUBSCRIBER_NOT_EXIST;
 }
 
 int RdbSubscriberManager::Enable(const Key &key, std::shared_ptr<Context> context)
 {
-    auto result = rdbCache_.ComputeIfPresent(key, [&context, this](const auto &key, std::vector<ObserverNode> &value) {
+    bool isChanged = false;
+    DistributedData::StoreMetaData metaData;
+    auto result = rdbCache_.ComputeIfPresent(key, [&context, &metaData, &isChanged, this](const auto &key,
+        std::vector<ObserverNode> &value) {
         for (auto it = value.begin(); it != value.end(); it++) {
             if (it->firstCallerTokenId != context->callerTokenId) {
                 continue;
             }
             it->enabled = true;
+            LoadConfigDataInfoStrategy loadDataInfo;
+            if (!loadDataInfo(context)) {
+                return true;
+            }
+            isChanged = true;
+            metaData = RdbSubscriberManager::GenMetaDataFromContext(context);
             if (it->isNotifyOnEnabled) {
                 std::vector<ObserverNode> node;
                 node.emplace_back(it->observer, context->callerTokenId);
-                LoadConfigDataInfoStrategy loadDataInfo;
-                if (loadDataInfo(context)) {
-                    Notify(key, context->currentUserId, node, context->calledSourceDir, context->version);
-                }
+                Notify(key, context->currentUserId, node, metaData);
             }
         }
         return true;
     });
+    if (isChanged) {
+        SchedulerManager::GetInstance().Enable(key, context->currentUserId, metaData);
+    }
     return result ? E_OK : E_SUBSCRIBER_NOT_EXIST;
 }
@@ -343,6 +358,13 @@ int RdbSubscriberManager::Notify(const Key &key, int32_t userId, const std::vect
         }
         changeNode.data_.emplace_back("{\"" + predicate.key_ + "\":" + result + "}");
     }
+    if (!tpl.update_.empty()) {
+        auto [errCode, rowCount] = delegate->UpdateSql(tpl.update_);
+        if (errCode != E_OK) {
+            ZLOGE("Update failed, err:%{public}d, %{public}s, %{public}" PRId64 ", %{public}s",
+                errCode, DistributedData::Anonymous::Change(key.uri).c_str(), key.subscriberId, key.bundleName.c_str());
+        }
+    }
 
     ZLOGI("emit, valSize: %{public}zu, dataSize:%{public}zu, uri:%{public}s,",
         val.size(), changeNode.data_.size(), DistributedData::Anonymous::Change(changeNode.uri_).c_str());
@@ -376,8 +398,18 @@
         SetObserverNotifyOnEnabled(val);
         return false;
     });
-    SchedulerManager::GetInstance().Execute(
-        uri, context->currentUserId, context->calledSourceDir, context->version, context->calledBundleName);
+    Key executeKey(uri, subscriberId, bundleName);
+    SchedulerManager::GetInstance().Start(executeKey, context->currentUserId, metaData);
+}
+
+DistributedData::StoreMetaData RdbSubscriberManager::GenMetaDataFromContext(const std::shared_ptr<Context> context)
+{
+    DistributedData::StoreMetaData metaData;
+    metaData.tokenId = context->calledTokenId;
+    metaData.dataDir = context->calledSourceDir;
+    metaData.storeId = context->calledStoreName;
+    metaData.haMode = context->haMode;
+    return metaData;
 }
 
 RdbSubscriberManager::ObserverNode::ObserverNode(const sptr<IDataProxyRdbObserver> &observer, uint32_t firstCallerTokenId, uint32_t callerTokenId)
diff --git a/services/distributeddataservice/service/data_share/subscriber_managers/rdb_subscriber_manager.h b/services/distributeddataservice/service/data_share/subscriber_managers/rdb_subscriber_manager.h
index 751d64d5d762fb199d67337f40fd2637acfcaf14..8b060405a60b62cc6aec249f74284f5bef3e3149 100644
--- a/services/distributeddataservice/service/data_share/subscriber_managers/rdb_subscriber_manager.h
+++ b/services/distributeddataservice/service/data_share/subscriber_managers/rdb_subscriber_manager.h
@@ -64,7 +64,8 @@
     void Emit(const std::string &uri, int64_t subscriberId, std::shared_ptr<Context> context);
     void Emit(const std::string &uri, std::shared_ptr<Context> context);
     void Emit(const std::string &uri, int32_t userId, DistributedData::StoreMetaData &metaData);
-    void EmitByKey(const Key &key, int32_t userId, const std::string &rdbPath, int version);
+    void EmitByKey(const Key &key, int32_t userId, const DistributedData::StoreMetaData &metaData);
+    DistributedData::StoreMetaData GenMetaDataFromContext(const std::shared_ptr<Context> context);
     std::vector<Key> GetKeysByUri(const std::string &uri);
     void Clear();
diff --git a/services/distributeddataservice/service/test/BUILD.gn b/services/distributeddataservice/service/test/BUILD.gn
index cb771a28c1506f2617e452623620b91a5578ef8c..522094d5dbb4c6a2f809227f905ed0b07bf333a9 100644
--- a/services/distributeddataservice/service/test/BUILD.gn
+++ b/services/distributeddataservice/service/test/BUILD.gn
@@ -884,6 +884,7 @@ ohos_unittest("DataShareServiceImplTest") {
     "${data_service_path}/service/data_share/sys_event_subscriber.cpp",
     "${data_service_path}/service/kvdb/user_delegate.cpp",
     "${data_service_path}/service/permission/src/permit_delegate.cpp",
+    "data_share_obs_proxy_test.cpp",
     "data_share_profile_config_test.cpp",
     "data_share_service_impl_test.cpp",
     "data_share_service_stub_test.cpp",
diff --git a/services/distributeddataservice/service/test/data_share_obs_proxy_test.cpp b/services/distributeddataservice/service/test/data_share_obs_proxy_test.cpp
new file mode 100644
index 0000000000000000000000000000000000000000..ec335f6ee8a4376ed4e6785d6197421eaa866bf0
--- /dev/null
+++ b/services/distributeddataservice/service/test/data_share_obs_proxy_test.cpp
@@ -0,0 +1,318 @@
+/*
+* Copyright (c) 2024 Huawei Device Co., Ltd.
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/ +#define LOG_TAG "DataShareObsProxyTest" + +#include +#include + +#include "data_share_obs_proxy.h" +#include "datashare_errno.h" +#include "log_print.h" + +namespace OHOS::Test { +using namespace testing::ext; +using namespace OHOS::DataShare; +std::string BUNDLE_NAME = "ohos.datasharetest.demo"; +constexpr int64_t TEST_SUB_ID = 100; + +class DataShareObsProxyTest : public testing::Test { +public: + static void SetUpTestCase(void){}; + static void TearDownTestCase(void){}; + void SetUp(){}; + void TearDown(){}; +}; + +RdbChangeNode SampleRdbChangeNode() +{ + TemplateId tplId; + tplId.subscriberId_ = TEST_SUB_ID; + tplId.bundleName_ = BUNDLE_NAME; + + RdbChangeNode node; + node.uri_ = std::string(""); + node.templateId_ = tplId; + node.data_ = std::vector(); + node.isSharedMemory_ = false; + node.memory_ = nullptr; + node.size_ = 0; + + return node; +} + +/** +* @tc.name: CreateAshmem +* @tc.desc: test CreateAshmem function +* @tc.type: FUNC +* @tc.require: +*/ +HWTEST_F(DataShareObsProxyTest, CreateAshmem, TestSize.Level1) +{ + ZLOGI("CreateAshmem starts"); + RdbChangeNode node = SampleRdbChangeNode(); + + OHOS::sptr fake = nullptr; + RdbObserverProxy proxy(fake); + int ret = proxy.CreateAshmem(node); + EXPECT_EQ(ret, DataShare::E_OK); + EXPECT_NE(node.memory_, nullptr); + ZLOGI("CreateAshmem ends"); +} + +/** +* @tc.name: WriteAshmem001 +* @tc.desc: test WriteAshmem function. Write an int +* @tc.type: FUNC +* @tc.require: +*/ +HWTEST_F(DataShareObsProxyTest, WriteAshmem001, TestSize.Level1) +{ + ZLOGI("WriteAshmem001 starts"); + RdbChangeNode node = SampleRdbChangeNode(); + OHOS::sptr fake = nullptr; + RdbObserverProxy proxy(fake); + int retCreate = proxy.CreateAshmem(node); + EXPECT_EQ(retCreate, DataShare::E_OK); + + int len = 10; + int intLen = 4; + int offset = 0; + int ret = proxy.WriteAshmem(node, (void *)&len, intLen, offset); + EXPECT_EQ(ret, E_OK); + EXPECT_EQ(offset, intLen); + + // read from the start + const void *read = node.memory_->ReadFromAshmem(intLen, 0); + EXPECT_NE(read, nullptr); + int lenRead = *reinterpret_cast(read); + EXPECT_EQ(len, lenRead); + ZLOGI("WriteAshmem001 ends"); +} + +/** +* @tc.name: WriteAshmem002 +* @tc.desc: test WriteAshmem function. 
+* @tc.type: FUNC
+* @tc.require:
+*/
+HWTEST_F(DataShareObsProxyTest, WriteAshmem002, TestSize.Level1)
+{
+    ZLOGI("WriteAshmem002 starts");
+    RdbChangeNode node = SampleRdbChangeNode();
+    OHOS::sptr<IRemoteObject> fake = nullptr;
+    RdbObserverProxy proxy(fake);
+    int retCreate = proxy.CreateAshmem(node);
+    EXPECT_EQ(retCreate, DataShare::E_OK);
+
+    std::string string("Hello World");
+    const char *str = string.c_str();
+    int len = string.length();
+    int offset = 0;
+    int ret = proxy.WriteAshmem(node, (void *)str, len, offset);
+    EXPECT_EQ(ret, E_OK);
+    EXPECT_EQ(offset, len);
+
+    // read from the start
+    const void *read = node.memory_->ReadFromAshmem(len, 0);
+    EXPECT_NE(read, nullptr);
+    const char *strRead = reinterpret_cast<const char *>(read);
+    std::string stringRead(strRead, len);
+    EXPECT_EQ(stringRead, string);
+    ZLOGI("WriteAshmem002 ends");
+}
+
+/**
+* @tc.name: WriteAshmem003
+* @tc.desc: test WriteAshmem function with error
+* @tc.type: FUNC
+* @tc.require:
+*/
+HWTEST_F(DataShareObsProxyTest, WriteAshmem003, TestSize.Level1)
+{
+    ZLOGI("WriteAshmem003 starts");
+    RdbChangeNode node = SampleRdbChangeNode();
+    OHOS::sptr<IRemoteObject> fake = nullptr;
+    RdbObserverProxy proxy(fake);
+
+    OHOS::sptr<Ashmem> memory = Ashmem::CreateAshmem("WriteAshmem003", 2);
+    EXPECT_NE(memory, nullptr);
+    bool mapRet = memory->MapReadAndWriteAshmem();
+    ASSERT_TRUE(mapRet);
+    node.memory_ = memory;
+
+    int len = 10;
+    int offset = 0;
+    int ret = proxy.WriteAshmem(node, (void *)&len, 4, offset);
+    EXPECT_EQ(ret, E_ERROR);
+    ZLOGI("WriteAshmem003 ends");
+}
+
+/**
+* @tc.name: SerializeDataIntoAshmem
+* @tc.desc: test SerializeDataIntoAshmem function
+* @tc.type: FUNC
+* @tc.require:
+*/
+HWTEST_F(DataShareObsProxyTest, SerializeDataIntoAshmem, TestSize.Level1)
+{
+    ZLOGI("SerializeDataIntoAshmem starts");
+    RdbChangeNode node = SampleRdbChangeNode();
+
+    OHOS::sptr<IRemoteObject> fake = nullptr;
+    RdbObserverProxy proxy(fake);
+
+    int retCreate = proxy.CreateAshmem(node);
+    EXPECT_EQ(retCreate, E_OK);
+
+    // Push three times
+    node.data_.push_back(BUNDLE_NAME);
+    node.data_.push_back(BUNDLE_NAME);
+    node.data_.push_back(BUNDLE_NAME);
+
+    int intLen = 4;
+    // item length size + (str length size + str length) * 3
+    int offset = intLen + (intLen + strlen(BUNDLE_NAME.c_str())) * 3;
+    int retSe = proxy.SerializeDataIntoAshmem(node);
+    EXPECT_EQ(retSe, E_OK);
+    EXPECT_EQ(node.size_, offset);
+
+    offset = 0;
+    const void *vecLenRead = node.memory_->ReadFromAshmem(intLen, offset);
+    EXPECT_NE(vecLenRead, nullptr);
+    int vecLen = *reinterpret_cast<const int *>(vecLenRead);
+    EXPECT_EQ(vecLen, 3);
+    offset += intLen;
+
+    // 3 strings in the vec
+    for (int i = 0; i < 3; i++) {
+        const void *strLenRead = node.memory_->ReadFromAshmem(intLen, offset);
+        EXPECT_NE(strLenRead, nullptr);
+        int strLen = *reinterpret_cast<const int *>(strLenRead);
+        EXPECT_EQ(strLen, BUNDLE_NAME.length());
+        offset += intLen;
+
+        const void *strRead = node.memory_->ReadFromAshmem(strLen, offset);
+        EXPECT_NE(strRead, nullptr);
+        const char *str = reinterpret_cast<const char *>(strRead);
+        std::string stringRead(str, strLen);
+        EXPECT_EQ(stringRead, BUNDLE_NAME);
+        offset += strLen;
+    }
+    ZLOGI("SerializeDataIntoAshmem ends");
+}
+
+/**
+* @tc.name: SerializeDataIntoAshmem002
+* @tc.desc: test SerializeDataIntoAshmem function
+* @tc.type: FUNC
+* @tc.require:
+*/
+HWTEST_F(DataShareObsProxyTest, SerializeDataIntoAshmem002, TestSize.Level1)
+{
+    ZLOGI("SerializeDataIntoAshmem starts");
+    RdbChangeNode node = SampleRdbChangeNode();
+
+    OHOS::sptr<IRemoteObject> fake = nullptr;
+    RdbObserverProxy proxy(fake);
+
+    // Push three times
+    node.data_.push_back(BUNDLE_NAME);
+    node.data_.push_back(BUNDLE_NAME);
+    node.data_.push_back(BUNDLE_NAME);
+
+    // memory too small for vec length
+    OHOS::sptr<Ashmem> memory = Ashmem::CreateAshmem("SerializeDataIntoAshmem002", 2);
+    EXPECT_NE(memory, nullptr);
+    bool mapRet = memory->MapReadAndWriteAshmem();
+    ASSERT_TRUE(mapRet);
+    node.memory_ = memory;
+
+    int retSe = proxy.SerializeDataIntoAshmem(node);
+    EXPECT_EQ(retSe, E_ERROR);
+    EXPECT_EQ(node.size_, 0);
+    EXPECT_EQ(node.data_.size(), 3);
+    ASSERT_FALSE(node.isSharedMemory_);
+
+    // memory too small for string length
+    OHOS::sptr<Ashmem> memory2 = Ashmem::CreateAshmem("SerializeDataIntoAshmem002", 6);
+    EXPECT_NE(memory2, nullptr);
+    mapRet = memory2->MapReadAndWriteAshmem();
+    ASSERT_TRUE(mapRet);
+    node.memory_ = memory2;
+
+    retSe = proxy.SerializeDataIntoAshmem(node);
+    EXPECT_EQ(retSe, E_ERROR);
+    EXPECT_EQ(node.size_, 0);
+    EXPECT_EQ(node.data_.size(), 3);
+    ASSERT_FALSE(node.isSharedMemory_);
+
+    // memory too small for string
+    OHOS::sptr<Ashmem> memory3 = Ashmem::CreateAshmem("SerializeDataIntoAshmem002", 10);
+    EXPECT_NE(memory3, nullptr);
+    mapRet = memory3->MapReadAndWriteAshmem();
+    ASSERT_TRUE(mapRet);
+    node.memory_ = memory3;
+
+    retSe = proxy.SerializeDataIntoAshmem(node);
+    EXPECT_EQ(retSe, E_ERROR);
+    EXPECT_EQ(node.size_, 0);
+    EXPECT_EQ(node.data_.size(), 3);
+    ASSERT_FALSE(node.isSharedMemory_);
+
+    ZLOGI("SerializeDataIntoAshmem002 ends");
+}
+
+/**
+* @tc.name: PreparationData
+* @tc.desc: test PrepareRdbChangeNodeData function
+* @tc.type: FUNC
+* @tc.require:
+*/
+HWTEST_F(DataShareObsProxyTest, PreparationData, TestSize.Level1)
+{
+    ZLOGI("PreparationData starts");
+    RdbChangeNode node = SampleRdbChangeNode();
+    OHOS::sptr<IRemoteObject> fake = nullptr;
+    RdbObserverProxy proxy(fake);
+
+    // Push three times, less than 200k
+    node.data_.push_back(BUNDLE_NAME);
+    node.data_.push_back(BUNDLE_NAME);
+    node.data_.push_back(BUNDLE_NAME);
+
+    int ret = proxy.PrepareRdbChangeNodeData(node);
+    EXPECT_EQ(ret, E_OK);
+    EXPECT_EQ(node.data_.size(), 3);
+    EXPECT_FALSE(node.isSharedMemory_);
+
+    // Try to fake a 200k data. BUNDLE_NAME is 23 bytes long, and 7587 BUNDLE_NAMEs are over 200k.
+    for (int i = 0; i < 7587; i++) {
+        node.data_.push_back(BUNDLE_NAME);
+    }
+    ret = proxy.PrepareRdbChangeNodeData(node);
+    EXPECT_EQ(ret, E_OK);
+    EXPECT_EQ(node.data_.size(), 0);
+    EXPECT_TRUE(node.isSharedMemory_);
+
+    // Try to fake data over 10M. Writing data of such size should fail because it exceeds the limit.
+    for (int i = 0; i < 388362; i++) {
+        node.data_.push_back(BUNDLE_NAME);
+    }
+    ret = proxy.PrepareRdbChangeNodeData(node);
+    EXPECT_EQ(ret, E_ERROR);
+
+    ZLOGI("PreparationData ends");
+}
+} // namespace OHOS::Test
\ No newline at end of file
diff --git a/services/distributeddataservice/service/test/data_share_service_impl_test.cpp b/services/distributeddataservice/service/test/data_share_service_impl_test.cpp
index d43d2e6700d91bb36d964a7b7ee63850072dd7a2..1eb3b763507812e74bb0b7cf17052a40a1a78112 100644
--- a/services/distributeddataservice/service/test/data_share_service_impl_test.cpp
+++ b/services/distributeddataservice/service/test/data_share_service_impl_test.cpp
@@ -23,6 +23,7 @@
 #include "accesstoken_kit.h"
 #include "hap_token_info.h"
 #include "iservice_registry.h"
+#include "log_print.h"
 #include "system_ability_definition.h"
 #include "token_setproc.h"
 #include "data_share_service_impl.h"