From c1e9f95d3d505a4cc4ea61c7296277a42f89d348 Mon Sep 17 00:00:00 2001 From: changjiaxing Date: Thu, 17 Jul 2025 00:19:40 +0800 Subject: [PATCH] =?UTF-8?q?AutoCache=20RDB=E6=94=AF=E6=8C=81=E5=A4=9A?= =?UTF-8?q?=E7=BA=A7=E9=94=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: changjiaxing Change-Id: I53a2ef13ff110ae8af6ed21e2c7d89087f5b5fd9 --- .../framework/include/store/general_store.h | 1 + .../framework/store/auto_cache.cpp | 1 + .../service/kvdb/kvdb_general_store.cpp | 5 ++ .../service/kvdb/kvdb_general_store.h | 1 + .../service/rdb/rdb_general_store.cpp | 77 +++++++++++++++---- .../service/rdb/rdb_general_store.h | 8 +- .../service/rdb/rdb_service_impl.cpp | 7 +- 7 files changed, 78 insertions(+), 22 deletions(-) diff --git a/services/distributeddataservice/framework/include/store/general_store.h b/services/distributeddataservice/framework/include/store/general_store.h index 0b56018c5..cc9651ab3 100644 --- a/services/distributeddataservice/framework/include/store/general_store.h +++ b/services/distributeddataservice/framework/include/store/general_store.h @@ -204,6 +204,7 @@ public: { return 0; } + virtual void Init() = 0; }; } // namespace OHOS::DistributedData #endif // OHOS_DISTRIBUTED_DATA_SERVICES_FRAMEWORK_STORE_GENERAL_STORE_H \ No newline at end of file diff --git a/services/distributeddataservice/framework/store/auto_cache.cpp b/services/distributeddataservice/framework/store/auto_cache.cpp index 9e1cbef74..a1e0a820e 100644 --- a/services/distributeddataservice/framework/store/auto_cache.cpp +++ b/services/distributeddataservice/framework/store/auto_cache.cpp @@ -109,6 +109,7 @@ std::pair AutoCache::GetDBStore(const StoreMetaData & StartTimer(); return !stores.empty(); }); + store->Init(); return { E_OK, store }; } diff --git a/services/distributeddataservice/service/kvdb/kvdb_general_store.cpp b/services/distributeddataservice/service/kvdb/kvdb_general_store.cpp index 9fd1fe9d0..3132cb557 100644 --- a/services/distributeddataservice/service/kvdb/kvdb_general_store.cpp +++ b/services/distributeddataservice/service/kvdb/kvdb_general_store.cpp @@ -210,6 +210,11 @@ KVDBGeneralStore::KVDBGeneralStore(const StoreMetaData &meta) enableCloud_ = meta.enableCloud; } +void KVDBGeneralStore::Init() +{ + +} + KVDBGeneralStore::~KVDBGeneralStore() { { diff --git a/services/distributeddataservice/service/kvdb/kvdb_general_store.h b/services/distributeddataservice/service/kvdb/kvdb_general_store.h index 80e030f05..90dea6f9f 100644 --- a/services/distributeddataservice/service/kvdb/kvdb_general_store.h +++ b/services/distributeddataservice/service/kvdb/kvdb_general_store.h @@ -82,6 +82,7 @@ public: static DBSecurity GetDBSecurity(int32_t secLevel); std::pair LockCloudDB() override; int32_t UnLockCloudDB() override; + void Init() override; private: using KvDelegate = DistributedDB::KvStoreNbDelegate; diff --git a/services/distributeddataservice/service/rdb/rdb_general_store.cpp b/services/distributeddataservice/service/rdb/rdb_general_store.cpp index 9a3ba03cb..b46aae085 100644 --- a/services/distributeddataservice/service/rdb/rdb_general_store.cpp +++ b/services/distributeddataservice/service/rdb/rdb_general_store.cpp @@ -176,20 +176,47 @@ RdbGeneralStore::DBPassword RdbGeneralStore::GetDBPassword(const StoreMetaData & return dbPassword; } +void RdbGeneralStore::AwaitInit() const +{ + if (hasInit_.load()) { + return; + } + std::unique_lock lock(initMutex_); + if (!hasInit_.load()) { + cond_.wait_for(lock, std::chrono::seconds(WAIT_TIME), [this] { return hasInit_.load(); }); + } + + if (!hasInit_.load()) { + ZLOGE("The RdbGeneralStore init timeout."); + } +} + RdbGeneralStore::RdbGeneralStore(const StoreMetaData &meta) : manager_(meta.appId, meta.user, meta.instanceId), tasks_(std::make_shared>()) { - observer_.storeId_ = meta.storeId; - observer_.meta_ = meta; - RelationalStoreDelegate::Option option = GetOption(meta); + meta_ = meta; +} + +void RdbGeneralStore::Init() +{ + if (hasInit_.load()) { + return; + } + std::lock_guard LockGuard(initMutex_); + if (hasInit_.load()) { + return; + } + observer_.storeId_ = meta_.storeId; + observer_.meta_ = meta_; + RelationalStoreDelegate::Option option = GetOption(meta_); option.observer = &observer_; - if (meta.isEncrypt) { - option.passwd = GetDBPassword(meta); - option.isEncryptedDb = meta.isEncrypt; + if (meta_.isEncrypt) { + option.passwd = GetDBPassword(meta_); + option.isEncryptedDb = meta_.isEncrypt; option.cipher = CipherType::AES_256_GCM; for (uint32_t i = 0; i < ITERS_COUNT; ++i) { option.iterateTimes = ITERS[i]; - auto ret = manager_.OpenStore(meta.dataDir, meta.storeId, option, delegate_); + auto ret = manager_.OpenStore(meta_.dataDir, meta_.storeId, option, delegate_); if (ret == DBStatus::OK && delegate_ != nullptr) { break; } @@ -197,24 +224,26 @@ RdbGeneralStore::RdbGeneralStore(const StoreMetaData &meta) delegate_ = nullptr; } } else { - auto ret = manager_.OpenStore(meta.dataDir, meta.storeId, option, delegate_); + auto ret = manager_.OpenStore(meta_.dataDir, meta_.storeId, option, delegate_); if (ret != DBStatus::OK || delegate_ == nullptr) { manager_.CloseStore(delegate_); delegate_ = nullptr; } } - InitStoreInfo(meta); - if (meta.isSearchable) { + InitStoreInfo(meta_); + if (meta_.isSearchable) { syncNotifyFlag_ |= SEARCHABLE_FLAG; } - if (delegate_ != nullptr && meta.isManualClean) { - PragmaData data = static_cast(const_cast(static_cast(&meta.isManualClean))); + if (delegate_ != nullptr && meta_.isManualClean) { + PragmaData data = static_cast(const_cast(static_cast(&meta_.isManualClean))); delegate_->Pragma(PragmaCmd::LOGIC_DELETE_SYNC_DATA, data); } + hasInit_.store(true); + cond_.notify_all(); ZLOGI("Get rdb store, tokenId:%{public}u, bundleName:%{public}s, storeName:%{public}s, user:%{public}s," "isEncrypt:%{public}d, isManualClean:%{public}d, isSearchable:%{public}d", - meta.tokenId, meta.bundleName.c_str(), Anonymous::Change(meta.storeId).c_str(), meta.user.c_str(), - meta.isEncrypt, meta.isManualClean, meta.isSearchable); + meta_.tokenId, meta_.bundleName.c_str(), Anonymous::Change(meta_.storeId).c_str(), meta_.user.c_str(), + meta_.isEncrypt, meta_.isManualClean, meta_.isSearchable); } RdbGeneralStore::~RdbGeneralStore() @@ -253,7 +282,6 @@ int32_t RdbGeneralStore::Bind(const Database &database, const std::map lock(rwMutex_); if (delegate_ == nullptr) { ZLOGE("database:%{public}s already closed!", Anonymous::Change(database.name).c_str()); @@ -301,6 +330,7 @@ bool RdbGeneralStore::IsBound(uint32_t user) int32_t RdbGeneralStore::Close(bool isForce) { + AwaitInit(); { std::unique_lock lock(rwMutex_, std::chrono::seconds(isForce ? LOCK_TIMEOUT : 0)); if (!lock) { @@ -336,6 +366,7 @@ int32_t RdbGeneralStore::Close(bool isForce) int32_t RdbGeneralStore::Execute(const std::string &table, const std::string &sql) { + AwaitInit(); std::shared_lock lock(rwMutex_); if (delegate_ == nullptr) { ZLOGE("Database already closed! database:%{public}s, table:%{public}s, sql:%{public}s", @@ -407,6 +438,7 @@ int32_t RdbGeneralStore::Insert(const std::string &table, VBuckets &&values) std::vector changedData; std::vector bindArgs = ValueProxy::Convert(std::move(args)); + AwaitInit(); std::shared_lock lock(rwMutex_); if (delegate_ == nullptr) { ZLOGE("Database already closed! database:%{public}s, table:%{public}s", @@ -471,6 +503,7 @@ int32_t RdbGeneralStore::Update(const std::string &table, const std::string &set std::vector changedData; std::vector bindArgs = ValueProxy::Convert(std::move(args)); + AwaitInit(); std::shared_lock lock(rwMutex_); if (delegate_ == nullptr) { ZLOGE("Database already closed! database:%{public}s, table:%{public}s", @@ -502,6 +535,7 @@ int32_t RdbGeneralStore::Replace(const std::string &table, VBucket &&value) } std::vector changedData; std::vector bindArgs = ValueProxy::Convert(std::move(args)); + AwaitInit(); std::shared_lock lock(rwMutex_); if (delegate_ == nullptr) { ZLOGE("Database already closed! database:%{public}s, table:%{public}s", @@ -528,6 +562,7 @@ int32_t RdbGeneralStore::Delete(const std::string &table, const std::string &sql std::pair> RdbGeneralStore::Query(__attribute__((unused))const std::string &table, const std::string &sql, Values &&args) { + AwaitInit(); std::shared_lock lock(rwMutex_); if (delegate_ == nullptr) { ZLOGE("Database already closed! database:%{public}s", Anonymous::Change(storeInfo_.storeName).c_str()); @@ -545,6 +580,7 @@ std::pair> RdbGeneralStore::Query(const std::st ZLOGE("not RdbQuery!"); return { GeneralError::E_INVALID_ARGS, nullptr }; } + AwaitInit(); std::shared_lock lock(rwMutex_); if (delegate_ == nullptr) { ZLOGE("Database already closed! database:%{public}s, table:%{public}s", @@ -564,6 +600,7 @@ std::pair> RdbGeneralStore::Query(const std::st int32_t RdbGeneralStore::MergeMigratedData(const std::string &tableName, VBuckets &&values) { + AwaitInit(); std::shared_lock lock(rwMutex_); if (delegate_ == nullptr) { ZLOGE("Database already closed! database:%{public}s, table:%{public}s", @@ -577,6 +614,7 @@ int32_t RdbGeneralStore::MergeMigratedData(const std::string &tableName, VBucket int32_t RdbGeneralStore::CleanTrackerData(const std::string &tableName, int64_t cursor) { + AwaitInit(); std::shared_lock lock(rwMutex_); if (delegate_ == nullptr) { ZLOGE("Database already closed! database:%{public}s, table:%{public}s", @@ -642,6 +680,7 @@ std::pair RdbGeneralStore::Sync(const Devices &devices, GenQue } auto syncMode = GeneralStore::GetSyncMode(syncParam.mode); auto dbMode = DistributedDB::SyncMode(syncMode); + AwaitInit(); std::shared_lock lock(rwMutex_); if (delegate_ == nullptr) { ZLOGE("store already closed! devices count:%{public}zu, the 1st:%{public}s, mode:%{public}d, " @@ -675,6 +714,7 @@ std::pair> RdbGeneralStore::PreSharing(GenQuery std::string sql = BuildSql(*tables.begin(), statement, rdbQuery->GetColumns()); VBuckets values; { + AwaitInit(); std::shared_lock lock(rwMutex_); if (delegate_ == nullptr) { ZLOGE("Database already closed! database:%{public}s", Anonymous::Change(storeInfo_.storeName).c_str()); @@ -740,6 +780,7 @@ int32_t RdbGeneralStore::Clean(const std::vector &devices, int32_t return GeneralError::E_INVALID_ARGS; } DBStatus status = DistributedDB::DB_ERROR; + AwaitInit(); std::shared_lock lock(rwMutex_); if (delegate_ == nullptr) { ZLOGE("store already closed! devices count:%{public}zu, the 1st:%{public}s, mode:%{public}d, " @@ -930,6 +971,7 @@ int32_t RdbGeneralStore::SetReference(const std::vector &references) int32_t RdbGeneralStore::SetDistributedTables(const std::vector &tables, int32_t type, const std::vector &references) { + AwaitInit(); std::shared_lock lock(rwMutex_); if (delegate_ == nullptr) { ZLOGE("Database already closed! database:%{public}s, tables size:%{public}zu, type:%{public}d", @@ -981,6 +1023,7 @@ int32_t RdbGeneralStore::SetDistributedTables(const std::vector &ta void RdbGeneralStore::SetConfig(const StoreConfig &storeConfig) { + AwaitInit(); std::shared_lock lock(rwMutex_); if (delegate_ == nullptr) { ZLOGE("database already closed!, tableMode is :%{public}d", @@ -1001,6 +1044,7 @@ void RdbGeneralStore::SetConfig(const StoreConfig &storeConfig) int32_t RdbGeneralStore::SetTrackerTable(const std::string &tableName, const std::set &trackerColNames, const std::set &extendColNames, bool isForceUpgrade) { + AwaitInit(); std::shared_lock lock(rwMutex_); if (delegate_ == nullptr) { ZLOGE("database already closed! database:%{public}s, tables name:%{public}s", @@ -1124,6 +1168,7 @@ void RdbGeneralStore::OnSyncFinish(const StoreInfo &storeInfo, uint32_t flag, ui std::set RdbGeneralStore::GetTables() { std::set tables; + AwaitInit(); std::shared_lock lock(rwMutex_); if (delegate_ == nullptr) { ZLOGE("Database already closed! database:%{public}s", Anonymous::Change(storeInfo_.storeName).c_str()); @@ -1271,6 +1316,7 @@ std::shared_ptr RdbGeneralStore::GetRdbCloud() const bool RdbGeneralStore::IsFinished(SyncId syncId) const { + AwaitInit(); std::shared_lock lock(rwMutex_); if (delegate_ == nullptr) { ZLOGE("database already closed! database:%{public}s", Anonymous::Change(storeInfo_.storeName).c_str()); @@ -1377,6 +1423,7 @@ RdbGeneralStore::DBProcessCB RdbGeneralStore::GetCB(SyncId syncId) int32_t RdbGeneralStore::UpdateDBStatus() { + AwaitInit(); std::shared_lock lock(rwMutex_); if (delegate_ == nullptr) { ZLOGE("Database already closed! database:%{public}s", Anonymous::Change(storeInfo_.storeName).c_str()); diff --git a/services/distributeddataservice/service/rdb/rdb_general_store.h b/services/distributeddataservice/service/rdb/rdb_general_store.h index 0c356bb2b..dc0cb8146 100644 --- a/services/distributeddataservice/service/rdb/rdb_general_store.h +++ b/services/distributeddataservice/service/rdb/rdb_general_store.h @@ -90,7 +90,7 @@ public: std::pair LockCloudDB() override; int32_t UnLockCloudDB() override; int32_t UpdateDBStatus() override; - + void Init() override; private: RdbGeneralStore(const RdbGeneralStore& rdbGeneralStore); RdbGeneralStore& operator=(const RdbGeneralStore& rdbGeneralStore); @@ -117,6 +117,7 @@ private: static constexpr uint32_t ITER_V1 = 5000; static constexpr uint32_t ITERS[] = {ITER_V0, ITER_V1}; static constexpr uint32_t ITERS_COUNT = sizeof(ITERS) / sizeof(ITERS[0]); + static constexpr uint32_t WAIT_TIME = 2; class ObserverProxy : public DistributedDB::StoreObserver { public: using DBChangedIF = DistributedDB::StoreChangedData; @@ -161,6 +162,7 @@ private: const DistributedData::SyncParam &syncParam, bool isPriority, DetailAsync async); void Report(const std::string &faultType, int32_t errCode, const std::string &appendix); DBPassword GetDBPassword(const StoreMetaData &data); + void AwaitInit() const; ObserverProxy observer_; RdbManager manager_; @@ -190,6 +192,10 @@ private: }; std::shared_ptr executor_ = nullptr; std::shared_ptr> tasks_; + StoreMetaData meta_; + mutable std::mutex initMutex_; + mutable std::condition_variable cond_; + std::atomic hasInit_ = false; }; } // namespace OHOS::DistributedRdb #endif // OHOS_DISTRIBUTED_DATA_DATAMGR_SERVICE_RDB_GENERAL_STORE_H \ No newline at end of file diff --git a/services/distributeddataservice/service/rdb/rdb_service_impl.cpp b/services/distributeddataservice/service/rdb/rdb_service_impl.cpp index c9662f01a..a55d2f6bc 100644 --- a/services/distributeddataservice/service/rdb/rdb_service_impl.cpp +++ b/services/distributeddataservice/service/rdb/rdb_service_impl.cpp @@ -84,12 +84,7 @@ RdbServiceImpl::Factory::Factory() return product_; }); AutoCache::GetInstance().RegCreator(RDB_DEVICE_COLLABORATION, [](const StoreMetaData& metaData) -> GeneralStore* { - auto store = new (std::nothrow) RdbGeneralStore(metaData); - if (store != nullptr && !store->IsValid()) { - delete store; - store = nullptr; - } - return store; + return new (std::nothrow) RdbGeneralStore(metaData); }); staticActs_ = std::make_shared(); FeatureSystem::GetInstance().RegisterStaticActs(RdbServiceImpl::SERVICE_NAME, staticActs_); -- Gitee