diff --git a/services/distributeddataservice/service/rdb/rdb_service_impl.cpp b/services/distributeddataservice/service/rdb/rdb_service_impl.cpp index a22f43ddb7f1aa107947bf16e47510c01ce71096..629b6be6fd76f85e2c222d56b131e72a3a27cdd2 100644 --- a/services/distributeddataservice/service/rdb/rdb_service_impl.cpp +++ b/services/distributeddataservice/service/rdb/rdb_service_impl.cpp @@ -103,7 +103,7 @@ RdbServiceImpl::Factory::~Factory() { } -RdbServiceImpl::RdbServiceImpl() +RdbServiceImpl::RdbServiceImpl() : eventContainer_(std::make_shared()) { ZLOGI("construct"); DistributedDB::RelationalStoreManager::SetAutoLaunchRequestCallback( @@ -1537,6 +1537,35 @@ int32_t RdbServiceImpl::NotifyDataChange(const RdbSyncerParam ¶m, const RdbC return RDB_OK; } +void RdbServiceImpl::GlobalEvent::AddEvent(const std::string& path, + const DistributedData::DataChangeEvent::EventInfo& newEvent) +{ + std::lock_guard lock(mutex); + auto& storedEvent = events_[path]; + for (const auto& [tableName, properties] : newEvent.tableProperties) { + auto& globalProps = storedEvent.tableProperties[tableName]; + globalProps.isTrackedDataChange |= properties.isTrackedDataChange; + globalProps.isP2pSyncDataChange |= properties.isP2pSyncDataChange; + ZLOGD("table:%{public}s, the isTrackedDataChange is %{public}d", + Anonymous::Change(tableName).c_str(), globalProps.isTrackedDataChange); + } + storedEvent.isFull |= newEvent.isFull; +} + +std::optional RdbServiceImpl::GlobalEvent::StealEvent( + const std::string& path) +{ + std::lock_guard lock(mutex); + auto it = events_.find(path); + if (it == events_.end()) { + ZLOGE("The events to the path:%{public}s does not exist", Anonymous::Change(path).c_str()); + return std::nullopt; + } + auto eventInfo = std::move(it->second); + events_.erase(it); + return eventInfo; +} + void RdbServiceImpl::PostHeartbeatTask(int32_t pid, uint32_t delay, StoreInfo &storeInfo, DataChangeEvent::EventInfo &eventInfo) { @@ -1555,14 +1584,25 @@ void RdbServiceImpl::PostHeartbeatTask(int32_t pid, uint32_t delay, StoreInfo &s return !tasks.empty(); } - if (taskId == ExecutorPool::INVALID_TASK_ID) { - auto task = [storeInfo, eventInfo]() mutable { - auto evt = std::make_unique(storeInfo, eventInfo); + eventContainer_->AddEvent(storeInfo.path, eventInfo); + auto weakContainer = std::weak_ptr(eventContainer_); + auto path = storeInfo.path; + auto task = [info = storeInfo, path, weakContainer]() mutable { + auto container = weakContainer.lock(); + if (container == nullptr) { + ZLOGW("GlobalEvent container has been destroyed"); + return; + } + if (auto eventOpt = container->StealEvent(path)) { + auto evt = std::make_unique(std::move(info), std::move(*eventOpt)); EventCenter::GetInstance().PostEvent(std::move(evt)); - }; + } + }; + if (taskId == ExecutorPool::INVALID_TASK_ID) { taskId = executors_->Schedule(std::chrono::milliseconds(delay), task); } else { - taskId = executors_->Reset(taskId, std::chrono::milliseconds(delay)); + executors_->Remove(taskId); + taskId = executors_->Schedule(std::chrono::milliseconds(delay), task); } tasks.insert_or_assign(storeInfo.path, taskId); return true; diff --git a/services/distributeddataservice/service/rdb/rdb_service_impl.h b/services/distributeddataservice/service/rdb/rdb_service_impl.h index bfb2ffbb2171238c31ea7308052ac848a8899c21..6db3b343dff14ce0b45e74e3a27b947fc51ee0e2 100644 --- a/services/distributeddataservice/service/rdb/rdb_service_impl.h +++ b/services/distributeddataservice/service/rdb/rdb_service_impl.h @@ -146,6 +146,14 @@ private: }; using SyncAgents = std::map; + struct GlobalEvent { + void AddEvent(const std::string& path, const DistributedData::DataChangeEvent::EventInfo& eventInfo); + std::optional StealEvent(const std::string& path); + private: + std::mutex mutex; + std::map events_; + }; + class RdbStatic : public StaticActs { public: ~RdbStatic() override {}; @@ -281,6 +289,7 @@ private: static Factory factory_; ConcurrentMap syncAgents_; std::shared_ptr executors_; + std::shared_ptr eventContainer_; ConcurrentMap> heartbeatTaskIds_; LRUBucket specialChannels_ { 10 }; diff --git a/services/distributeddataservice/service/test/rdb_service_impl_test.cpp b/services/distributeddataservice/service/test/rdb_service_impl_test.cpp index 148ef2d41c6eccf41d0ea99882d5746e0e81979f..3cc9f85b7ae8fa694304367dce0891dfeaa0104e 100644 --- a/services/distributeddataservice/service/test/rdb_service_impl_test.cpp +++ b/services/distributeddataservice/service/test/rdb_service_impl_test.cpp @@ -2759,5 +2759,94 @@ HWTEST_F(RdbServiceImplTest, SaveSecretKeyMeta_CloneKeyUpdate_NoUpdate_003, Test EXPECT_EQ(MetaDataManager::GetInstance().DelMeta(meta.GetCloneSecretKey(), true), true); EXPECT_EQ(MetaDataManager::GetInstance().DelMeta(meta.GetKey(), true), true); } + +/** + * @tc.name: PostHeartbeatTask001 + * @tc.desc: Test that tasks does not contain path and delay == 0. + * @tc.type: FUNC + * @tc.expect: The taskId is invalid in PostHeartbeatTask + */ +HWTEST_F(RdbServiceImplTest, PostHeartbeatTask001, TestSize.Level0) +{ + int32_t callingPid = 123; + uint32_t delay = 0; + DistributedData::StoreInfo storeInfo; + DataChangeEvent::EventInfo eventInfo; + storeInfo.path = "/test/path"; + RdbServiceImpl service; + service.PostHeartbeatTask(callingPid, delay, storeInfo, eventInfo); + auto it = service.heartbeatTaskIds_.Find(callingPid); + auto taskId = it.second[storeInfo.path]; + EXPECT_EQ(taskId, ExecutorPool::INVALID_TASK_ID); +} + +/** + * @tc.name: PostHeartbeatTask002 + * @tc.desc: Test that tasks does not contain path and delay != 0. + * @tc.type: FUNC + * @tc.expect: The taskId is invalid in PostHeartbeatTask + */ +HWTEST_F(RdbServiceImplTest, PostHeartbeatTask002, TestSize.Level0) +{ + int32_t callingPid = 123; + uint32_t delay = 1000; + DistributedData::StoreInfo storeInfo; + DataChangeEvent::EventInfo eventInfo; + storeInfo.path = "/test/path"; + RdbServiceImpl service; + service.executors_ = std::make_shared(2, 0); + service.PostHeartbeatTask(callingPid, delay, storeInfo, eventInfo); + auto it = service.heartbeatTaskIds_.Find(callingPid); + auto taskId = it.second[storeInfo.path]; + EXPECT_NE(taskId, ExecutorPool::INVALID_TASK_ID); + service.executors_->Remove(taskId); +} + +/** + * @tc.name: PostHeartbeatTask003 + * @tc.desc: Test if the task already exists, delay is not 0. + * @tc.type: FUNC + * @tc.expect: The tableProperties value in the global variable changes + */ +HWTEST_F(RdbServiceImplTest, PostHeartbeatTask003, TestSize.Level0) +{ + int32_t callingPid = 456; + uint32_t delay = 1000; + DistributedData::StoreInfo storeInfo; + DataChangeEvent::EventInfo eventInfo; + eventInfo.isFull = true; + eventInfo.tableProperties["table1"] = {1, 0}; + storeInfo.path = "/test/path"; + RdbServiceImpl service; + service.executors_ = std::make_shared(2, 0); + service.PostHeartbeatTask(callingPid, delay, storeInfo, eventInfo); + DataChangeEvent::EventInfo eventInfo_again; + eventInfo_again.isFull = false; + eventInfo_again.tableProperties["table1"] = {0, 0}; + eventInfo_again.tableProperties["table2"] = {1, 0}; + + service.PostHeartbeatTask(callingPid, delay, storeInfo, eventInfo); + auto globalEvents = service.eventContainer_->events_[storeInfo.path]; + EXPECT_EQ(globalEvents.tableProperties["table1"].isTrackedDataChange, 1); + EXPECT_EQ(globalEvents.tableProperties["table1"].isP2pSyncDataChange, 0); + auto it = service.heartbeatTaskIds_.Find(callingPid); + auto taskId = it.second[storeInfo.path]; + service.executors_->Remove(taskId); +} + +/** + * @tc.name: StealEvent001 + * @tc.desc: Test path is not in events_. + * @tc.type: FUNC + * @tc.expect: StealEvent returns nullopt + */ +HWTEST_F(RdbServiceImplTest, StealEvent001, TestSize.Level0) +{ + const std::string testPath = "/test/path"; + DataChangeEvent::EventInfo testEventInfo; + RdbServiceImpl service; + auto result = service.eventContainer_->StealEvent(testPath); + EXPECT_EQ(result, std::nullopt); +} } // namespace DistributedRDBTest } // namespace OHOS::Test