From c19c28da824c9e713097ef6789c21590688e056c Mon Sep 17 00:00:00 2001 From: wangyingli Date: Mon, 8 Sep 2025 19:37:18 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BF=AE=E6=94=B910s=E5=85=9C=E5=BA=95,?= =?UTF-8?q?=E9=98=B2=E6=AD=A2=E4=BB=BB=E5=8A=A1=E4=B8=A2=E5=A4=B1?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: wangyingli --- .../service/rdb/rdb_service_impl.cpp | 52 +++++++++-- .../service/rdb/rdb_service_impl.h | 9 ++ .../service/test/rdb_service_impl_test.cpp | 89 +++++++++++++++++++ 3 files changed, 144 insertions(+), 6 deletions(-) diff --git a/services/distributeddataservice/service/rdb/rdb_service_impl.cpp b/services/distributeddataservice/service/rdb/rdb_service_impl.cpp index a22f43ddb..629b6be6f 100644 --- a/services/distributeddataservice/service/rdb/rdb_service_impl.cpp +++ b/services/distributeddataservice/service/rdb/rdb_service_impl.cpp @@ -103,7 +103,7 @@ RdbServiceImpl::Factory::~Factory() { } -RdbServiceImpl::RdbServiceImpl() +RdbServiceImpl::RdbServiceImpl() : eventContainer_(std::make_shared()) { ZLOGI("construct"); DistributedDB::RelationalStoreManager::SetAutoLaunchRequestCallback( @@ -1537,6 +1537,35 @@ int32_t RdbServiceImpl::NotifyDataChange(const RdbSyncerParam ¶m, const RdbC return RDB_OK; } +void RdbServiceImpl::GlobalEvent::AddEvent(const std::string& path, + const DistributedData::DataChangeEvent::EventInfo& newEvent) +{ + std::lock_guard lock(mutex); + auto& storedEvent = events_[path]; + for (const auto& [tableName, properties] : newEvent.tableProperties) { + auto& globalProps = storedEvent.tableProperties[tableName]; + globalProps.isTrackedDataChange |= properties.isTrackedDataChange; + globalProps.isP2pSyncDataChange |= properties.isP2pSyncDataChange; + ZLOGD("table:%{public}s, the isTrackedDataChange is %{public}d", + Anonymous::Change(tableName).c_str(), globalProps.isTrackedDataChange); + } + storedEvent.isFull |= newEvent.isFull; +} + +std::optional RdbServiceImpl::GlobalEvent::StealEvent( + const std::string& path) +{ + std::lock_guard lock(mutex); + auto it = events_.find(path); + if (it == events_.end()) { + ZLOGE("The events to the path:%{public}s does not exist", Anonymous::Change(path).c_str()); + return std::nullopt; + } + auto eventInfo = std::move(it->second); + events_.erase(it); + return eventInfo; +} + void RdbServiceImpl::PostHeartbeatTask(int32_t pid, uint32_t delay, StoreInfo &storeInfo, DataChangeEvent::EventInfo &eventInfo) { @@ -1555,14 +1584,25 @@ void RdbServiceImpl::PostHeartbeatTask(int32_t pid, uint32_t delay, StoreInfo &s return !tasks.empty(); } - if (taskId == ExecutorPool::INVALID_TASK_ID) { - auto task = [storeInfo, eventInfo]() mutable { - auto evt = std::make_unique(storeInfo, eventInfo); + eventContainer_->AddEvent(storeInfo.path, eventInfo); + auto weakContainer = std::weak_ptr(eventContainer_); + auto path = storeInfo.path; + auto task = [info = storeInfo, path, weakContainer]() mutable { + auto container = weakContainer.lock(); + if (container == nullptr) { + ZLOGW("GlobalEvent container has been destroyed"); + return; + } + if (auto eventOpt = container->StealEvent(path)) { + auto evt = std::make_unique(std::move(info), std::move(*eventOpt)); EventCenter::GetInstance().PostEvent(std::move(evt)); - }; + } + }; + if (taskId == ExecutorPool::INVALID_TASK_ID) { taskId = executors_->Schedule(std::chrono::milliseconds(delay), task); } else { - taskId = executors_->Reset(taskId, std::chrono::milliseconds(delay)); + executors_->Remove(taskId); + taskId = executors_->Schedule(std::chrono::milliseconds(delay), task); } tasks.insert_or_assign(storeInfo.path, taskId); return true; diff --git a/services/distributeddataservice/service/rdb/rdb_service_impl.h b/services/distributeddataservice/service/rdb/rdb_service_impl.h index bfb2ffbb2..6db3b343d 100644 --- a/services/distributeddataservice/service/rdb/rdb_service_impl.h +++ b/services/distributeddataservice/service/rdb/rdb_service_impl.h @@ -146,6 +146,14 @@ private: }; using SyncAgents = std::map; + struct GlobalEvent { + void AddEvent(const std::string& path, const DistributedData::DataChangeEvent::EventInfo& eventInfo); + std::optional StealEvent(const std::string& path); + private: + std::mutex mutex; + std::map events_; + }; + class RdbStatic : public StaticActs { public: ~RdbStatic() override {}; @@ -281,6 +289,7 @@ private: static Factory factory_; ConcurrentMap syncAgents_; std::shared_ptr executors_; + std::shared_ptr eventContainer_; ConcurrentMap> heartbeatTaskIds_; LRUBucket specialChannels_ { 10 }; diff --git a/services/distributeddataservice/service/test/rdb_service_impl_test.cpp b/services/distributeddataservice/service/test/rdb_service_impl_test.cpp index 148ef2d41..3cc9f85b7 100644 --- a/services/distributeddataservice/service/test/rdb_service_impl_test.cpp +++ b/services/distributeddataservice/service/test/rdb_service_impl_test.cpp @@ -2759,5 +2759,94 @@ HWTEST_F(RdbServiceImplTest, SaveSecretKeyMeta_CloneKeyUpdate_NoUpdate_003, Test EXPECT_EQ(MetaDataManager::GetInstance().DelMeta(meta.GetCloneSecretKey(), true), true); EXPECT_EQ(MetaDataManager::GetInstance().DelMeta(meta.GetKey(), true), true); } + +/** + * @tc.name: PostHeartbeatTask001 + * @tc.desc: Test that tasks does not contain path and delay == 0. + * @tc.type: FUNC + * @tc.expect: The taskId is invalid in PostHeartbeatTask + */ +HWTEST_F(RdbServiceImplTest, PostHeartbeatTask001, TestSize.Level0) +{ + int32_t callingPid = 123; + uint32_t delay = 0; + DistributedData::StoreInfo storeInfo; + DataChangeEvent::EventInfo eventInfo; + storeInfo.path = "/test/path"; + RdbServiceImpl service; + service.PostHeartbeatTask(callingPid, delay, storeInfo, eventInfo); + auto it = service.heartbeatTaskIds_.Find(callingPid); + auto taskId = it.second[storeInfo.path]; + EXPECT_EQ(taskId, ExecutorPool::INVALID_TASK_ID); +} + +/** + * @tc.name: PostHeartbeatTask002 + * @tc.desc: Test that tasks does not contain path and delay != 0. + * @tc.type: FUNC + * @tc.expect: The taskId is invalid in PostHeartbeatTask + */ +HWTEST_F(RdbServiceImplTest, PostHeartbeatTask002, TestSize.Level0) +{ + int32_t callingPid = 123; + uint32_t delay = 1000; + DistributedData::StoreInfo storeInfo; + DataChangeEvent::EventInfo eventInfo; + storeInfo.path = "/test/path"; + RdbServiceImpl service; + service.executors_ = std::make_shared(2, 0); + service.PostHeartbeatTask(callingPid, delay, storeInfo, eventInfo); + auto it = service.heartbeatTaskIds_.Find(callingPid); + auto taskId = it.second[storeInfo.path]; + EXPECT_NE(taskId, ExecutorPool::INVALID_TASK_ID); + service.executors_->Remove(taskId); +} + +/** + * @tc.name: PostHeartbeatTask003 + * @tc.desc: Test if the task already exists, delay is not 0. + * @tc.type: FUNC + * @tc.expect: The tableProperties value in the global variable changes + */ +HWTEST_F(RdbServiceImplTest, PostHeartbeatTask003, TestSize.Level0) +{ + int32_t callingPid = 456; + uint32_t delay = 1000; + DistributedData::StoreInfo storeInfo; + DataChangeEvent::EventInfo eventInfo; + eventInfo.isFull = true; + eventInfo.tableProperties["table1"] = {1, 0}; + storeInfo.path = "/test/path"; + RdbServiceImpl service; + service.executors_ = std::make_shared(2, 0); + service.PostHeartbeatTask(callingPid, delay, storeInfo, eventInfo); + DataChangeEvent::EventInfo eventInfo_again; + eventInfo_again.isFull = false; + eventInfo_again.tableProperties["table1"] = {0, 0}; + eventInfo_again.tableProperties["table2"] = {1, 0}; + + service.PostHeartbeatTask(callingPid, delay, storeInfo, eventInfo); + auto globalEvents = service.eventContainer_->events_[storeInfo.path]; + EXPECT_EQ(globalEvents.tableProperties["table1"].isTrackedDataChange, 1); + EXPECT_EQ(globalEvents.tableProperties["table1"].isP2pSyncDataChange, 0); + auto it = service.heartbeatTaskIds_.Find(callingPid); + auto taskId = it.second[storeInfo.path]; + service.executors_->Remove(taskId); +} + +/** + * @tc.name: StealEvent001 + * @tc.desc: Test path is not in events_. + * @tc.type: FUNC + * @tc.expect: StealEvent returns nullopt + */ +HWTEST_F(RdbServiceImplTest, StealEvent001, TestSize.Level0) +{ + const std::string testPath = "/test/path"; + DataChangeEvent::EventInfo testEventInfo; + RdbServiceImpl service; + auto result = service.eventContainer_->StealEvent(testPath); + EXPECT_EQ(result, std::nullopt); +} } // namespace DistributedRDBTest } // namespace OHOS::Test -- Gitee