diff --git a/services/distributeddataservice/libs/distributeddb/BUILD.gn b/services/distributeddataservice/libs/distributeddb/BUILD.gn index 55c3dcd34209f6e338987aa0cadf83a7b11b9399..0198fa1cc419e5881d9018a17d4af7b609dc76e0 100644 --- a/services/distributeddataservice/libs/distributeddb/BUILD.gn +++ b/services/distributeddataservice/libs/distributeddb/BUILD.gn @@ -136,6 +136,7 @@ ohos_shared_library("distributeddb") { "storage/src/generic_kvdb.cpp", "storage/src/generic_kvdb_connection.cpp", "storage/src/generic_single_ver_kv_entry.cpp", + "storage/src/iconnection.cpp", "storage/src/ikvdb_factory.cpp", "storage/src/kvdb_commit_notify_filterable_data.cpp", "storage/src/kvdb_manager.cpp", diff --git a/services/distributeddataservice/libs/distributeddb/common/include/auto_launch.h b/services/distributeddataservice/libs/distributeddb/common/include/auto_launch.h index fdb79da5f725bd07bcaf353984f4283a3a720b72..a6da51344169cd945bc5cc94bb893068e1ebb2ed 100644 --- a/services/distributeddataservice/libs/distributeddb/common/include/auto_launch.h +++ b/services/distributeddataservice/libs/distributeddb/common/include/auto_launch.h @@ -133,7 +133,7 @@ protected: void UpdateGlobalMap(std::map> &doOpenMap); - void ReceiveUnknownIdentifierCallBackTask(const std::string &identifier, const std::string userId); + void ReceiveUnknownIdentifierCallBackTask(const std::string &identifier, const std::string &userId); void ConnectionLifeCycleCallback(const std::string &identifier, const std::string &userId); @@ -143,7 +143,7 @@ protected: int AutoLaunchExt(const std::string &identifier, const std::string &userId); - void AutoLaunchExtTask(const std::string identifier, const std::string userId, AutoLaunchItem autoLaunchItem); + void AutoLaunchExtTask(const std::string &identifier, const std::string &userId, AutoLaunchItem &autoLaunchItem); void ExtObserverFunc(const KvDBCommitNotifyData ¬ifyData, const std::string &identifier, const std::string &userId); diff --git a/services/distributeddataservice/libs/distributeddb/common/include/db_constant.h b/services/distributeddataservice/libs/distributeddb/common/include/db_constant.h index a94051de65f4c06c8c2354fd00f6a60d7b492a01..6ecd3149f5fe1ebe1b1de3908f7dbe579f471110 100644 --- a/services/distributeddataservice/libs/distributeddb/common/include/db_constant.h +++ b/services/distributeddataservice/libs/distributeddb/common/include/db_constant.h @@ -130,6 +130,7 @@ public: static constexpr int RELATIONAL_LOG_TABLE_FIELD_NUM = 7; // field num is relational distributed log table + static constexpr uint64_t IGNORE_CONNECTION_ID = 0; // For relational static const std::string RELATIONAL_PREFIX; static const std::string TIMESTAMP_ALIAS; diff --git a/services/distributeddataservice/libs/distributeddb/common/include/runtime_context.h b/services/distributeddataservice/libs/distributeddb/common/include/runtime_context.h index 9b68f63f6624c9cf028c93cbe559eb69ae241648..cd09bdd169fb191167aa0e5a5d6486bf1ef31c5b 100644 --- a/services/distributeddataservice/libs/distributeddb/common/include/runtime_context.h +++ b/services/distributeddataservice/libs/distributeddb/common/include/runtime_context.h @@ -123,6 +123,9 @@ public: EventType event) = 0; virtual int NotifyUserChanged() const = 0; + + // Generate global sessionId in current process + virtual uint32_t GenerateSessionId() = 0; protected: RuntimeContext() = default; virtual ~RuntimeContext() {} diff --git a/services/distributeddataservice/libs/distributeddb/common/src/auto_launch.cpp b/services/distributeddataservice/libs/distributeddb/common/src/auto_launch.cpp index 717e3c8a352a1c4a9a1305bcb133e668e683366e..f8976ce52c801178ba36161c03cb0ae7f0938eb2 100644 --- a/services/distributeddataservice/libs/distributeddb/common/src/auto_launch.cpp +++ b/services/distributeddataservice/libs/distributeddb/common/src/auto_launch.cpp @@ -675,7 +675,7 @@ void AutoLaunch::UpdateGlobalMap(std::map autoLock(extLock_); diff --git a/services/distributeddataservice/libs/distributeddb/common/src/runtime_context_impl.cpp b/services/distributeddataservice/libs/distributeddb/common/src/runtime_context_impl.cpp index 6c95824e32f6476bd85b97da3e7aa8ee62382c4a..9414a1404475d5f013c4734b0f3a48d349fcb694 100644 --- a/services/distributeddataservice/libs/distributeddb/common/src/runtime_context_impl.cpp +++ b/services/distributeddataservice/libs/distributeddb/common/src/runtime_context_impl.cpp @@ -29,7 +29,8 @@ RuntimeContextImpl::RuntimeContextImpl() taskPoolReportsTimerId_(0), timeTickMonitor_(nullptr), systemApiAdapter_(nullptr), - lockStatusObserver_(nullptr) + lockStatusObserver_(nullptr), + currentSessionId_(1) { } @@ -654,4 +655,13 @@ int RuntimeContextImpl::NotifyUserChanged() const userChangeMonitor_->NotifyUserChanged(); return E_OK; } + +uint32_t RuntimeContextImpl::GenerateSessionId() +{ + uint32_t sessionId = currentSessionId_++; + if (sessionId == 0) { + sessionId = currentSessionId_++; + } + return sessionId; +} } // namespace DistributedDB diff --git a/services/distributeddataservice/libs/distributeddb/common/src/runtime_context_impl.h b/services/distributeddataservice/libs/distributeddb/common/src/runtime_context_impl.h index 5dd607e8422c3b042e5b610e888613f67a1b6049..cf7fb4c6f1d608229248a995ae3cb0987569cf52 100644 --- a/services/distributeddataservice/libs/distributeddb/common/src/runtime_context_impl.h +++ b/services/distributeddataservice/libs/distributeddb/common/src/runtime_context_impl.h @@ -113,6 +113,8 @@ public: EventType event) override; // Notify TIME_CHANGE_EVENT. int NotifyUserChanged() const override; + + uint32_t GenerateSessionId() override; private: static constexpr int MAX_TP_THREADS = 10; // max threads of the task pool. static constexpr int MIN_TP_THREADS = 1; // min threads of the task pool. @@ -167,6 +169,8 @@ private: mutable std::mutex userChangeMonitorLock_; std::unique_ptr userChangeMonitor_; + + std::atomic currentSessionId_; }; } // namespace DistributedDB diff --git a/services/distributeddataservice/libs/distributeddb/common/src/time_tick_monitor.cpp b/services/distributeddataservice/libs/distributeddb/common/src/time_tick_monitor.cpp index d43fadc18a55cce45fe42421a1560d7351e026fb..1bb415e6899cf18cbb78895efb8a5414f6613d4a 100644 --- a/services/distributeddataservice/libs/distributeddb/common/src/time_tick_monitor.cpp +++ b/services/distributeddataservice/libs/distributeddb/common/src/time_tick_monitor.cpp @@ -123,7 +123,13 @@ int TimeTickMonitor::TimeTick(TimerId timerId) int64_t changedOffset = systemOffset - monotonicOffset; if (std::abs(changedOffset) > MAX_NOISE) { LOGI("Local system time may be changed! changedOffset %ld", changedOffset); - timeChangedNotifier_->NotifyEvent(TIME_CHANGE_EVENT, &changedOffset); + int ret = RuntimeContext::GetInstance()->ScheduleTask([this, changedOffset](){ + int64_t offset = changedOffset; + timeChangedNotifier_->NotifyEvent(TIME_CHANGE_EVENT, &offset); + }); + if (ret != E_OK) { + LOGE("TimeTickMonitor ScheduleTask failed %d", ret); + } } return E_OK; } diff --git a/services/distributeddataservice/libs/distributeddb/common/src/user_change_monitor.cpp b/services/distributeddataservice/libs/distributeddb/common/src/user_change_monitor.cpp index 6add9c4d487da452396156e8ab20fcf72e4f4491..f670ce3182d2848be3cbf1def83b4171759e1617 100644 --- a/services/distributeddataservice/libs/distributeddb/common/src/user_change_monitor.cpp +++ b/services/distributeddataservice/libs/distributeddb/common/src/user_change_monitor.cpp @@ -1,5 +1,5 @@ /* - * Copyright (c) 2021 Huawei Device Co., Ltd. + * Copyright (c) 2022 Huawei Device Co., Ltd. * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at @@ -49,7 +49,7 @@ void UserChangeMonitor::Stop() if (!isStarted_) { return; } - if (userNotifier_ == nullptr) { + if (userNotifier_ != nullptr) { userNotifier_->UnRegisterEventType(USER_ACTIVE_EVENT); userNotifier_->UnRegisterEventType(USER_NON_ACTIVE_EVENT); userNotifier_->UnRegisterEventType(USER_ACTIVE_TO_NON_ACTIVE_EVENT); @@ -82,31 +82,30 @@ int UserChangeMonitor::PrepareNotifierChain() if (userNotifier_ != nullptr) { return E_OK; } + userNotifier_ = new (std::nothrow) NotificationChain(); if (userNotifier_ == nullptr) { - userNotifier_ = new (std::nothrow) NotificationChain(); - if (userNotifier_ == nullptr) { - return -E_OUT_OF_MEMORY; - } - errCode = userNotifier_->RegisterEventType(USER_ACTIVE_EVENT); - if (errCode != E_OK) { - RefObject::KillAndDecObjRef(userNotifier_); - userNotifier_ = nullptr; - return errCode; - } - errCode = userNotifier_->RegisterEventType(USER_NON_ACTIVE_EVENT); - if (errCode != E_OK) { - RefObject::KillAndDecObjRef(userNotifier_); - userNotifier_ = nullptr; - return errCode; - } - errCode = userNotifier_->RegisterEventType(USER_ACTIVE_TO_NON_ACTIVE_EVENT); - if (errCode != E_OK) { - RefObject::KillAndDecObjRef(userNotifier_); - userNotifier_ = nullptr; - return errCode; - } + return -E_OUT_OF_MEMORY; + } + errCode = userNotifier_->RegisterEventType(USER_ACTIVE_EVENT); + if (errCode != E_OK) { + goto ERROR_HANDLE; + } + errCode = userNotifier_->RegisterEventType(USER_NON_ACTIVE_EVENT); + if (errCode != E_OK) { + userNotifier_->UnRegisterEventType(USER_ACTIVE_EVENT); + goto ERROR_HANDLE; + } + errCode = userNotifier_->RegisterEventType(USER_ACTIVE_TO_NON_ACTIVE_EVENT); + if (errCode != E_OK) { + userNotifier_->UnRegisterEventType(USER_ACTIVE_EVENT); + userNotifier_->UnRegisterEventType(USER_NON_ACTIVE_EVENT); + goto ERROR_HANDLE; } return errCode; +ERROR_HANDLE: + RefObject::KillAndDecObjRef(userNotifier_); + userNotifier_ = nullptr; + return errCode; } void UserChangeMonitor::NotifyUserChanged() const diff --git a/services/distributeddataservice/libs/distributeddb/communicator/include/icommunicator.h b/services/distributeddataservice/libs/distributeddb/communicator/include/icommunicator.h index 84fbed366dda2158fd198e6d0aa15e18cea05bb0..0234c2dc8549c6976ed96885acaa83321ed31006 100644 --- a/services/distributeddataservice/libs/distributeddb/communicator/include/icommunicator.h +++ b/services/distributeddataservice/libs/distributeddb/communicator/include/icommunicator.h @@ -22,6 +22,7 @@ #include "ref_object.h" #include "communicator_type_define.h" #include "iprocess_communicator.h" +#include "db_properties.h" namespace DistributedDB { // inMsg is heap memory, its ownership transfers by calling OnMessageCallback @@ -35,6 +36,19 @@ struct SendConfig { ExtendInfo paramInfo; }; +inline void SetSendConfigParam(const DBProperties &dbProperty, const std::string &dstTarget, bool nonBlock, + uint32_t timeout, SendConfig &sendConf) +{ + sendConf.nonBlock = nonBlock; + sendConf.timeout = timeout; + sendConf.isNeedExtendHead = dbProperty.GetBoolProp(DBProperties::SYNC_DUAL_TUPLE_MODE, + false); + sendConf.paramInfo.appId = dbProperty.GetStringProp(DBProperties::APP_ID, ""); + sendConf.paramInfo.userId = dbProperty.GetStringProp(DBProperties::USER_ID, ""); + sendConf.paramInfo.storeId = dbProperty.GetStringProp(DBProperties::STORE_ID, ""); + sendConf.paramInfo.dstTarget = dstTarget; +} + class ICommunicator : public virtual RefObject { public: // Message heap memory @@ -65,8 +79,8 @@ public: // If send fail in SendMessage, nonBlock true will return, nonBlock false will block and retry // timeout is ignore if nonBlock true. OnSendEnd won't always be called such as when in finalize stage. // Return 0 as success. Return negative as error - virtual int SendMessage(const std::string &dstTarget, const Message *inMsg, SendConfig &config) = 0; - virtual int SendMessage(const std::string &dstTarget, const Message *inMsg, SendConfig &config, + virtual int SendMessage(const std::string &dstTarget, const Message *inMsg, const SendConfig &config) = 0; + virtual int SendMessage(const std::string &dstTarget, const Message *inMsg, const SendConfig &config, const OnSendEnd &onEnd) = 0; // HW Code Regulation do not allow to use default parameters on virtual function virtual ~ICommunicator() {}; diff --git a/services/distributeddataservice/libs/distributeddb/communicator/src/communicator.cpp b/services/distributeddataservice/libs/distributeddb/communicator/src/communicator.cpp index 4478696e4c86f9bbbed8f0361480824e946445c8..e5c97f4126738d07594a7101a7fe5464d81b48c3 100644 --- a/services/distributeddataservice/libs/distributeddb/communicator/src/communicator.cpp +++ b/services/distributeddataservice/libs/distributeddb/communicator/src/communicator.cpp @@ -95,12 +95,12 @@ bool Communicator::IsDeviceOnline(const std::string &device) const return commAggrHandle_->IsDeviceOnline(device); } -int Communicator::SendMessage(const std::string &dstTarget, const Message *inMsg, SendConfig &config) +int Communicator::SendMessage(const std::string &dstTarget, const Message *inMsg, const SendConfig &config) { return SendMessage(dstTarget, inMsg, config, nullptr); } -int Communicator::SendMessage(const std::string &dstTarget, const Message *inMsg, SendConfig &config, +int Communicator::SendMessage(const std::string &dstTarget, const Message *inMsg, const SendConfig &config, const OnSendEnd &onEnd) { if (dstTarget.empty() || inMsg == nullptr) { diff --git a/services/distributeddataservice/libs/distributeddb/communicator/src/communicator.h b/services/distributeddataservice/libs/distributeddb/communicator/src/communicator.h index 17c8604e29c17e125f46ed416616cfa964510608..c1f2ce39d4a8139905cfa57a6acf19e31b0e90be 100644 --- a/services/distributeddataservice/libs/distributeddb/communicator/src/communicator.h +++ b/services/distributeddataservice/libs/distributeddb/communicator/src/communicator.h @@ -51,8 +51,8 @@ public: // Get the protocol version of remote target. Return -E_NOT_FOUND if no record. int GetRemoteCommunicatorVersion(const std::string &target, uint16_t &outVersion) const override; - int SendMessage(const std::string &dstTarget, const Message *inMsg, SendConfig &config) override; - int SendMessage(const std::string &dstTarget, const Message *inMsg, SendConfig &config, + int SendMessage(const std::string &dstTarget, const Message *inMsg, const SendConfig &config) override; + int SendMessage(const std::string &dstTarget, const Message *inMsg, const SendConfig &config, const OnSendEnd &onEnd) override; // Call by CommunicatorAggregator directly diff --git a/services/distributeddataservice/libs/distributeddb/storage/include/iconnection.h b/services/distributeddataservice/libs/distributeddb/storage/include/iconnection.h new file mode 100644 index 0000000000000000000000000000000000000000..dbe63aac573ea529b0606f1537b5dd1e77d16bf6 --- /dev/null +++ b/services/distributeddataservice/libs/distributeddb/storage/include/iconnection.h @@ -0,0 +1,40 @@ +/* + * Copyright (c) 2022 Huawei Device Co., Ltd. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef I_CONNECTION_H +#define I_CONNECTION_H + +#include +#include + +#include "macro_utils.h" + +namespace DistributedDB { +class IConnection { +public: + IConnection(); + virtual ~IConnection() {}; + + DISABLE_COPY_ASSIGN_MOVE(IConnection); + +protected: + uint64_t GetConnectionId(); + + std::mutex connectionIdLock_; + std::atomic connectionId_; +}; +} // namespace DistributedDB + +#endif // I_KV_DB_CONNECTION_H diff --git a/services/distributeddataservice/libs/distributeddb/storage/include/ikvdb_connection.h b/services/distributeddataservice/libs/distributeddb/storage/include/ikvdb_connection.h index 09f0e5ec7d4de4c043d5a841f8eeb6647ffff08d..b051f95bd2a3972fc9e657a8f6ff606e7be97d4d 100644 --- a/services/distributeddataservice/libs/distributeddb/storage/include/ikvdb_connection.h +++ b/services/distributeddataservice/libs/distributeddb/storage/include/ikvdb_connection.h @@ -19,10 +19,11 @@ #include #include -#include "store_types.h" #include "db_types.h" +#include "iconnection.h" #include "macro_utils.h" #include "query.h" +#include "store_types.h" namespace DistributedDB { class IKvDB; @@ -34,7 +35,7 @@ class IKvDBResultSet; using KvDBObserverAction = std::function; using KvDBConflictAction = std::function; -class IKvDBConnection { +class IKvDBConnection : public IConnection { public: IKvDBConnection() = default; virtual ~IKvDBConnection() {}; diff --git a/services/distributeddataservice/libs/distributeddb/storage/include/relational_store_connection.h b/services/distributeddataservice/libs/distributeddb/storage/include/relational_store_connection.h index be53b2c6cc243c4a710256fffd3853c7758e19cd..4c9ab7b6d22f5ee7431f63c71d8f1104cad0ea58 100644 --- a/services/distributeddataservice/libs/distributeddb/storage/include/relational_store_connection.h +++ b/services/distributeddataservice/libs/distributeddb/storage/include/relational_store_connection.h @@ -20,6 +20,7 @@ #include #include "db_types.h" +#include "iconnection.h" #include "macro_utils.h" #include "ref_object.h" #include "relational_store_delegate.h" @@ -27,7 +28,7 @@ namespace DistributedDB { class IRelationalStore; using RelationalObserverAction = std::function; -class RelationalStoreConnection : public virtual RefObject { +class RelationalStoreConnection : public IConnection, public virtual RefObject { public: struct SyncInfo { const std::vector &devices; diff --git a/services/distributeddataservice/libs/distributeddb/storage/src/data_transformer.h b/services/distributeddataservice/libs/distributeddb/storage/src/data_transformer.h index 05022ff757e3ab2220cd65527444e26d7f85ba5c..7a48276f74b292b7486ada8c0df2a5833a9c59d9 100644 --- a/services/distributeddataservice/libs/distributeddb/storage/src/data_transformer.h +++ b/services/distributeddataservice/libs/distributeddb/storage/src/data_transformer.h @@ -1,83 +1,82 @@ -/* - * Copyright (c) 2021 Huawei Device Co., Ltd. - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -#ifndef DATA_TRANSFORMER_H -#define DATA_TRANSFORMER_H -#ifdef RELATIONAL_STORE - -#include -#include -#include "data_value.h" -#include "db_types.h" -#include "relational_schema_object.h" - -namespace DistributedDB { -using RowData = std::vector; -using OptRowData = std::vector; - -struct LogInfo { - int64_t dataKey = -1; - std::string device; - std::string originDev; - Timestamp timestamp = 0; - Timestamp wTimestamp = 0; - uint64_t flag = 0; - Key hashKey; // primary key hash value -}; - -struct RowDataWithLog { - LogInfo logInfo; - RowData rowData; -}; - -struct OptRowDataWithLog { - LogInfo logInfo; - OptRowData optionalData; -}; - -struct TableDataWithLog { - std::string tableName; - std::vector dataList; -}; - -struct OptTableDataWithLog { - std::string tableName; - std::vector dataList; -}; - -class DataTransformer { -public: - static int TransformTableData(const TableDataWithLog &tableDataWithLog, - const std::vector &fieldInfoList, std::vector &dataItems); - static int TransformDataItem(const std::vector &dataItems, const std::vector &remoteFieldInfo, - const std::vector &localFieldInfo, OptTableDataWithLog &tableDataWithLog); - - static int SerializeDataItem(const RowDataWithLog &data, const std::vector &fieldInfo, - DataItem &dataItem); - static int DeSerializeDataItem(const DataItem &dataItem, OptRowDataWithLog &data, - const std::vector &remoteFieldInfo); - static void ReduceMapping(const std::vector &remoteFieldInfo, - const std::vector &localFieldInfo); - -private: - static int SerializeValue(Value &value, const RowData &rowData, const std::vector &fieldInfoList); - static int DeSerializeValue(const Value &value, OptRowData &optionalData, - const std::vector &remoteFieldInfo); - - static uint32_t CalDataValueLength(const DataValue &dataValue); -}; -} - -#endif +/* + * Copyright (c) 2021 Huawei Device Co., Ltd. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#ifndef DATA_TRANSFORMER_H +#define DATA_TRANSFORMER_H +#ifdef RELATIONAL_STORE + +#include +#include "data_value.h" +#include "db_types.h" +#include "relational_schema_object.h" + +namespace DistributedDB { +using RowData = std::vector; +using OptRowData = std::vector; + +struct LogInfo { + int64_t dataKey = -1; + std::string device; + std::string originDev; + Timestamp timestamp = 0; + Timestamp wTimestamp = 0; + uint64_t flag = 0; + Key hashKey; // primary key hash value +}; + +struct RowDataWithLog { + LogInfo logInfo; + RowData rowData; +}; + +struct OptRowDataWithLog { + LogInfo logInfo; + OptRowData optionalData; +}; + +struct TableDataWithLog { + std::string tableName; + std::vector dataList; +}; + +struct OptTableDataWithLog { + std::string tableName; + std::vector dataList; +}; + +class DataTransformer { +public: + static int TransformTableData(const TableDataWithLog &tableDataWithLog, + const std::vector &fieldInfoList, std::vector &dataItems); + static int TransformDataItem(const std::vector &dataItems, const std::vector &remoteFieldInfo, + const std::vector &localFieldInfo, OptTableDataWithLog &tableDataWithLog); + + static int SerializeDataItem(const RowDataWithLog &data, const std::vector &fieldInfo, + DataItem &dataItem); + static int DeSerializeDataItem(const DataItem &dataItem, OptRowDataWithLog &data, + const std::vector &remoteFieldInfo); + static void ReduceMapping(const std::vector &remoteFieldInfo, + const std::vector &localFieldInfo); + +private: + static int SerializeValue(Value &value, const RowData &rowData, const std::vector &fieldInfoList); + static int DeSerializeValue(const Value &value, OptRowData &optionalData, + const std::vector &remoteFieldInfo); + + static uint32_t CalDataValueLength(const DataValue &dataValue); +}; +} + +#endif #endif // DATA_TRANSFORMER_H \ No newline at end of file diff --git a/services/distributeddataservice/libs/distributeddb/storage/src/iconnection.cpp b/services/distributeddataservice/libs/distributeddb/storage/src/iconnection.cpp new file mode 100644 index 0000000000000000000000000000000000000000..620a9a31a62f05bc6d6defb702633309f75fd410 --- /dev/null +++ b/services/distributeddataservice/libs/distributeddb/storage/src/iconnection.cpp @@ -0,0 +1,37 @@ +/* + * Copyright (c) 2022 Huawei Device Co., Ltd. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include "iconnection.h" + +#include "runtime_context.h" + +namespace DistributedDB { +IConnection::IConnection() + : connectionId_(0) +{ +} + +uint64_t IConnection::GetConnectionId() +{ + if (connectionId_ != 0) { + return connectionId_; + } + std::lock_guard autoLock(connectionIdLock_); + // check again here, may be generated after get lock + if (connectionId_ == 0) { + connectionId_ = static_cast(RuntimeContext::GetInstance()->GenerateSessionId()); + } + return connectionId_; +} +} \ No newline at end of file diff --git a/services/distributeddataservice/libs/distributeddb/storage/src/relational_store_instance.cpp b/services/distributeddataservice/libs/distributeddb/storage/src/relational_store_instance.cpp index cb44f6fbb7b923c10aead3dd23ded92d8e4fe661..fdb04ccff9662a435a513b2557a1dc46a3e1233d 100644 --- a/services/distributeddataservice/libs/distributeddb/storage/src/relational_store_instance.cpp +++ b/services/distributeddataservice/libs/distributeddb/storage/src/relational_store_instance.cpp @@ -79,6 +79,7 @@ static IRelationalStore *GetFromCache(const RelationalDBProperties &properties, { errCode = E_OK; std::string identifier = properties.GetStringProp(RelationalDBProperties::IDENTIFIER_DATA, ""); + std::lock_guard lockGuard(storeLock_); auto iter = dbs_.find(identifier); if (iter == dbs_.end()) { errCode = -E_NOT_FOUND; @@ -98,23 +99,17 @@ static IRelationalStore *GetFromCache(const RelationalDBProperties &properties, // Save to IKvDB to the global map void RelationalStoreInstance::RemoveKvDBFromCache(const RelationalDBProperties &properties) { - std::lock_guard lockGuard(storeLock_); std::string identifier = properties.GetStringProp(RelationalDBProperties::IDENTIFIER_DATA, ""); + std::lock_guard lockGuard(storeLock_); dbs_.erase(identifier); } void RelationalStoreInstance::SaveRelationalDBToCache(IRelationalStore *store, const RelationalDBProperties &properties) { - if (store == nullptr) { - return; - } - - { - std::string identifier = properties.GetStringProp(RelationalDBProperties::IDENTIFIER_DATA, ""); - store->WakeUpSyncer(); - if (dbs_.count(identifier) == 0) { - dbs_.insert(std::pair(identifier, store)); - } + std::string identifier = properties.GetStringProp(RelationalDBProperties::IDENTIFIER_DATA, ""); + std::lock_guard lockGuard(storeLock_); + if (dbs_.count(identifier) == 0) { + dbs_.insert(std::pair(identifier, store)); } } @@ -138,6 +133,7 @@ IRelationalStore *RelationalStoreInstance::OpenDatabase(const RelationalDBProper RefObject::KillAndDecObjRef(db); return nullptr; } + db->WakeUpSyncer(); SaveRelationalDBToCache(db, properties); return db; @@ -145,7 +141,6 @@ IRelationalStore *RelationalStoreInstance::OpenDatabase(const RelationalDBProper IRelationalStore *RelationalStoreInstance::GetDataBase(const RelationalDBProperties &properties, int &errCode) { - std::lock_guard lockGuard(storeLock_); auto *db = GetFromCache(properties, errCode); if (db != nullptr) { LOGD("Get db from cache."); @@ -172,6 +167,10 @@ RelationalStoreConnection *RelationalStoreInstance::GetDatabaseConnection(const std::string identifier = properties.GetStringProp(KvDBProperties::IDENTIFIER_DATA, ""); LOGD("Begin to get [%s] database connection.", STR_MASK(DBCommon::TransferStringToHex(identifier))); RelationalStoreInstance *manager = RelationalStoreInstance::GetInstance(); + if (manager == nullptr) { + errCode = -E_OUT_OF_MEMORY; + return nullptr; + } manager->EnterDBOpenCloseProcess(properties.GetStringProp(DBProperties::IDENTIFIER_DATA, "")); RelationalStoreConnection *connection = nullptr; std::string canonicalDir; diff --git a/services/distributeddataservice/libs/distributeddb/storage/src/relational_sync_able_storage.cpp b/services/distributeddataservice/libs/distributeddb/storage/src/relational_sync_able_storage.cpp index ebe347c0074ed81a78da703ca13006fead27acc5..31ae8b02982961e5bdf40f80b5d4a01c8599d6e2 100644 --- a/services/distributeddataservice/libs/distributeddb/storage/src/relational_sync_able_storage.cpp +++ b/services/distributeddataservice/libs/distributeddb/storage/src/relational_sync_able_storage.cpp @@ -255,7 +255,7 @@ static size_t GetDataItemSerialSize(const DataItem &item, size_t appendLen) } static bool CanHoldDeletedData(const std::vector &dataItems, const DataSizeSpecInfo &dataSizeInfo, - size_t appendLen) + size_t appendLen) { bool reachThreshold = (dataItems.size() >= dataSizeInfo.packetSize); for (size_t i = 0, blockSize = 0; !reachThreshold && i < dataItems.size(); i++) { diff --git a/services/distributeddataservice/libs/distributeddb/storage/src/sqlite/relational/sqlite_relational_store.cpp b/services/distributeddataservice/libs/distributeddb/storage/src/sqlite/relational/sqlite_relational_store.cpp index b50bc6bcc7ed7b10533224598c6ec577ce9a58dd..6708d9a5aac896b1b8779248cc3569938d30f5f3 100644 --- a/services/distributeddataservice/libs/distributeddb/storage/src/sqlite/relational/sqlite_relational_store.cpp +++ b/services/distributeddataservice/libs/distributeddb/storage/src/sqlite/relational/sqlite_relational_store.cpp @@ -265,9 +265,9 @@ void SQLiteRelationalStore::ReleaseHandle(SQLiteSingleVerRelationalStorageExecut } } -int SQLiteRelationalStore::Sync(const ISyncer::SyncParma &syncParam) +int SQLiteRelationalStore::Sync(const ISyncer::SyncParma &syncParam, uint64_t connectionId) { - return syncAbleEngine_->Sync(syncParam); + return syncAbleEngine_->Sync(syncParam, connectionId); } // Called when a connection released. @@ -490,5 +490,10 @@ RelationalDBProperties SQLiteRelationalStore::GetProperties() const { return properties_; } + +void SQLiteRelationalStore::StopSync(uint64_t connectionId) +{ + return syncAbleEngine_->StopSync(connectionId); +} } #endif \ No newline at end of file diff --git a/services/distributeddataservice/libs/distributeddb/storage/src/sqlite/relational/sqlite_relational_store.h b/services/distributeddataservice/libs/distributeddb/storage/src/sqlite/relational/sqlite_relational_store.h index 15933421ee93caa07a6380d6f15bccf0ab30996b..2811a6cbe0018b0b67a3184fad912352800523de 100644 --- a/services/distributeddataservice/libs/distributeddb/storage/src/sqlite/relational/sqlite_relational_store.h +++ b/services/distributeddataservice/libs/distributeddb/storage/src/sqlite/relational/sqlite_relational_store.h @@ -41,7 +41,7 @@ public: SQLiteSingleVerRelationalStorageExecutor *GetHandle(bool isWrite, int &errCode) const; void ReleaseHandle(SQLiteSingleVerRelationalStorageExecutor *&handle) const; - int Sync(const ISyncer::SyncParma &syncParam); + int Sync(const ISyncer::SyncParma &syncParam, uint64_t connectionId); void ReleaseDBConnection(RelationalStoreConnection *connection); @@ -64,6 +64,8 @@ public: RelationalDBProperties GetProperties() const override; + void StopSync(uint64_t connectionId); + private: void ReleaseResources(); diff --git a/services/distributeddataservice/libs/distributeddb/storage/src/sqlite/relational/sqlite_relational_store_connection.cpp b/services/distributeddataservice/libs/distributeddb/storage/src/sqlite/relational/sqlite_relational_store_connection.cpp index 41a30fabc1f332ea9e7916ad54ff9afd5a0c1e9c..0628f32ef384b3a223815a75fa44a8382e7c01b7 100644 --- a/services/distributeddataservice/libs/distributeddb/storage/src/sqlite/relational/sqlite_relational_store_connection.cpp +++ b/services/distributeddataservice/libs/distributeddb/storage/src/sqlite/relational/sqlite_relational_store_connection.cpp @@ -19,7 +19,18 @@ namespace DistributedDB { SQLiteRelationalStoreConnection::SQLiteRelationalStoreConnection(SQLiteRelationalStore *store) - : RelationalStoreConnection(store) {} + : RelationalStoreConnection(store) +{ + OnKill([this]() { + auto *store = GetDB(); + if (store == nullptr) { + return; + } + UnlockObj(); + store->StopSync(GetConnectionId()); + LockObj(); + }); +} // Close and release the connection. int SQLiteRelationalStoreConnection::Close() { @@ -193,8 +204,7 @@ int SQLiteRelationalStoreConnection::SyncToDevice(SyncInfo &info) syncParam.relationOnComplete = info.onComplete; syncParam.syncQuery = QuerySyncObject(info.query); syncParam.onFinalize = [this]() { DecObjRef(this); }; - - int errCode = store->Sync(syncParam); + int errCode = store->Sync(syncParam, GetConnectionId()); if (errCode != E_OK) { DecObjRef(this); return errCode; diff --git a/services/distributeddataservice/libs/distributeddb/storage/src/sqlite/sqlite_single_ver_relational_storage_executor.cpp b/services/distributeddataservice/libs/distributeddb/storage/src/sqlite/sqlite_single_ver_relational_storage_executor.cpp index c1d487850e76ee4eb6babbc0c58d53d3b77ac067..42bc14fe72042193529867a575ab72f0475f0a8d 100644 --- a/services/distributeddataservice/libs/distributeddb/storage/src/sqlite/sqlite_single_ver_relational_storage_executor.cpp +++ b/services/distributeddataservice/libs/distributeddb/storage/src/sqlite/sqlite_single_ver_relational_storage_executor.cpp @@ -1071,15 +1071,13 @@ int SQLiteSingleVerRelationalStorageExecutor::GetSyncDataByQuery(std::vector &dataItems, const DataItem &item, size_ const DataSizeSpecInfo &dataSizeInfo) { // If dataTotalSize value is bigger than blockSize value , reserve the surplus data item. - dataTotalSize += SQLiteSingleVerStorageExecutor::GetDataItemSerialSize(item, appendLength); - if ((dataTotalSize > dataSizeInfo.blockSize && !dataItems.empty()) || - dataItems.size() >= dataSizeInfo.packetSize) { + size_t appendSize = dataTotalSize + SQLiteSingleVerStorageExecutor::GetDataItemSerialSize(item, appendLength); + if ((appendSize > dataSizeInfo.blockSize && !dataItems.empty()) || dataItems.size() >= dataSizeInfo.packetSize) { return -E_UNFINISHED; - } else { - dataItems.push_back(item); } + dataItems.push_back(item); + dataTotalSize = appendSize; return E_OK; } @@ -665,20 +664,19 @@ int SQLiteSingleVerStorageExecutor::GetSyncDataWithQuery(sqlite3_stmt *fullStmt, LOGE("Get next changed data failed. %d", errCode); return errCode; } - if (!isMatchItemFinished && matchItem.key == fullItem.key) { - errCode = AppendDataItem(dataItems, matchItem, dataTotalSize, appendLength, dataSizeInfo); - if (errCode == -E_UNFINISHED) { - goto END; - } - break; // step to next match data - } else { - DBCommon::CalcValueHash(fullItem.key, fullItem.key); + bool matchData = true; + if (isMatchItemFinished || matchItem.key != fullItem.key) { + matchData = false; // got miss query data + DBCommon::CalcValueHash(fullItem.key, fullItem.key); // set and send key with hash_key Value().swap(fullItem.value); // not send value when data miss query - fullItem.flag |= DataItem::REMOTE_DEVICE_DATA_MISS_QUERY; - errCode = AppendDataItem(dataItems, fullItem, dataTotalSize, appendLength, dataSizeInfo); - if (errCode == -E_UNFINISHED) { - goto END; - } + fullItem.flag |= DataItem::REMOTE_DEVICE_DATA_MISS_QUERY; // mark with miss query flag + } + errCode = AppendDataItem(dataItems, fullItem, dataTotalSize, appendLength, dataSizeInfo); + if (errCode == -E_UNFINISHED) { + goto END; + } + if (matchData) { + break; // step to next match data } } } diff --git a/services/distributeddataservice/libs/distributeddb/storage/src/sync_able_engine.cpp b/services/distributeddataservice/libs/distributeddb/storage/src/sync_able_engine.cpp index 5c328dca7e1a672e9f72b8ce33600ab3dcb88a2f..79b20f13c19e8e8421bb3d8803007092a31fad9b 100644 --- a/services/distributeddataservice/libs/distributeddb/storage/src/sync_able_engine.cpp +++ b/services/distributeddataservice/libs/distributeddb/storage/src/sync_able_engine.cpp @@ -30,7 +30,7 @@ SyncAbleEngine::~SyncAbleEngine() {} // Start a sync action. -int SyncAbleEngine::Sync(const ISyncer::SyncParma &parm) +int SyncAbleEngine::Sync(const ISyncer::SyncParma &parm, uint64_t connectionId) { if (!started_) { StartSyncer(); @@ -38,7 +38,7 @@ int SyncAbleEngine::Sync(const ISyncer::SyncParma &parm) return -E_NOT_INIT; } } - return syncer_.Sync(parm); + return syncer_.Sync(parm, connectionId); } void SyncAbleEngine::WakeUpSyncer() @@ -69,14 +69,6 @@ int SyncAbleEngine::DisableManualSync(void) return syncer_.DisableManualSync(); } -// Stop a sync action in progress. -void SyncAbleEngine::StopSync(int syncId) -{ - if (started_) { - syncer_.RemoveSyncOperation(syncId); - } -} - // Get The current virtual timestamp uint64_t SyncAbleEngine::GetTimestamp() { @@ -141,4 +133,11 @@ int SyncAbleEngine::GetLocalIdentity(std::string &outTarget) } return syncer_.GetLocalIdentity(outTarget); } + +void SyncAbleEngine::StopSync(uint64_t connectionId) +{ + if (started_) { + syncer_.StopSync(connectionId); + } +} } diff --git a/services/distributeddataservice/libs/distributeddb/storage/src/sync_able_engine.h b/services/distributeddataservice/libs/distributeddb/storage/src/sync_able_engine.h index 87afcc74cbc3a6a1b7c926b4a472ca0fef54e820..c1fc4e8bd1ce1c9ae8e5b2c3b3c9d26cd2ff270a 100644 --- a/services/distributeddataservice/libs/distributeddb/storage/src/sync_able_engine.h +++ b/services/distributeddataservice/libs/distributeddb/storage/src/sync_able_engine.h @@ -29,7 +29,7 @@ public: void TriggerSync(int notifyEvent); // Start a sync action. - int Sync(const ISyncer::SyncParma &parm); + int Sync(const ISyncer::SyncParma &parm, uint64_t connectionId); void WakeUpSyncer(); void Close(); @@ -40,9 +40,6 @@ public: int EnableManualSync(void); int DisableManualSync(void); - // Stop a sync action in progress. - void StopSync(int syncId); - // Get The current virtual timestamp uint64_t GetTimestamp(); @@ -50,6 +47,8 @@ public: int GetLocalIdentity(std::string &outTarget); + // Stop a sync action in progress + void StopSync(uint64_t connectionId); private: // Start syncer void StartSyncer(); diff --git a/services/distributeddataservice/libs/distributeddb/storage/src/sync_able_kvdb.cpp b/services/distributeddataservice/libs/distributeddb/storage/src/sync_able_kvdb.cpp index c2c32867531c6a4da1fce9a741f8c78da5de7ff3..76b67c97a6a83f17e449425ccbb359ede2df709e 100644 --- a/services/distributeddataservice/libs/distributeddb/storage/src/sync_able_kvdb.cpp +++ b/services/distributeddataservice/libs/distributeddb/storage/src/sync_able_kvdb.cpp @@ -78,7 +78,7 @@ void SyncAbleKvDB::Close() } // Start a sync action. -int SyncAbleKvDB::Sync(const ISyncer::SyncParma &parma) +int SyncAbleKvDB::Sync(const ISyncer::SyncParma &parma, uint64_t connectionId) { if (!started_) { int errCode = StartSyncer(); @@ -86,7 +86,7 @@ int SyncAbleKvDB::Sync(const ISyncer::SyncParma &parma) return errCode; } } - return syncer_.Sync(parma); + return syncer_.Sync(parma, connectionId); } void SyncAbleKvDB::EnableAutoSync(bool enable) @@ -103,10 +103,10 @@ void SyncAbleKvDB::WakeUpSyncer() } // Stop a sync action in progress. -void SyncAbleKvDB::StopSync() +void SyncAbleKvDB::StopSync(uint64_t connectionId) { if (started_) { - syncer_.StopSync(); + syncer_.StopSync(connectionId); } } diff --git a/services/distributeddataservice/libs/distributeddb/storage/src/sync_able_kvdb.h b/services/distributeddataservice/libs/distributeddb/storage/src/sync_able_kvdb.h index df6f2602a03df74713a5bdbf431c2280259c4c1c..91a8d6c8d08bb5542b53ba5269d6df2e898e5034 100644 --- a/services/distributeddataservice/libs/distributeddb/storage/src/sync_able_kvdb.h +++ b/services/distributeddataservice/libs/distributeddb/storage/src/sync_able_kvdb.h @@ -41,13 +41,13 @@ public: void Close() override; // Start a sync action. - int Sync(const ISyncer::SyncParma &parma); + int Sync(const ISyncer::SyncParma &parma, uint64_t connectionId); // Enable auto sync void EnableAutoSync(bool enable); // Stop a sync action in progress. - void StopSync(); + void StopSync(uint64_t connectionId); // Get The current virtual timestamp uint64_t GetTimestamp(); diff --git a/services/distributeddataservice/libs/distributeddb/storage/src/sync_able_kvdb_connection.cpp b/services/distributeddataservice/libs/distributeddb/storage/src/sync_able_kvdb_connection.cpp index dbcf77fd9f6a47d81eff7aa6dbe4e9d5b111f98b..ca10da6dba362090e65b2c659c4f3e7a3e2fe2d7 100644 --- a/services/distributeddataservice/libs/distributeddb/storage/src/sync_able_kvdb_connection.cpp +++ b/services/distributeddataservice/libs/distributeddb/storage/src/sync_able_kvdb_connection.cpp @@ -20,6 +20,7 @@ #include "db_constant.h" #include "kvdb_pragma.h" #include "performance_analysis.h" +#include "runtime_context.h" #include "sync_able_kvdb.h" namespace DistributedDB { @@ -34,7 +35,7 @@ SyncAbleKvDBConnection::SyncAbleKvDBConnection(SyncAbleKvDB *kvDB) } // Drop the lock before we call RemoveSyncOperation(). UnlockObj(); - db->StopSync(); + db->StopSync(GetConnectionId()); LockObj(); }); } @@ -151,7 +152,7 @@ int SyncAbleKvDBConnection::PragmaSyncAction(const PragmaSync *syncParameter) syncParam.onFinalize = [this]() { DecObjRef(this); }; syncParam.onComplete = std::bind(&SyncAbleKvDBConnection::OnSyncComplete, this, std::placeholders::_1, syncParameter->onComplete_, syncParameter->wait_); - int errCode = kvDB->Sync(syncParam); + int errCode = kvDB->Sync(syncParam, GetConnectionId()); if (errCode != E_OK) { DecObjRef(this); } diff --git a/services/distributeddataservice/libs/distributeddb/syncer/include/isyncer.h b/services/distributeddataservice/libs/distributeddb/syncer/include/isyncer.h index 3fbc70fac4f74da18e084ed4cded43b808b235bb..e02221a6508fb6eb769c3f4d4af291299964232f 100644 --- a/services/distributeddataservice/libs/distributeddb/syncer/include/isyncer.h +++ b/services/distributeddataservice/libs/distributeddb/syncer/include/isyncer.h @@ -62,12 +62,12 @@ public: const std::function &onFinalize, bool wait) = 0; // Sync function. use SyncParma to reduce parameter. - virtual int Sync(const SyncParma ¶m) = 0; + virtual int Sync(const SyncParma ¶m, uint64_t connectionId) = 0; // Remove the operation, with the given syncId, used to clean resource if sync finished or failed. virtual int RemoveSyncOperation(int syncId) = 0; - virtual int StopSync() = 0; + virtual int StopSync(uint64_t connectionId) = 0; // Get The current virtual timestamp virtual uint64_t GetTimestamp() = 0; diff --git a/services/distributeddataservice/libs/distributeddb/syncer/include/syncer_proxy.h b/services/distributeddataservice/libs/distributeddb/syncer/include/syncer_proxy.h index 8a1c8c279cd20f7af44ad10d99ef1545faf161a5..6ef417b795361df8754ce6509c9921bb59b6d8e8 100644 --- a/services/distributeddataservice/libs/distributeddb/syncer/include/syncer_proxy.h +++ b/services/distributeddataservice/libs/distributeddb/syncer/include/syncer_proxy.h @@ -46,12 +46,12 @@ public: const std::function &onFinalize, bool wait) override; // Sync function. use SyncParma to reduce parameter. - int Sync(const SyncParma ¶m) override; + int Sync(const SyncParma ¶m, uint64_t connectionId) override; // Remove the operation, with the given syncId, used to clean resource if sync finished or failed. int RemoveSyncOperation(int syncId) override; - int StopSync() override; + int StopSync(uint64_t connectionId) override; // Get The current virtual timestamp uint64_t GetTimestamp() override; diff --git a/services/distributeddataservice/libs/distributeddb/syncer/src/ability_sync.cpp b/services/distributeddataservice/libs/distributeddb/syncer/src/ability_sync.cpp index 71000def92876024a4d9882cacdc7042082d26b1..94d7510517c517ea2089fc635bb4c898ec623e30 100644 --- a/services/distributeddataservice/libs/distributeddb/syncer/src/ability_sync.cpp +++ b/services/distributeddataservice/libs/distributeddb/syncer/src/ability_sync.cpp @@ -377,7 +377,7 @@ int AbilitySync::SyncStart(uint32_t sessionId, uint32_t sequenceId, uint16_t rem message->SetSessionId(sessionId); message->SetSequenceId(sequenceId); SendConfig conf; - SetSendConfig(deviceId_, false, SEND_TIME_OUT, conf); + SetSendConfigParam(storageInterface_->GetDbProperties(), deviceId_, false, SEND_TIME_OUT, conf); errCode = communicator_->SendMessage(deviceId_, message, conf, handler); if (errCode != E_OK) { LOGE("[AbilitySync][SyncStart] SendPacket failed, err %d", errCode); @@ -1112,7 +1112,7 @@ int AbilitySync::SendAck(const Message *inMsg, const AbilitySyncAckPacket &ackPa ackMessage->SetSessionId(inMsg->GetSessionId()); ackMessage->SetSequenceId(inMsg->GetSequenceId()); SendConfig conf; - SetSendConfig(deviceId_, false, SEND_TIME_OUT, conf); + SetSendConfigParam(storageInterface_->GetDbProperties(), deviceId_, false, SEND_TIME_OUT, conf); errCode = communicator_->SendMessage(deviceId_, ackMessage, conf); if (errCode != E_OK) { LOGE("[AbilitySync][SendAck] SendPacket failed, err %d", errCode); @@ -1174,18 +1174,6 @@ int AbilitySync::HandleRelationAckSchemaParam(const AbilitySyncAckPacket *recvPa return errCode; } -void AbilitySync::SetSendConfig(const std::string &dstTarget, bool nonBlock, uint32_t timeout, SendConfig &sendConf) -{ - sendConf.nonBlock = nonBlock; - sendConf.timeout = timeout; - sendConf.isNeedExtendHead = storageInterface_->GetDbProperties().GetBoolProp(KvDBProperties::SYNC_DUAL_TUPLE_MODE, - false); - sendConf.paramInfo.appId = storageInterface_->GetDbProperties().GetStringProp(KvDBProperties::APP_ID, ""); - sendConf.paramInfo.userId = storageInterface_->GetDbProperties().GetStringProp(KvDBProperties::USER_ID, ""); - sendConf.paramInfo.storeId = storageInterface_->GetDbProperties().GetStringProp(KvDBProperties::STORE_ID, ""); - sendConf.paramInfo.dstTarget = dstTarget; -} - int AbilitySync::AckRecvWithHighVersion(const Message *message, ISyncTaskContext *context, const AbilitySyncAckPacket *packet) { diff --git a/services/distributeddataservice/libs/distributeddb/syncer/src/ability_sync.h b/services/distributeddataservice/libs/distributeddb/syncer/src/ability_sync.h index 3605e617326d5f7d04a4768e3ed728da49e135af..05f0465ff8c023e569f5418e605883d40279343f 100644 --- a/services/distributeddataservice/libs/distributeddb/syncer/src/ability_sync.h +++ b/services/distributeddataservice/libs/distributeddb/syncer/src/ability_sync.h @@ -235,8 +235,6 @@ private: RelationalSyncOpinion MakeRelationSyncOpnion(const AbilitySyncRequestPacket *packet, const std::string &remoteSchema) const; - void SetSendConfig(const std::string &dstTarget, bool nonBlock, uint32_t timeout, SendConfig &sendConf); - int AckRecvWithHighVersion(const Message *message, ISyncTaskContext *context, const AbilitySyncAckPacket *packet); ICommunicator *communicator_; diff --git a/services/distributeddataservice/libs/distributeddb/syncer/src/communicator_proxy.cpp b/services/distributeddataservice/libs/distributeddb/syncer/src/communicator_proxy.cpp index 3eef85f248770abb4704a31e432b58767aca2fbe..7dae8b409927d8decdc333ac7b8700fb9accf9e1 100644 --- a/services/distributeddataservice/libs/distributeddb/syncer/src/communicator_proxy.cpp +++ b/services/distributeddataservice/libs/distributeddb/syncer/src/communicator_proxy.cpp @@ -193,12 +193,12 @@ int CommunicatorProxy::GetRemoteCommunicatorVersion(const std::string &target, u return -E_NOT_INIT; } -int CommunicatorProxy::SendMessage(const std::string &dstTarget, const Message *inMsg, SendConfig &config) +int CommunicatorProxy::SendMessage(const std::string &dstTarget, const Message *inMsg, const SendConfig &config) { return SendMessage(dstTarget, inMsg, config, nullptr); } -int CommunicatorProxy::SendMessage(const std::string &dstTarget, const Message *inMsg, SendConfig &config, +int CommunicatorProxy::SendMessage(const std::string &dstTarget, const Message *inMsg, const SendConfig &config, const OnSendEnd &onEnd) { ICommunicator *targetCommunicator = nullptr; diff --git a/services/distributeddataservice/libs/distributeddb/syncer/src/communicator_proxy.h b/services/distributeddataservice/libs/distributeddb/syncer/src/communicator_proxy.h index 055bffcb56e154ace6298a46ca1c599e88c7a1d1..08b0c6ad9ec33233990e8da79f73efe7af5b8652 100644 --- a/services/distributeddataservice/libs/distributeddb/syncer/src/communicator_proxy.h +++ b/services/distributeddataservice/libs/distributeddb/syncer/src/communicator_proxy.h @@ -42,8 +42,8 @@ public: bool IsDeviceOnline(const std::string &device) const override; int GetLocalIdentity(std::string &outTarget) const override; int GetRemoteCommunicatorVersion(const std::string &target, uint16_t &outVersion) const override; - int SendMessage(const std::string &dstTarget, const Message *inMsg, SendConfig &config) override; - int SendMessage(const std::string &dstTarget, const Message *inMsg, SendConfig &config, + int SendMessage(const std::string &dstTarget, const Message *inMsg, const SendConfig &config) override; + int SendMessage(const std::string &dstTarget, const Message *inMsg, const SendConfig &config, const OnSendEnd &onEnd) override; // Set an Main communicator for this database, used userid & appId & storeId diff --git a/services/distributeddataservice/libs/distributeddb/syncer/src/generic_syncer.cpp b/services/distributeddataservice/libs/distributeddb/syncer/src/generic_syncer.cpp index 83dd6c1cc75488aff131dbed28f14ec0e9879541..6450c1bdc71fcdb14d0f9e2955554aa502512691 100644 --- a/services/distributeddataservice/libs/distributeddb/syncer/src/generic_syncer.cpp +++ b/services/distributeddataservice/libs/distributeddb/syncer/src/generic_syncer.cpp @@ -184,6 +184,11 @@ int GenericSyncer::Sync(const InternalSyncParma ¶m) } int GenericSyncer::Sync(const SyncParma ¶m) +{ + return Sync(param, DBConstant::IGNORE_CONNECTION_ID); +} + +int GenericSyncer::Sync(const SyncParma ¶m, uint64_t connectionId) { int errCode = SyncParamCheck(param); if (errCode != E_OK) { @@ -195,7 +200,7 @@ int GenericSyncer::Sync(const SyncParma ¶m) } uint32_t syncId = GenerateSyncId(); - errCode = PrepareSync(param, syncId); + errCode = PrepareSync(param, syncId, connectionId); if (errCode != E_OK) { LOGE("[Syncer] PrepareSync failed when sync called, err %d", errCode); return errCode; @@ -204,7 +209,7 @@ int GenericSyncer::Sync(const SyncParma ¶m) return E_OK; } -int GenericSyncer::PrepareSync(const SyncParma ¶m, uint32_t syncId) +int GenericSyncer::PrepareSync(const SyncParma ¶m, uint32_t syncId, uint64_t connectionId) { auto *operation = new (std::nothrow) SyncOperation(syncId, param.devices, param.mode, param.onComplete, param.wait); @@ -217,14 +222,15 @@ int GenericSyncer::PrepareSync(const SyncParma ¶m, uint32_t syncId) std::lock_guard autoLock(syncerLock_); PerformanceAnalysis::GetInstance()->StepTimeRecordStart(PT_TEST_RECORDS::RECORD_SYNC_TOTAL); InitSyncOperation(operation, param); - LOGI("[Syncer] GenerateSyncId %d, mode = %d, wait = %d , label = %s, devices = %s", syncId, param.mode, - param.wait, label_.c_str(), GetSyncDevicesStr(param.devices).c_str()); + LOGI("[Syncer] GenerateSyncId %" PRIu32 ", mode = %d, wait = %d, label = %s, devices = %s", syncId, param.mode, + param.wait, label_.c_str(), GetSyncDevicesStr(param.devices).c_str()); AddSyncOperation(operation); PerformanceAnalysis::GetInstance()->StepTimeRecordEnd(PT_TEST_RECORDS::RECORD_SYNC_TOTAL); } - if (!param.wait) { + if (!param.wait && connectionId != DBConstant::IGNORE_CONNECTION_ID) { std::lock_guard lockGuard(syncIdLock_); - syncIdList_.push_back(static_cast(syncId)); + connectionIdMap_[connectionId].push_back(static_cast(syncId)); + syncIdMap_[static_cast(syncId)] = connectionId; } if (operation->CheckIsAllFinished()) { operation->Finished(); @@ -253,20 +259,31 @@ int GenericSyncer::RemoveSyncOperation(int syncId) RefObject::KillAndDecObjRef(operation); operation = nullptr; std::lock_guard lockGuard(syncIdLock_); - syncIdList_.remove(syncId); + if (syncIdMap_.find(syncId) == syncIdMap_.end()) { + return E_OK; + } + uint64_t connectionId = syncIdMap_[syncId]; + if (connectionIdMap_.find(connectionId) != connectionIdMap_.end()) { + connectionIdMap_[connectionId].remove(syncId); + } + syncIdMap_.erase(syncId); return E_OK; } return -E_INVALID_ARGS; } -int GenericSyncer::StopSync() +int GenericSyncer::StopSync(uint64_t connectionId) { std::list syncIdList; { std::lock_guard lockGuard(syncIdLock_); - syncIdList = syncIdList_; + if (connectionIdMap_.find(connectionId) == connectionIdMap_.end()) { + return E_OK; + } + syncIdList = connectionIdMap_[connectionId]; + connectionIdMap_.erase(connectionId); } - for (const auto &syncId : syncIdList) { + for (auto syncId : syncIdList) { RemoveSyncOperation(syncId); } return E_OK; @@ -463,6 +480,11 @@ void GenericSyncer::ClearSyncOperations(bool isClosedOperation) } syncOperationMap_.clear(); } + { + std::lock_guard lock(syncIdLock_); + connectionIdMap_.clear(); + syncIdMap_.clear(); + } } void GenericSyncer::TriggerSyncFinished(SyncOperation *operation) @@ -474,10 +496,6 @@ void GenericSyncer::TriggerSyncFinished(SyncOperation *operation) void GenericSyncer::OnSyncFinished(int syncId) { - { - std::lock_guard lockGuard(syncIdLock_); - syncIdList_.remove(syncId); - } (void)(RemoveSyncOperation(syncId)); } diff --git a/services/distributeddataservice/libs/distributeddb/syncer/src/generic_syncer.h b/services/distributeddataservice/libs/distributeddb/syncer/src/generic_syncer.h index e675d4d9e3b04fc96bdad5627960c268e39f4a55..9ccfad58f9fe6ead3cb14d8dfeac84fc9cf2e65a 100644 --- a/services/distributeddataservice/libs/distributeddb/syncer/src/generic_syncer.h +++ b/services/distributeddataservice/libs/distributeddb/syncer/src/generic_syncer.h @@ -51,12 +51,14 @@ public: const std::function &onFinalize, bool wait) override; // Sync function. use SyncParma to reduce parameter. - int Sync(const SyncParma ¶m) override; + int Sync(const SyncParma ¶m); + + int Sync(const SyncParma ¶m, uint64_t connectionId) override; // Remove the operation, with the given syncId, used to clean resource if sync finished or failed. int RemoveSyncOperation(int syncId) override; - int StopSync() override; + int StopSync(uint64_t connectionId) override; // Get The current virtual timestamp uint64_t GetTimestamp() override; @@ -103,7 +105,7 @@ protected: // Create a sync engine, if has memory error, will return nullptr. virtual ISyncEngine *CreateSyncEngine() = 0; - virtual int PrepareSync(const SyncParma ¶m, uint32_t syncId); + virtual int PrepareSync(const SyncParma ¶m, uint32_t syncId, uint64_t connectionId); // Add a Sync Operation, after call this function, the operation will be start virtual void AddSyncOperation(SyncOperation *operation); @@ -178,7 +180,8 @@ protected: static int currentSyncId_; static std::mutex syncIdLock_; // For sync in progress. - std::list syncIdList_; + std::map> connectionIdMap_; + std::map syncIdMap_; ISyncEngine *syncEngine_; ISyncInterface *syncInterface_; diff --git a/services/distributeddataservice/libs/distributeddb/syncer/src/single_ver_data_sync.cpp b/services/distributeddataservice/libs/distributeddb/syncer/src/single_ver_data_sync.cpp index e7c4269dbe23da6572b12474860c5ceadc042b7e..8796f514405d9a7d4bc5d1838709c9618ffc6d1a 100644 --- a/services/distributeddataservice/libs/distributeddb/syncer/src/single_ver_data_sync.cpp +++ b/services/distributeddataservice/libs/distributeddb/syncer/src/single_ver_data_sync.cpp @@ -237,7 +237,7 @@ int SingleVerDataSync::Send(SingleVerSyncTaskContext *context, const Message *me startFeedDogRet = context->StartFeedDogForSync(time, SyncDirectionFlag::SEND); } SendConfig sendConfig; - SetSendConfig(context->GetDeviceId(), false, SEND_TIME_OUT, sendConfig); + SetSendConfigParam(storage_->GetDbProperties(), context->GetDeviceId(), false, SEND_TIME_OUT, sendConfig); int errCode = communicateHandle_->SendMessage(context->GetDeviceId(), message, sendConfig, handler); if (errCode != E_OK) { LOGE("[DataSync][Send] send message failed, errCode=%d", errCode); @@ -2028,17 +2028,4 @@ void SingleVerDataSync::ClearDataMsg() { msgSchedule_.ClearMsg(); } - -void SingleVerDataSync::SetSendConfig(const std::string &dstTarget, bool nonBlock, uint32_t timeout, - SendConfig &sendConf) -{ - sendConf.nonBlock = nonBlock; - sendConf.timeout = timeout; - sendConf.isNeedExtendHead = storage_->GetDbProperties().GetBoolProp(KvDBProperties::SYNC_DUAL_TUPLE_MODE, - false); - sendConf.paramInfo.appId = storage_->GetDbProperties().GetStringProp(KvDBProperties::APP_ID, ""); - sendConf.paramInfo.userId = storage_->GetDbProperties().GetStringProp(KvDBProperties::USER_ID, ""); - sendConf.paramInfo.storeId = storage_->GetDbProperties().GetStringProp(KvDBProperties::STORE_ID, ""); - sendConf.paramInfo.dstTarget = dstTarget; -} } // namespace DistributedDB diff --git a/services/distributeddataservice/libs/distributeddb/syncer/src/single_ver_data_sync.h b/services/distributeddataservice/libs/distributeddb/syncer/src/single_ver_data_sync.h index 619696e8c2e2cccfffc0c3067841f20db7c94d6f..610537fa909662f371b416be2775b384886b6e5a 100644 --- a/services/distributeddataservice/libs/distributeddb/syncer/src/single_ver_data_sync.h +++ b/services/distributeddataservice/libs/distributeddb/syncer/src/single_ver_data_sync.h @@ -243,8 +243,6 @@ protected: int SendControlAck(SingleVerSyncTaskContext *context, const Message *message, int32_t recvCode, uint32_t controlCmdType, const CommErrHandler &handler = nullptr); - void SetSendConfig(const std::string &dstTarget, bool nonBlock, uint32_t timeout, SendConfig &sendConf); - uint32_t mtuSize_; SyncGenericInterface* storage_; ICommunicator* communicateHandle_; diff --git a/services/distributeddataservice/libs/distributeddb/syncer/src/single_ver_data_sync_utils.cpp b/services/distributeddataservice/libs/distributeddb/syncer/src/single_ver_data_sync_utils.cpp index 9f96f97a70975b3540ae5ea14f5ddcabe4512018..a2bc1d620b6d3cca2bced63ec6ce43912d56d5fa 100644 --- a/services/distributeddataservice/libs/distributeddb/syncer/src/single_ver_data_sync_utils.cpp +++ b/services/distributeddataservice/libs/distributeddb/syncer/src/single_ver_data_sync_utils.cpp @@ -163,15 +163,19 @@ int SingleVerDataSyncUtils::RunPermissionCheck(SingleVerSyncTaskContext *context std::string userId = storage->GetDbProperties().GetStringProp(KvDBProperties::USER_ID, ""); std::string storeId = storage->GetDbProperties().GetStringProp(KvDBProperties::STORE_ID, ""); uint8_t flag; - if (mode == SyncModeType::PUSH) { - flag = CHECK_FLAG_RECEIVE; - } else if (mode == SyncModeType::PULL) { - flag = CHECK_FLAG_SEND; - } else if (mode == SyncModeType::PUSH_AND_PULL) { - flag = CHECK_FLAG_SEND | CHECK_FLAG_RECEIVE; - } else { - // before add permissionCheck, PushStart packet and pullResponse packet do not setMode. - flag = CHECK_FLAG_RECEIVE; + switch (mode) { + case SyncModeType::PUSH: + flag = CHECK_FLAG_RECEIVE; + break; + case SyncModeType::PULL: + flag = CHECK_FLAG_SEND; + break; + case SyncModeType::PUSH_AND_PULL: + flag = CHECK_FLAG_SEND | CHECK_FLAG_RECEIVE; + break; + default: + flag = CHECK_FLAG_RECEIVE; + break; } int errCode = E_OK; if (storage->GetInterfaceType() != ISyncInterface::SYNC_RELATION) { diff --git a/services/distributeddataservice/libs/distributeddb/syncer/src/single_ver_relational_syncer.cpp b/services/distributeddataservice/libs/distributeddb/syncer/src/single_ver_relational_syncer.cpp index 08e87474d3a5bc597f9450c317f9d9ffe97f7a65..264ab16c05ed35f4c4adb5c98af0c03111862bdf 100644 --- a/services/distributeddataservice/libs/distributeddb/syncer/src/single_ver_relational_syncer.cpp +++ b/services/distributeddataservice/libs/distributeddb/syncer/src/single_ver_relational_syncer.cpp @@ -30,7 +30,7 @@ int SingleVerRelationalSyncer::Initialize(ISyncInterface *syncInterface, bool is RegisterSchemaChangedCallback(callback); } -int SingleVerRelationalSyncer::Sync(const SyncParma ¶m) +int SingleVerRelationalSyncer::Sync(const SyncParma ¶m, uint64_t connectionId) { if (param.mode == SYNC_MODE_PUSH_PULL) { return -E_NOT_SUPPORT; @@ -38,10 +38,10 @@ int SingleVerRelationalSyncer::Sync(const SyncParma ¶m) if (param.syncQuery.GetRelationTableName().empty()) { return -E_NOT_SUPPORT; } - return GenericSyncer::Sync(param); + return GenericSyncer::Sync(param, connectionId); } -int SingleVerRelationalSyncer::PrepareSync(const SyncParma ¶m, uint32_t syncId) +int SingleVerRelationalSyncer::PrepareSync(const SyncParma ¶m, uint32_t syncId, uint64_t connectionId) { const auto &syncInterface = static_cast(syncInterface_); std::vector tablesQuery; @@ -51,7 +51,7 @@ int SingleVerRelationalSyncer::PrepareSync(const SyncParma ¶m, uint32_t sync tablesQuery = syncInterface->GetTablesQuery(); } std::set subSyncIdSet; - int errCode = GenerateEachSyncTask(param, syncId, tablesQuery, subSyncIdSet); + int errCode = GenerateEachSyncTask(param, syncId, tablesQuery, connectionId, subSyncIdSet); if (errCode != E_OK) { DoRollBack(subSyncIdSet); return errCode; @@ -63,7 +63,7 @@ int SingleVerRelationalSyncer::PrepareSync(const SyncParma ¶m, uint32_t sync } int SingleVerRelationalSyncer::GenerateEachSyncTask(const SyncParma ¶m, uint32_t syncId, - const std::vector &tablesQuery, std::set &subSyncIdSet) + const std::vector &tablesQuery, uint64_t connectionId, std::set &subSyncIdSet) { SyncParma subParam = param; subParam.isQuerySync = true; @@ -80,7 +80,7 @@ int SingleVerRelationalSyncer::GenerateEachSyncTask(const SyncParma ¶m, uint std::lock_guard lockGuard(syncMapLock_); fullSyncIdMap_[syncId].insert(subSyncId); } - errCode = GenericSyncer::PrepareSync(subParam, subSyncId); + errCode = GenericSyncer::PrepareSync(subParam, subSyncId, connectionId); if (errCode != E_OK) { LOGW("[SingleVerRelationalSyncer] PrepareSync failed errCode:%d", errCode); std::lock_guard lockGuard(syncMapLock_); diff --git a/services/distributeddataservice/libs/distributeddb/syncer/src/single_ver_relational_syncer.h b/services/distributeddataservice/libs/distributeddb/syncer/src/single_ver_relational_syncer.h index cd54e6d4ff2ac07a1a08222070df54e25c92b24e..5bb7caee0821a6db7a6642c21aa871d8ee1b69a6 100644 --- a/services/distributeddataservice/libs/distributeddb/syncer/src/single_ver_relational_syncer.h +++ b/services/distributeddataservice/libs/distributeddb/syncer/src/single_ver_relational_syncer.h @@ -1,60 +1,60 @@ -/* - * Copyright (c) 2021 Huawei Device Co., Ltd. - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -#ifndef RELATIONAL_SYNCER_H -#define RELATIONAL_SYNCER_H -#ifdef RELATIONAL_STORE -#include "single_ver_syncer.h" -namespace DistributedDB { -class SingleVerRelationalSyncer final : public SingleVerSyncer { -public: - SingleVerRelationalSyncer() = default; - ~SingleVerRelationalSyncer() override = default; - - int Initialize(ISyncInterface *syncInterface, bool isNeedActive) override; - - // Sync function. use SyncParma to reduce parameter. - int Sync(const SyncParma ¶m) override; - - void EnableAutoSync(bool enable) override; - - void LocalDataChanged(int notifyEvent) override; - -protected: - - int PrepareSync(const SyncParma ¶m, uint32_t syncId) override; - - int SyncConditionCheck(QuerySyncObject &query, int mode, bool isQuerySync, - const std::vector &devices) const override; - -private: - - int GenerateEachSyncTask(const SyncParma ¶m, uint32_t syncId, - const std::vector &tablesQuery, std::set &subSyncIdSet); - - void DoRollBack(std::set &subSyncIdSet); - - void DoOnComplete(const SyncParma ¶m, uint32_t syncId); - void DoOnSubSyncComplete(const uint32_t subSyncId, const uint32_t syncId, - const SyncParma ¶m, const std::map &devicesMap); - - void SchemaChangeCallback(); - - mutable std::mutex syncMapLock_; - std::map> fullSyncIdMap_; - std::map>> resMap_; -}; -} -#endif +/* + * Copyright (c) 2021 Huawei Device Co., Ltd. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#ifndef RELATIONAL_SYNCER_H +#define RELATIONAL_SYNCER_H +#ifdef RELATIONAL_STORE +#include "single_ver_syncer.h" +namespace DistributedDB { +class SingleVerRelationalSyncer final : public SingleVerSyncer { +public: + SingleVerRelationalSyncer() = default; + ~SingleVerRelationalSyncer() override = default; + + int Initialize(ISyncInterface *syncInterface, bool isNeedActive) override; + + // Sync function. use SyncParma to reduce parameter. + int Sync(const SyncParma ¶m, uint64_t connectionId) override; + + void EnableAutoSync(bool enable) override; + + void LocalDataChanged(int notifyEvent) override; + +protected: + + int PrepareSync(const SyncParma ¶m, uint32_t syncId, uint64_t connectionId) override; + + int SyncConditionCheck(QuerySyncObject &query, int mode, bool isQuerySync, + const std::vector &devices) const override; + +private: + + int GenerateEachSyncTask(const SyncParma ¶m, uint32_t syncId, + const std::vector &tablesQuery, uint64_t connectionId, std::set &subSyncIdSet); + + void DoRollBack(std::set &subSyncIdSet); + + void DoOnComplete(const SyncParma ¶m, uint32_t syncId); + void DoOnSubSyncComplete(const uint32_t subSyncId, const uint32_t syncId, + const SyncParma ¶m, const std::map &devicesMap); + + void SchemaChangeCallback(); + + mutable std::mutex syncMapLock_; + std::map> fullSyncIdMap_; + std::map>> resMap_; +}; +} +#endif #endif // RELATIONAL_SYNCER_H \ No newline at end of file diff --git a/services/distributeddataservice/libs/distributeddb/syncer/src/sync_config.h b/services/distributeddataservice/libs/distributeddb/syncer/src/sync_config.h index 67b39b16ac6329b61ffab635f81949e53c53a2f6..dc2c252def3661ca70226ef46ea5f995345eea0f 100644 --- a/services/distributeddataservice/libs/distributeddb/syncer/src/sync_config.h +++ b/services/distributeddataservice/libs/distributeddb/syncer/src/sync_config.h @@ -1,46 +1,46 @@ -/* - * Copyright (c) 2021 Huawei Device Co., Ltd. - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -#ifndef SYNC_CONFIG_H -#define SYNC_CONFIG_H - -#include -#include -#include -#include "macro_utils.h" -#include "parcel.h" -#include "types_export.h" - -// db ability config -namespace DistributedDB { -// offset, used_bits_num, used_bits_num < 64 -using AbilityItem = std::pair; -// format: {offset, used_bits_num} -/* -if need to add new ability, just add append to the last ability -current ability format: -|first bit|second bit|third bit| -|DATABASE_COMPRESSION_ZLIB|ALLPREDICATEQUERY|SUBSCRIBEQUERY| -*/ -class SyncConfig final { -public: - static const AbilityItem DATABASE_COMPRESSION_ZLIB; - static const AbilityItem ALLPREDICATEQUERY; - static const AbilityItem SUBSCRIBEQUERY; - static const AbilityItem INKEYS_QUERY; - static const std::vector ABILITYBITS; - static const std::map COMPRESSALGOMAP; -}; -} +/* + * Copyright (c) 2021 Huawei Device Co., Ltd. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#ifndef SYNC_CONFIG_H +#define SYNC_CONFIG_H + +#include +#include +#include +#include "macro_utils.h" +#include "parcel.h" +#include "types_export.h" + +// db ability config +namespace DistributedDB { +// offset, used_bits_num, used_bits_num < 64 +using AbilityItem = std::pair; +// format: {offset, used_bits_num} +/* +if need to add new ability, just add append to the last ability +current ability format: +|first bit|second bit|third bit| +|DATABASE_COMPRESSION_ZLIB|ALLPREDICATEQUERY|SUBSCRIBEQUERY| +*/ +class SyncConfig final { +public: + static const AbilityItem DATABASE_COMPRESSION_ZLIB; + static const AbilityItem ALLPREDICATEQUERY; + static const AbilityItem SUBSCRIBEQUERY; + static const AbilityItem INKEYS_QUERY; + static const std::vector ABILITYBITS; + static const std::map COMPRESSALGOMAP; +}; +} #endif \ No newline at end of file diff --git a/services/distributeddataservice/libs/distributeddb/syncer/src/sync_engine.cpp b/services/distributeddataservice/libs/distributeddb/syncer/src/sync_engine.cpp index 2466b6c0f2c6f93706c98f71c231934984981899..52eedc190fc224c516e20d7d635c57bdcdc04154 100644 --- a/services/distributeddataservice/libs/distributeddb/syncer/src/sync_engine.cpp +++ b/services/distributeddataservice/libs/distributeddb/syncer/src/sync_engine.cpp @@ -118,6 +118,12 @@ int SyncEngine::Close() UnRegCommunicatorsCallback(); StopAutoSubscribeTimer(); + std::unique_lock closeLock(execTaskCountLock_); + bool isTimeout = execTaskCv_.wait_for(closeLock, std::chrono::milliseconds(DBConstant::MIN_TIMEOUT), + [this]() { return execTaskCount_ == 0; }); + if (!isTimeout) { + LOGD("SyncEngine Close with executing task!"); + } // Clear SyncContexts { std::unique_lock lock(contextMapLock_); @@ -355,16 +361,16 @@ void SyncEngine::RemoteDataChangedTask(ISyncTaskContext *context, const ICommuni void SyncEngine::ScheduleTaskOut(ISyncTaskContext *context, const ICommunicator *communicator) { (void)DealMsgUtilQueueEmpty(); - { - std::lock_guard lock(queueLock_); - execTaskCount_--; - } + DecExecTaskCount(); RefObject::DecObjRef(communicator); RefObject::DecObjRef(context); } int SyncEngine::DealMsgUtilQueueEmpty() { + if (!isActive_) { + return -E_BUSY; // db is closing just return + } int errCode = E_OK; Message *inMsg = nullptr; { @@ -377,19 +383,23 @@ int SyncEngine::DealMsgUtilQueueEmpty() queueCacheSize_ -= GetMsgSize(inMsg); } + IncExecTaskCount(); // it will deal with the first message in queue, we should increase object reference counts and sure that resources // could be prevented from destroying by other threads. - ISyncTaskContext *nextContext = GetConextForMsg(inMsg->GetTarget(), errCode); - if (errCode != E_OK) { - delete inMsg; - inMsg = nullptr; - return errCode; - } - errCode = ScheduleDealMsg(nextContext, inMsg); + do { + ISyncTaskContext *nextContext = GetConextForMsg(inMsg->GetTarget(), errCode); + if (errCode != E_OK) { + break; + } + errCode = ScheduleDealMsg(nextContext, inMsg); + if (errCode != E_OK) { + RefObject::DecObjRef(nextContext); + } + } while (false); if (errCode != E_OK) { - RefObject::DecObjRef(nextContext); delete inMsg; inMsg = nullptr; + DecExecTaskCount(); } return errCode; } @@ -425,13 +435,10 @@ int SyncEngine::ScheduleDealMsg(ISyncTaskContext *context, Message *inMsg) { if (inMsg == nullptr) { LOGE("[SyncEngine] MessageReciveCallback inMsg is null!"); + DecExecTaskCount(); return E_OK; } RefObject::IncObjRef(communicatorProxy_); - { - std::lock_guard incLock(queueLock_); - execTaskCount_++; - } int errCode = E_OK; // deal remote local data changed message if (inMsg->GetMessageId() == LOCAL_DATA_CHANGED) { @@ -444,20 +451,18 @@ int SyncEngine::ScheduleDealMsg(ISyncTaskContext *context, Message *inMsg) if (errCode != E_OK) { LOGE("[SyncEngine] MessageReciveCallbackTask Schedule failed err %d", errCode); RefObject::DecObjRef(communicatorProxy_); - { - std::lock_guard decLock(queueLock_); - execTaskCount_--; - } } return errCode; } void SyncEngine::MessageReciveCallback(const std::string &targetDev, Message *inMsg) { + IncExecTaskCount(); int errCode = MessageReciveCallbackInner(targetDev, inMsg); if (errCode != E_OK) { delete inMsg; inMsg = nullptr; + DecExecTaskCount(); LOGE("[SyncEngine] MessageReciveCallback failed!"); } } @@ -489,8 +494,10 @@ int SyncEngine::MessageReciveCallbackInner(const std::string &targetDev, Message return -E_BUSY; } - if (execTaskCount_ >= MAX_EXEC_NUM) { + if (execTaskCount_ > MAX_EXEC_NUM) { PutMsgIntoQueue(targetDev, inMsg, msgSize); + // task dont exec here + DecExecTaskCount(); return E_OK; } } @@ -500,7 +507,6 @@ int SyncEngine::MessageReciveCallbackInner(const std::string &targetDev, Message if (errCode != E_OK) { return errCode; } - LOGD("[SyncEngine] MessageReciveCallback MSG ID = %d", inMsg->GetMessageId()); return ScheduleDealMsg(nextContext, inMsg); } @@ -1022,4 +1028,19 @@ void SyncEngine::SchemaChange() context->SchemaChange(); } } + +void SyncEngine::IncExecTaskCount() +{ + std::lock_guard incLock(execTaskCountLock_); + execTaskCount_++; +} + +void SyncEngine::DecExecTaskCount() +{ + { + std::lock_guard decLock(execTaskCountLock_); + execTaskCount_--; + } + execTaskCv_.notify_all(); +} } // namespace DistributedDB diff --git a/services/distributeddataservice/libs/distributeddb/syncer/src/sync_engine.h b/services/distributeddataservice/libs/distributeddb/syncer/src/sync_engine.h index f942302f3c5f14db8a1b5c55db0b9b3c39e97403..9cb3f6fd241ffbffa20b5ffbb20c1a7ec7edb9ac 100644 --- a/services/distributeddataservice/libs/distributeddb/syncer/src/sync_engine.h +++ b/services/distributeddataservice/libs/distributeddb/syncer/src/sync_engine.h @@ -189,6 +189,10 @@ private: static uint8_t GetPermissionCheckFlag(bool isAutoSync, int syncMode); + void IncExecTaskCount(); + + void DecExecTaskCount(); + ICommunicator *communicator_; DeviceManager *deviceManager_; std::function onRemoteDataChanged_; @@ -213,6 +217,8 @@ private: // key: device value: equalIdentifier std::map equalIdentifierMap_; + std::mutex execTaskCountLock_; + std::condition_variable execTaskCv_; }; } // namespace DistributedDB diff --git a/services/distributeddataservice/libs/distributeddb/syncer/src/syncer_proxy.cpp b/services/distributeddataservice/libs/distributeddb/syncer/src/syncer_proxy.cpp index 0c71b0da4461edb9db11ca1315d23247fbb26e36..6649beb4336f9087a848e82d2df211a342c352b1 100644 --- a/services/distributeddataservice/libs/distributeddb/syncer/src/syncer_proxy.cpp +++ b/services/distributeddataservice/libs/distributeddb/syncer/src/syncer_proxy.cpp @@ -64,12 +64,12 @@ int SyncerProxy::Sync(const std::vector &devices, int mode, return syncer_->Sync(devices, mode, onComplete, onFinalize, wait); } -int SyncerProxy::Sync(const SyncParma &parma) +int SyncerProxy::Sync(const SyncParma &parma, uint64_t connectionId) { if (syncer_ == nullptr) { return -E_NOT_INIT; } - return syncer_->Sync(parma); + return syncer_->Sync(parma, connectionId); } int SyncerProxy::RemoveSyncOperation(int syncId) @@ -80,12 +80,12 @@ int SyncerProxy::RemoveSyncOperation(int syncId) return syncer_->RemoveSyncOperation(syncId); } -int SyncerProxy::StopSync() +int SyncerProxy::StopSync(uint64_t connectionId) { if (syncer_ == nullptr) { return -E_NOT_INIT; } - return syncer_->StopSync(); + return syncer_->StopSync(connectionId); } uint64_t SyncerProxy::GetTimestamp() diff --git a/services/distributeddataservice/libs/distributeddb/syncer/src/time_helper.cpp b/services/distributeddataservice/libs/distributeddb/syncer/src/time_helper.cpp index c54b7fa6dff78acf52438ccc8902b9c1eaee24ea..0ea1f4f225cc7907347ebaa37ed407096b18254f 100644 --- a/services/distributeddataservice/libs/distributeddb/syncer/src/time_helper.cpp +++ b/services/distributeddataservice/libs/distributeddb/syncer/src/time_helper.cpp @@ -118,13 +118,6 @@ int TimeHelper::SaveLocalTimeOffset(TimeOffset offset) void TimeHelper::SetSendConfig(const std::string &dstTarget, bool nonBlock, uint32_t timeout, SendConfig &sendConf) { - sendConf.nonBlock = nonBlock; - sendConf.timeout = timeout; - sendConf.isNeedExtendHead = storage_->GetDbProperties().GetBoolProp(KvDBProperties::SYNC_DUAL_TUPLE_MODE, - false); - sendConf.paramInfo.appId = storage_->GetDbProperties().GetStringProp(KvDBProperties::APP_ID, ""); - sendConf.paramInfo.userId = storage_->GetDbProperties().GetStringProp(KvDBProperties::USER_ID, ""); - sendConf.paramInfo.storeId = storage_->GetDbProperties().GetStringProp(KvDBProperties::STORE_ID, ""); - sendConf.paramInfo.dstTarget = dstTarget; + SetSendConfigParam(storage_->GetDbProperties(), dstTarget, false, SEND_TIME_OUT, sendConf); } } // namespace DistributedDB diff --git a/services/distributeddataservice/libs/distributeddb/test/BUILD.gn b/services/distributeddataservice/libs/distributeddb/test/BUILD.gn index 4e3d131ca30652042a21ddc3c4f72131a01d803e..aee25ed1a92ee187b42f0ab6cb741df8eb948055 100644 --- a/services/distributeddataservice/libs/distributeddb/test/BUILD.gn +++ b/services/distributeddataservice/libs/distributeddb/test/BUILD.gn @@ -139,6 +139,7 @@ ohos_source_set("src_file") { "../storage/src/generic_kvdb.cpp", "../storage/src/generic_kvdb_connection.cpp", "../storage/src/generic_single_ver_kv_entry.cpp", + "../storage/src/iconnection.cpp", "../storage/src/ikvdb_factory.cpp", "../storage/src/kvdb_commit_notify_filterable_data.cpp", "../storage/src/kvdb_manager.cpp", diff --git a/services/distributeddataservice/libs/distributeddb/test/unittest/common/communicator/distributeddb_communicator_common.h b/services/distributeddataservice/libs/distributeddb/test/unittest/common/communicator/distributeddb_communicator_common.h index 04f9c5d2d95ad10dace644123100cd50636957eb..11be295afdb9763a8771963019dddbd37a7a488b 100644 --- a/services/distributeddataservice/libs/distributeddb/test/unittest/common/communicator/distributeddb_communicator_common.h +++ b/services/distributeddataservice/libs/distributeddb/test/unittest/common/communicator/distributeddb_communicator_common.h @@ -72,7 +72,7 @@ struct ExtendHeadInfo { class ExtendHeaderHandleTest : public DistributedDB::ExtendHeaderHandle { public: - explicit ExtendHeaderHandleTest(const DistributedDB::ExtendInfo &info) + explicit ExtendHeaderHandleTest(const DistributedDB::ExtendInfo &info) : headSize_(0) { localDbProperty_.appId = info.appId; localDbProperty_.storeId = info.storeId; diff --git a/services/distributeddataservice/libs/distributeddb/test/unittest/common/syncer/distributeddb_single_ver_multi_user_test.cpp b/services/distributeddataservice/libs/distributeddb/test/unittest/common/syncer/distributeddb_single_ver_multi_user_test.cpp index 24f3b2163872dc12decc4f567d27521ff0fc0fd4..aa3614c4b1f1d0f002cd7b97a604dd44f24acac4 100644 --- a/services/distributeddataservice/libs/distributeddb/test/unittest/common/syncer/distributeddb_single_ver_multi_user_test.cpp +++ b/services/distributeddataservice/libs/distributeddb/test/unittest/common/syncer/distributeddb_single_ver_multi_user_test.cpp @@ -228,6 +228,65 @@ bool AutoLaunchCallBack(const std::string &identifier, AutoLaunchParam ¶m, K param.option.syncDualTupleMode = true; return ret; } + +void TestSyncWithUserChange(bool wait) +{ + /** + * @tc.steps: step1. set SyncActivationCheckCallback and only userId1 can active + */ + g_mgr1.SetSyncActivationCheckCallback(g_syncActivationCheckCallback2); + /** + * @tc.steps: step2. openstore1 in dual tuple sync mode and openstore2 in normal sync mode + * @tc.expected: step2. only user2 sync mode is active + */ + OpenStore1(true); + OpenStore2(true); + /** + * @tc.steps: step3. set SyncActivationCheckCallback and only userId2 can active + */ + g_mgr1.SetSyncActivationCheckCallback(g_syncActivationCheckCallback1); + + /** + * @tc.steps: step4. call NotifyUserChanged and block sync db concurrently + * @tc.expected: step4. return OK + */ + CipherPassword passwd; + bool startSync = false; + std::condition_variable cv; + thread subThread([&]() { + std::mutex notifyLock; + std::unique_lock lck(notifyLock); + cv.wait(lck, [&startSync]() { return startSync; }); + EXPECT_TRUE(KvStoreDelegateManager::NotifyUserChanged() == OK); + }); + subThread.detach(); + g_communicatorAggregator->RegOnDispatch([&](const std::string&, Message *inMsg) { + if (!startSync) { + startSync = true; + cv.notify_all(); + } + }); + + /** + * @tc.steps: step5. deviceA call sync and wait + * @tc.expected: step5. sync should return OK. + */ + std::map result; + std::vector devices; + devices.push_back(g_deviceB->GetDeviceId()); + DBStatus status = g_tool.SyncTest(g_kvDelegatePtr1, devices, SYNC_MODE_PUSH_ONLY, result, wait); + EXPECT_EQ(status, OK); + g_communicatorAggregator->RegOnDispatch(nullptr); + /** + * @tc.expected: step6. onComplete should be called, and status is USER_CHANGED + */ + EXPECT_EQ(result.size(), devices.size()); + for (const auto &pair : result) { + LOGD("dev %s, status %d", pair.first.c_str(), pair.second); + EXPECT_EQ(pair.second, USER_CHANGED); + } + CloseStore(); +} } /** @@ -631,50 +690,7 @@ HWTEST_F(DistributedDBSingleVerMultiUserTest, MultiUser007, TestSize.Level0) */ HWTEST_F(DistributedDBSingleVerMultiUserTest, MultiUser008, TestSize.Level0) { - /** - * @tc.steps: step1. set SyncActivationCheckCallback and only userId1 can active - */ - g_mgr1.SetSyncActivationCheckCallback(g_syncActivationCheckCallback2); - /** - * @tc.steps: step2. openstore1 in dual tuple sync mode and openstore2 in normal sync mode - * @tc.expected: step2. only user2 sync mode is active - */ - OpenStore1(true); - OpenStore2(true); - /** - * @tc.steps: step3. set SyncActivationCheckCallback and only userId2 can active - */ - g_mgr1.SetSyncActivationCheckCallback(g_syncActivationCheckCallback1); - - /** - * @tc.steps: step4. call NotifyUserChanged and block sync db concurrently - * @tc.expected: step4. return OK - */ - CipherPassword passwd; - thread subThread([&]() { - std::this_thread::sleep_for(std::chrono::milliseconds(5)); - EXPECT_TRUE(KvStoreDelegateManager::NotifyUserChanged() == OK); - }); - subThread.detach(); - /** - * @tc.steps: step5. deviceA call sync and wait - * @tc.expected: step5. sync should return OK. - */ - std::map result; - std::vector devices; - devices.push_back(g_deviceB->GetDeviceId()); - DBStatus status = g_tool.SyncTest(g_kvDelegatePtr1, devices, SYNC_MODE_PUSH_ONLY, result, true); - EXPECT_TRUE(status == OK); - - /** - * @tc.expected: step6. onComplete should be called, and status is USER_CHANGED - */ - EXPECT_TRUE(result.size() == devices.size()); - for (const auto &pair : result) { - LOGD("dev %s, status %d", pair.first.c_str(), pair.second); - EXPECT_TRUE(pair.second == USER_CHANGED); - } - CloseStore(); + TestSyncWithUserChange(true); } /** @@ -686,61 +702,7 @@ HWTEST_F(DistributedDBSingleVerMultiUserTest, MultiUser008, TestSize.Level0) */ HWTEST_F(DistributedDBSingleVerMultiUserTest, MultiUser009, TestSize.Level0) { - /** - * @tc.steps: step1. set SyncActivationCheckCallback and only userId1 can active - */ - g_mgr1.SetSyncActivationCheckCallback(g_syncActivationCheckCallback2); - /** - * @tc.steps: step2. openstore1 in dual tuple sync mode and openstore2 in normal sync mode - * @tc.expected: step2. only user2 sync mode is active - */ - OpenStore1(true); - OpenStore2(true); - /** - * @tc.steps: step3. set SyncActivationCheckCallback and only userId2 can active - */ - g_mgr1.SetSyncActivationCheckCallback(g_syncActivationCheckCallback1); - - /** - * @tc.steps: step4. call NotifyUserChanged and block sync db concurrently - * @tc.expected: step4. return OK - */ - CipherPassword passwd; - bool startSync = false; - std::condition_variable cv; - thread subThread([&]() { - std::mutex notifyLock; - std::unique_lock lck(notifyLock); - cv.wait(lck, [&startSync]() { return startSync; }); - EXPECT_TRUE(KvStoreDelegateManager::NotifyUserChanged() == OK); - }); - subThread.detach(); - g_communicatorAggregator->RegOnDispatch([&](const std::string&, Message *inMsg) { - if (!startSync) { - startSync = true; - cv.notify_all(); - } - }); - - /** - * @tc.steps: step5. deviceA call sync and wait - * @tc.expected: step5. sync should return OK. - */ - std::map result; - std::vector devices; - devices.push_back(g_deviceB->GetDeviceId()); - DBStatus status = g_tool.SyncTest(g_kvDelegatePtr1, devices, SYNC_MODE_PUSH_ONLY, result, true); - EXPECT_EQ(status, OK); - g_communicatorAggregator->RegOnDispatch(nullptr); - /** - * @tc.expected: step6. onComplete should be called, and status is USER_CHANGED - */ - EXPECT_EQ(result.size(), devices.size()); - for (const auto &pair : result) { - LOGD("dev %s, status %d", pair.first.c_str(), pair.second); - EXPECT_EQ(pair.second, USER_CHANGED); - } - CloseStore(); + TestSyncWithUserChange(false); } /** diff --git a/services/distributeddataservice/libs/distributeddb/test/unittest/common/syncer/mock_communicator.h b/services/distributeddataservice/libs/distributeddb/test/unittest/common/syncer/mock_communicator.h index b927f3216bdd3b7a1e11ed9c4e99f8d27f376965..aab37dff664551735ea66bb2c2ab23787a3cd19d 100644 --- a/services/distributeddataservice/libs/distributeddb/test/unittest/common/syncer/mock_communicator.h +++ b/services/distributeddataservice/libs/distributeddb/test/unittest/common/syncer/mock_communicator.h @@ -24,8 +24,8 @@ class MockCommunicator : public ICommunicator { public: MOCK_CONST_METHOD1(GetLocalIdentity, int(std::string &)); MOCK_CONST_METHOD2(GetRemoteCommunicatorVersion, int(const std::string &, uint16_t &)); - MOCK_METHOD3(SendMessage, int(const std::string &, const Message *, SendConfig &)); - MOCK_METHOD4(SendMessage, int(const std::string &, const Message *, SendConfig &, const OnSendEnd &)); + MOCK_METHOD3(SendMessage, int(const std::string &, const Message *, const SendConfig &)); + MOCK_METHOD4(SendMessage, int(const std::string &, const Message *, const SendConfig &, const OnSendEnd &)); MOCK_CONST_METHOD0(GetCommunicatorMtuSize, uint32_t(void)); MOCK_CONST_METHOD1(GetCommunicatorMtuSize, uint32_t(const std::string &)); MOCK_CONST_METHOD0(GetTimeout, uint32_t(void)); diff --git a/services/distributeddataservice/libs/distributeddb/test/unittest/common/syncer/relational_virtual_device.cpp b/services/distributeddataservice/libs/distributeddb/test/unittest/common/syncer/relational_virtual_device.cpp index c4a55e158fd72bda839b1f5ad983ba43cacc0a7c..816795b6c99c9ae00020b23dc9d97758d39599b8 100644 --- a/services/distributeddataservice/libs/distributeddb/test/unittest/common/syncer/relational_virtual_device.cpp +++ b/services/distributeddataservice/libs/distributeddb/test/unittest/common/syncer/relational_virtual_device.cpp @@ -1,63 +1,63 @@ -/* - * Copyright (c) 2021 Huawei Device Co., Ltd. - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -#ifdef RELATIONAL_STORE -#include "relational_virtual_device.h" -#include "virtual_relational_ver_sync_db_interface.h" -namespace DistributedDB { -RelationalVirtualDevice::RelationalVirtualDevice(const std::string &deviceId) : GenericVirtualDevice(deviceId) -{ -} - -RelationalVirtualDevice::~RelationalVirtualDevice() -{ -} - -int RelationalVirtualDevice::PutData(const std::string &tableName, const std::vector &dataList) -{ - return static_cast(storage_)->PutLocalData(dataList, tableName); -} - -int RelationalVirtualDevice::GetAllSyncData(const std::string &tableName, std::vector &data) -{ - return static_cast(storage_)->GetAllSyncData(tableName, data); -} - -int RelationalVirtualDevice::GetSyncData(const std::string &tableName, - const std::string &hashKey, VirtualRowData &data) -{ - return static_cast(storage_)->GetVirtualSyncData(tableName, hashKey, data); -} - -void RelationalVirtualDevice::SetLocalFieldInfo(const std::vector &localFieldInfo) -{ - static_cast(storage_)->SetLocalFieldInfo(localFieldInfo); -} - -int RelationalVirtualDevice::Sync(SyncMode mode, bool wait) -{ - return -E_NOT_SUPPORT; -} - -void RelationalVirtualDevice::EraseSyncData(const std::string &tableName) -{ - static_cast(storage_)->EraseSyncData(tableName); -} - -void RelationalVirtualDevice::SetTableInfo(const TableInfo &tableInfo) -{ - static_cast(storage_)->SetTableInfo(tableInfo); -} -} // DistributedDB +/* + * Copyright (c) 2021 Huawei Device Co., Ltd. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#ifdef RELATIONAL_STORE +#include "relational_virtual_device.h" +#include "virtual_relational_ver_sync_db_interface.h" +namespace DistributedDB { +RelationalVirtualDevice::RelationalVirtualDevice(const std::string &deviceId) : GenericVirtualDevice(deviceId) +{ +} + +RelationalVirtualDevice::~RelationalVirtualDevice() +{ +} + +int RelationalVirtualDevice::PutData(const std::string &tableName, const std::vector &dataList) +{ + return static_cast(storage_)->PutLocalData(dataList, tableName); +} + +int RelationalVirtualDevice::GetAllSyncData(const std::string &tableName, std::vector &data) +{ + return static_cast(storage_)->GetAllSyncData(tableName, data); +} + +int RelationalVirtualDevice::GetSyncData(const std::string &tableName, + const std::string &hashKey, VirtualRowData &data) +{ + return static_cast(storage_)->GetVirtualSyncData(tableName, hashKey, data); +} + +void RelationalVirtualDevice::SetLocalFieldInfo(const std::vector &localFieldInfo) +{ + static_cast(storage_)->SetLocalFieldInfo(localFieldInfo); +} + +int RelationalVirtualDevice::Sync(SyncMode mode, bool wait) +{ + return -E_NOT_SUPPORT; +} + +void RelationalVirtualDevice::EraseSyncData(const std::string &tableName) +{ + static_cast(storage_)->EraseSyncData(tableName); +} + +void RelationalVirtualDevice::SetTableInfo(const TableInfo &tableInfo) +{ + static_cast(storage_)->SetTableInfo(tableInfo); +} +} // DistributedDB #endif \ No newline at end of file diff --git a/services/distributeddataservice/libs/distributeddb/test/unittest/common/syncer/relational_virtual_device.h b/services/distributeddataservice/libs/distributeddb/test/unittest/common/syncer/relational_virtual_device.h index e4e1baf74590c18b3876d9ac740ea94f3fff0b32..9aed0bbf3988306eefc314a125fa7099b630db19 100644 --- a/services/distributeddataservice/libs/distributeddb/test/unittest/common/syncer/relational_virtual_device.h +++ b/services/distributeddataservice/libs/distributeddb/test/unittest/common/syncer/relational_virtual_device.h @@ -1,40 +1,40 @@ -/* - * Copyright (c) 2021 Huawei Device Co., Ltd. - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -#ifndef RELATIONAL_VIRTUAL_DEVICE_H -#define RELATIONAL_VIRTUAL_DEVICE_H -#ifdef RELATIONAL_STORE - -#include "data_transformer.h" -#include "generic_virtual_device.h" -#include "relational_schema_object.h" -#include "virtual_relational_ver_sync_db_interface.h" - -namespace DistributedDB { -class RelationalVirtualDevice final : public GenericVirtualDevice { -public: - explicit RelationalVirtualDevice(const std::string &deviceId); - ~RelationalVirtualDevice() override; - - int PutData(const std::string &tableName, const std::vector &dataList); - int GetAllSyncData(const std::string &tableName, std::vector &data); - int GetSyncData(const std::string &tableName, const std::string &hashKey, VirtualRowData &data); - void SetLocalFieldInfo(const std::vector &localFieldInfo); - void SetTableInfo(const TableInfo &tableInfo); - int Sync(SyncMode mode, bool wait) override; - void EraseSyncData(const std::string &tableName); -}; -} -#endif -#endif // RELATIONAL_VIRTUAL_DEVICE_H +/* + * Copyright (c) 2021 Huawei Device Co., Ltd. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#ifndef RELATIONAL_VIRTUAL_DEVICE_H +#define RELATIONAL_VIRTUAL_DEVICE_H +#ifdef RELATIONAL_STORE + +#include "data_transformer.h" +#include "generic_virtual_device.h" +#include "relational_schema_object.h" +#include "virtual_relational_ver_sync_db_interface.h" + +namespace DistributedDB { +class RelationalVirtualDevice final : public GenericVirtualDevice { +public: + explicit RelationalVirtualDevice(const std::string &deviceId); + ~RelationalVirtualDevice() override; + + int PutData(const std::string &tableName, const std::vector &dataList); + int GetAllSyncData(const std::string &tableName, std::vector &data); + int GetSyncData(const std::string &tableName, const std::string &hashKey, VirtualRowData &data); + void SetLocalFieldInfo(const std::vector &localFieldInfo); + void SetTableInfo(const TableInfo &tableInfo); + int Sync(SyncMode mode, bool wait) override; + void EraseSyncData(const std::string &tableName); +}; +} +#endif +#endif // RELATIONAL_VIRTUAL_DEVICE_H diff --git a/services/distributeddataservice/libs/distributeddb/test/unittest/common/syncer/virtual_communicator.cpp b/services/distributeddataservice/libs/distributeddb/test/unittest/common/syncer/virtual_communicator.cpp index c8c946e7dd51f7f4fb3fa36ce77d00c5fa7baca0..caed2ea2709e2b643ab19df2ea08f768f248eabe 100644 --- a/services/distributeddataservice/libs/distributeddb/test/unittest/common/syncer/virtual_communicator.cpp +++ b/services/distributeddataservice/libs/distributeddb/test/unittest/common/syncer/virtual_communicator.cpp @@ -43,13 +43,13 @@ void VirtualCommunicator::Activate() { } -int VirtualCommunicator::SendMessage(const std::string &dstTarget, const Message *inMsg, SendConfig &config) +int VirtualCommunicator::SendMessage(const std::string &dstTarget, const Message *inMsg, const SendConfig &config) { return SendMessage(dstTarget, inMsg, config, nullptr); } -int VirtualCommunicator::SendMessage(const std::string &dstTarget, const Message *inMsg, - SendConfig &config, const OnSendEnd &onEnd) +int VirtualCommunicator::SendMessage(const std::string &dstTarget, const Message *inMsg, const SendConfig &config, + const OnSendEnd &onEnd) { AutoLock lock(this); if (IsKilled()) { diff --git a/services/distributeddataservice/libs/distributeddb/test/unittest/common/syncer/virtual_communicator.h b/services/distributeddataservice/libs/distributeddb/test/unittest/common/syncer/virtual_communicator.h index ec7f766d178623cfc2e9a2dff28417c835ab4f19..bd68e82f584bb943bca53999b6307e9b5def35b8 100644 --- a/services/distributeddataservice/libs/distributeddb/test/unittest/common/syncer/virtual_communicator.h +++ b/services/distributeddataservice/libs/distributeddb/test/unittest/common/syncer/virtual_communicator.h @@ -51,8 +51,8 @@ public: uint32_t GetTimeout(const std::string &target) const override; int GetLocalIdentity(std::string &outTarget) const override; - int SendMessage(const std::string &dstTarget, const Message *inMsg, SendConfig &config) override; - int SendMessage(const std::string &dstTarget, const Message *inMsg, SendConfig &config, + int SendMessage(const std::string &dstTarget, const Message *inMsg, const SendConfig &config) override; + int SendMessage(const std::string &dstTarget, const Message *inMsg, const SendConfig &config, const OnSendEnd &onEnd) override; int GetRemoteCommunicatorVersion(const std::string &deviceId, uint16_t &version) const override; diff --git a/services/distributeddataservice/libs/distributeddb/test/unittest/common/syncer/virtual_relational_ver_sync_db_interface.cpp b/services/distributeddataservice/libs/distributeddb/test/unittest/common/syncer/virtual_relational_ver_sync_db_interface.cpp index 899d3680b74593f2f3f4410e07e8cbcb8c242c8a..c91bf9dc12b67ce45200d999fe1af6ea22b8b9cf 100644 --- a/services/distributeddataservice/libs/distributeddb/test/unittest/common/syncer/virtual_relational_ver_sync_db_interface.cpp +++ b/services/distributeddataservice/libs/distributeddb/test/unittest/common/syncer/virtual_relational_ver_sync_db_interface.cpp @@ -1,357 +1,357 @@ -/* - * Copyright (c) 2021 Huawei Device Co., Ltd. - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -#ifdef RELATIONAL_STORE -#include "db_common.h" -#include "virtual_relational_ver_sync_db_interface.h" -#include "generic_single_ver_kv_entry.h" -#include "virtual_single_ver_sync_db_Interface.h" - -namespace DistributedDB { -namespace { - int GetEntriesFromItems(std::vector &entries, const std::vector &dataItems) - { - int errCode = E_OK; - for (auto &item : dataItems) { - auto entry = new (std::nothrow) GenericSingleVerKvEntry(); - if (entry == nullptr) { - LOGE("Create entry failed."); - errCode = -E_OUT_OF_MEMORY; - break; - } - DataItem storageItem; - storageItem.key = item.key; - storageItem.value = item.value; - storageItem.flag = item.flag; - storageItem.timestamp = item.timestamp; - storageItem.writeTimestamp = item.writeTimestamp; - storageItem.hashKey = item.hashKey; - entry->SetEntryData(std::move(storageItem)); - entries.push_back(entry); - } - if (errCode != E_OK) { - LOGD("[GetEntriesFromItems] failed:%d", errCode); - for (auto &kvEntry : entries) { - delete kvEntry; - kvEntry = nullptr; - } - entries.clear(); - } - LOGD("[GetEntriesFromItems] size:%zu", dataItems.size()); - return errCode; - } - - std::string GetStr(const std::vector &vec) - { - std::string str; - DBCommon::VectorToString(vec, str); - return str; - } -} - -int VirtualRelationalVerSyncDBInterface::PutSyncDataWithQuery(const QueryObject &object, - const std::vector &entries, const std::string &deviceName) -{ - LOGD("[PutSyncData] size %zu", entries.size()); - std::vector dataItems; - for (auto itemEntry : entries) { - auto *entry = static_cast(itemEntry); - if (entry != nullptr) { - DataItem item; - item.origDev = entry->GetOrigDevice(); - item.flag = entry->GetFlag(); - item.timestamp = entry->GetTimestamp(); - item.writeTimestamp = entry->GetWriteTimestamp(); - entry->GetKey(item.key); - entry->GetValue(item.value); - entry->GetHashKey(item.hashKey); - dataItems.push_back(item); - } - } - OptTableDataWithLog optTableDataWithLog; - optTableDataWithLog.tableName = object.GetTableName(); - int errCode = DataTransformer::TransformDataItem(dataItems, localFieldInfo_, - localFieldInfo_, optTableDataWithLog); - if (errCode != E_OK) { - return errCode; - } - for (const auto &optRowDataWithLog : optTableDataWithLog.dataList) { - VirtualRowData virtualRowData; - virtualRowData.logInfo = optRowDataWithLog.logInfo; - size_t index = 0; - for (const auto &optItem : optRowDataWithLog.optionalData) { - if (index >= localFieldInfo_.size()) { - break; - } - DataValue dataValue = std::move(optItem); - LOGD("type:%d", static_cast(optItem.GetType())); - virtualRowData.objectData.PutDataValue(localFieldInfo_[index].GetFieldName(), dataValue); - index++; - } - syncData_[object.GetTableName()][GetStr(virtualRowData.logInfo.hashKey)] = virtualRowData; - } - LOGD("tableName %s", optTableDataWithLog.tableName.c_str()); - return errCode; -} - -int VirtualRelationalVerSyncDBInterface::PutLocalData(const std::vector &dataList, - const std::string &tableName) -{ - for (const auto &item : dataList) { - localData_[tableName][GetStr(item.logInfo.hashKey)] = item; - } - return E_OK; -} - -int VirtualRelationalVerSyncDBInterface::GetSyncData(QueryObject &query, - const SyncTimeRange &timeRange, const DataSizeSpecInfo &dataSizeInfo, - ContinueToken &continueStmtToken, std::vector &entries) const -{ - if (localData_.find(query.GetTableName()) == localData_.end()) { - LOGD("[GetSyncData] No Data Return"); - return E_OK; - } - std::vector dataItemList; - TableDataWithLog tableDataWithLog = {query.GetTableName(), {}}; - for (const auto &[hashKey, virtualData] : localData_[query.GetTableName()]) { - if (virtualData.logInfo.timestamp < timeRange.beginTime || - virtualData.logInfo.timestamp >= timeRange.endTime) { - LOGD("ignore hashkey %s", hashKey.c_str()); - continue; - } - RowDataWithLog rowData; - for (const auto &field : localFieldInfo_) { - DataValue dataValue; - (void)virtualData.objectData.GetDataValue(field.GetFieldName(), dataValue); - rowData.rowData.push_back(std::move(dataValue)); - } - rowData.logInfo = virtualData.logInfo; - tableDataWithLog.dataList.push_back(rowData); - } - - int errCode = DataTransformer::TransformTableData(tableDataWithLog, localFieldInfo_, dataItemList); - if (errCode != E_OK) { - return errCode; - } - continueStmtToken = nullptr; - return GetEntriesFromItems(entries, dataItemList); -} - -RelationalSchemaObject VirtualRelationalVerSyncDBInterface::GetSchemaInfo() const -{ - return schemaObj_; -} - -int VirtualRelationalVerSyncDBInterface::GetDatabaseCreateTimestamp(Timestamp &outTime) const -{ - return E_OK; -} - -int VirtualRelationalVerSyncDBInterface::GetBatchMetaData(const std::vector &keys, - std::vector &entries) const -{ - int errCode = E_OK; - for (const auto &key : keys) { - Entry entry; - entry.key = key; - errCode = GetMetaData(key, entry.value); - if (errCode != E_OK) { - return errCode; - } - entries.push_back(entry); - } - return errCode; -} - -int VirtualRelationalVerSyncDBInterface::PutBatchMetaData(std::vector &entries) -{ - int errCode = E_OK; - for (const auto &entry : entries) { - errCode = PutMetaData(entry.key, entry.value); - if (errCode != E_OK) { - return errCode; - } - } - return errCode; -} - -std::vector VirtualRelationalVerSyncDBInterface::GetTablesQuery() -{ - return {}; -} - -int VirtualRelationalVerSyncDBInterface::LocalDataChanged(int notifyEvent, std::vector &queryObj) -{ - return E_OK; -} - -int VirtualRelationalVerSyncDBInterface::GetInterfaceType() const -{ - return SYNC_RELATION; -} - -void VirtualRelationalVerSyncDBInterface::IncRefCount() -{ -} - -void VirtualRelationalVerSyncDBInterface::DecRefCount() -{ -} - -std::vector VirtualRelationalVerSyncDBInterface::GetIdentifier() const -{ - return {}; -} - -void VirtualRelationalVerSyncDBInterface::GetMaxTimestamp(Timestamp &stamp) const -{ - for (const auto &item : syncData_) { - for (const auto &entry : item.second) { - if (stamp < entry.second.logInfo.timestamp) { - stamp = entry.second.logInfo.timestamp; - } - } - } - LOGD("VirtualSingleVerSyncDBInterface::GetMaxTimestamp time = %" PRIu64, stamp); -} - -int VirtualRelationalVerSyncDBInterface::GetMetaData(const Key &key, Value &value) const -{ - auto iter = metadata_.find(key); - if (iter != metadata_.end()) { - value = iter->second; - return E_OK; - } - return -E_NOT_FOUND; -} - -int VirtualRelationalVerSyncDBInterface::PutMetaData(const Key &key, const Value &value) -{ - metadata_[key] = value; - return E_OK; -} - -int VirtualRelationalVerSyncDBInterface::DeleteMetaData(const std::vector &keys) -{ - for (const auto &key : keys) { - (void)metadata_.erase(key); - } - return E_OK; -} - -int VirtualRelationalVerSyncDBInterface::DeleteMetaDataByPrefixKey(const Key &keyPrefix) const -{ - size_t prefixKeySize = keyPrefix.size(); - for (auto iter = metadata_.begin();iter != metadata_.end();) { - if (prefixKeySize <= iter->first.size() && - keyPrefix == Key(iter->first.begin(), std::next(iter->first.begin(), prefixKeySize))) { - iter = metadata_.erase(iter); - } else { - ++iter; - } - } - return E_OK; -} - -int VirtualRelationalVerSyncDBInterface::GetAllMetaKeys(std::vector &keys) const -{ - for (auto &iter : metadata_) { - keys.push_back(iter.first); - } - LOGD("GetAllMetaKeys size %zu", keys.size()); - return E_OK; -} - -const KvDBProperties &VirtualRelationalVerSyncDBInterface::GetDbProperties() const -{ - return properties_; -} - -void VirtualRelationalVerSyncDBInterface::SetLocalFieldInfo(const std::vector &localFieldInfo) -{ - localFieldInfo_.clear(); - localFieldInfo_ = localFieldInfo; -} - -int VirtualRelationalVerSyncDBInterface::GetAllSyncData(const std::string &tableName, - std::vector &data) -{ - if (syncData_.find(tableName) == syncData_.end()) { - return -E_NOT_FOUND; - } - for (const auto &entry : syncData_[tableName]) { - data.push_back(entry.second); - } - return E_OK; -} - -int VirtualRelationalVerSyncDBInterface::GetVirtualSyncData(const std::string &tableName, - const std::string &hashKey, VirtualRowData &data) -{ - if (syncData_.find(tableName) == syncData_.end()) { - return -E_NOT_FOUND; - } - if (syncData_.find(hashKey) == syncData_.end()) { - return -E_NOT_FOUND; - } - data = syncData_[tableName][hashKey]; - return E_OK; -} - -void VirtualRelationalVerSyncDBInterface::EraseSyncData(const std::string &tableName) -{ - if (syncData_.find(tableName) == syncData_.end()) { - return; - } - syncData_.erase(tableName); -} - -int VirtualRelationalVerSyncDBInterface::CreateDistributedDeviceTable(const std::string &device, - const RelationalSyncStrategy &syncStrategy) -{ - return E_OK; -} - -int VirtualRelationalVerSyncDBInterface::RegisterSchemaChangedCallback(const std::function &onSchemaChanged) -{ - return E_OK; -} - -void VirtualRelationalVerSyncDBInterface::SetTableInfo(const TableInfo &tableInfo) -{ - schemaObj_.AddRelationalTable(tableInfo); -} - -int VirtualRelationalVerSyncDBInterface::GetMaxTimestamp(const std::string &tableName, Timestamp ×tamp) const -{ - (void)tableName; - timestamp = 0; - return E_OK; -} - -void ObjectData::PutDataValue(const std::string &fieldName, const DataValue &value) -{ - fieldData[fieldName] = value; -} - -int ObjectData::GetDataValue(const std::string &fieldName, DataValue &value) const -{ - if (fieldData.find(fieldName) == fieldData.end()) { - return -E_NOT_FOUND; - } - value = fieldData[fieldName]; - return E_OK; -} -} +/* + * Copyright (c) 2021 Huawei Device Co., Ltd. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#ifdef RELATIONAL_STORE +#include "db_common.h" +#include "virtual_relational_ver_sync_db_interface.h" +#include "generic_single_ver_kv_entry.h" +#include "virtual_single_ver_sync_db_Interface.h" + +namespace DistributedDB { +namespace { + int GetEntriesFromItems(std::vector &entries, const std::vector &dataItems) + { + int errCode = E_OK; + for (auto &item : dataItems) { + auto entry = new (std::nothrow) GenericSingleVerKvEntry(); + if (entry == nullptr) { + LOGE("Create entry failed."); + errCode = -E_OUT_OF_MEMORY; + break; + } + DataItem storageItem; + storageItem.key = item.key; + storageItem.value = item.value; + storageItem.flag = item.flag; + storageItem.timestamp = item.timestamp; + storageItem.writeTimestamp = item.writeTimestamp; + storageItem.hashKey = item.hashKey; + entry->SetEntryData(std::move(storageItem)); + entries.push_back(entry); + } + if (errCode != E_OK) { + LOGD("[GetEntriesFromItems] failed:%d", errCode); + for (auto &kvEntry : entries) { + delete kvEntry; + kvEntry = nullptr; + } + entries.clear(); + } + LOGD("[GetEntriesFromItems] size:%zu", dataItems.size()); + return errCode; + } + + std::string GetStr(const std::vector &vec) + { + std::string str; + DBCommon::VectorToString(vec, str); + return str; + } +} + +int VirtualRelationalVerSyncDBInterface::PutSyncDataWithQuery(const QueryObject &object, + const std::vector &entries, const std::string &deviceName) +{ + LOGD("[PutSyncData] size %zu", entries.size()); + std::vector dataItems; + for (auto itemEntry : entries) { + auto *entry = static_cast(itemEntry); + if (entry != nullptr) { + DataItem item; + item.origDev = entry->GetOrigDevice(); + item.flag = entry->GetFlag(); + item.timestamp = entry->GetTimestamp(); + item.writeTimestamp = entry->GetWriteTimestamp(); + entry->GetKey(item.key); + entry->GetValue(item.value); + entry->GetHashKey(item.hashKey); + dataItems.push_back(item); + } + } + OptTableDataWithLog optTableDataWithLog; + optTableDataWithLog.tableName = object.GetTableName(); + int errCode = DataTransformer::TransformDataItem(dataItems, localFieldInfo_, + localFieldInfo_, optTableDataWithLog); + if (errCode != E_OK) { + return errCode; + } + for (const auto &optRowDataWithLog : optTableDataWithLog.dataList) { + VirtualRowData virtualRowData; + virtualRowData.logInfo = optRowDataWithLog.logInfo; + size_t index = 0; + for (const auto &optItem : optRowDataWithLog.optionalData) { + if (index >= localFieldInfo_.size()) { + break; + } + DataValue dataValue = std::move(optItem); + LOGD("type:%d", static_cast(optItem.GetType())); + virtualRowData.objectData.PutDataValue(localFieldInfo_[index].GetFieldName(), dataValue); + index++; + } + syncData_[object.GetTableName()][GetStr(virtualRowData.logInfo.hashKey)] = virtualRowData; + } + LOGD("tableName %s", optTableDataWithLog.tableName.c_str()); + return errCode; +} + +int VirtualRelationalVerSyncDBInterface::PutLocalData(const std::vector &dataList, + const std::string &tableName) +{ + for (const auto &item : dataList) { + localData_[tableName][GetStr(item.logInfo.hashKey)] = item; + } + return E_OK; +} + +int VirtualRelationalVerSyncDBInterface::GetSyncData(QueryObject &query, + const SyncTimeRange &timeRange, const DataSizeSpecInfo &dataSizeInfo, + ContinueToken &continueStmtToken, std::vector &entries) const +{ + if (localData_.find(query.GetTableName()) == localData_.end()) { + LOGD("[GetSyncData] No Data Return"); + return E_OK; + } + std::vector dataItemList; + TableDataWithLog tableDataWithLog = {query.GetTableName(), {}}; + for (const auto &[hashKey, virtualData] : localData_[query.GetTableName()]) { + if (virtualData.logInfo.timestamp < timeRange.beginTime || + virtualData.logInfo.timestamp >= timeRange.endTime) { + LOGD("ignore hashkey %s", hashKey.c_str()); + continue; + } + RowDataWithLog rowData; + for (const auto &field : localFieldInfo_) { + DataValue dataValue; + (void)virtualData.objectData.GetDataValue(field.GetFieldName(), dataValue); + rowData.rowData.push_back(std::move(dataValue)); + } + rowData.logInfo = virtualData.logInfo; + tableDataWithLog.dataList.push_back(rowData); + } + + int errCode = DataTransformer::TransformTableData(tableDataWithLog, localFieldInfo_, dataItemList); + if (errCode != E_OK) { + return errCode; + } + continueStmtToken = nullptr; + return GetEntriesFromItems(entries, dataItemList); +} + +RelationalSchemaObject VirtualRelationalVerSyncDBInterface::GetSchemaInfo() const +{ + return schemaObj_; +} + +int VirtualRelationalVerSyncDBInterface::GetDatabaseCreateTimestamp(Timestamp &outTime) const +{ + return E_OK; +} + +int VirtualRelationalVerSyncDBInterface::GetBatchMetaData(const std::vector &keys, + std::vector &entries) const +{ + int errCode = E_OK; + for (const auto &key : keys) { + Entry entry; + entry.key = key; + errCode = GetMetaData(key, entry.value); + if (errCode != E_OK) { + return errCode; + } + entries.push_back(entry); + } + return errCode; +} + +int VirtualRelationalVerSyncDBInterface::PutBatchMetaData(std::vector &entries) +{ + int errCode = E_OK; + for (const auto &entry : entries) { + errCode = PutMetaData(entry.key, entry.value); + if (errCode != E_OK) { + return errCode; + } + } + return errCode; +} + +std::vector VirtualRelationalVerSyncDBInterface::GetTablesQuery() +{ + return {}; +} + +int VirtualRelationalVerSyncDBInterface::LocalDataChanged(int notifyEvent, std::vector &queryObj) +{ + return E_OK; +} + +int VirtualRelationalVerSyncDBInterface::GetInterfaceType() const +{ + return SYNC_RELATION; +} + +void VirtualRelationalVerSyncDBInterface::IncRefCount() +{ +} + +void VirtualRelationalVerSyncDBInterface::DecRefCount() +{ +} + +std::vector VirtualRelationalVerSyncDBInterface::GetIdentifier() const +{ + return {}; +} + +void VirtualRelationalVerSyncDBInterface::GetMaxTimestamp(Timestamp &stamp) const +{ + for (const auto &item : syncData_) { + for (const auto &entry : item.second) { + if (stamp < entry.second.logInfo.timestamp) { + stamp = entry.second.logInfo.timestamp; + } + } + } + LOGD("VirtualSingleVerSyncDBInterface::GetMaxTimestamp time = %" PRIu64, stamp); +} + +int VirtualRelationalVerSyncDBInterface::GetMetaData(const Key &key, Value &value) const +{ + auto iter = metadata_.find(key); + if (iter != metadata_.end()) { + value = iter->second; + return E_OK; + } + return -E_NOT_FOUND; +} + +int VirtualRelationalVerSyncDBInterface::PutMetaData(const Key &key, const Value &value) +{ + metadata_[key] = value; + return E_OK; +} + +int VirtualRelationalVerSyncDBInterface::DeleteMetaData(const std::vector &keys) +{ + for (const auto &key : keys) { + (void)metadata_.erase(key); + } + return E_OK; +} + +int VirtualRelationalVerSyncDBInterface::DeleteMetaDataByPrefixKey(const Key &keyPrefix) const +{ + size_t prefixKeySize = keyPrefix.size(); + for (auto iter = metadata_.begin();iter != metadata_.end();) { + if (prefixKeySize <= iter->first.size() && + keyPrefix == Key(iter->first.begin(), std::next(iter->first.begin(), prefixKeySize))) { + iter = metadata_.erase(iter); + } else { + ++iter; + } + } + return E_OK; +} + +int VirtualRelationalVerSyncDBInterface::GetAllMetaKeys(std::vector &keys) const +{ + for (auto &iter : metadata_) { + keys.push_back(iter.first); + } + LOGD("GetAllMetaKeys size %zu", keys.size()); + return E_OK; +} + +const KvDBProperties &VirtualRelationalVerSyncDBInterface::GetDbProperties() const +{ + return properties_; +} + +void VirtualRelationalVerSyncDBInterface::SetLocalFieldInfo(const std::vector &localFieldInfo) +{ + localFieldInfo_.clear(); + localFieldInfo_ = localFieldInfo; +} + +int VirtualRelationalVerSyncDBInterface::GetAllSyncData(const std::string &tableName, + std::vector &data) +{ + if (syncData_.find(tableName) == syncData_.end()) { + return -E_NOT_FOUND; + } + for (const auto &entry : syncData_[tableName]) { + data.push_back(entry.second); + } + return E_OK; +} + +int VirtualRelationalVerSyncDBInterface::GetVirtualSyncData(const std::string &tableName, + const std::string &hashKey, VirtualRowData &data) +{ + if (syncData_.find(tableName) == syncData_.end()) { + return -E_NOT_FOUND; + } + if (syncData_.find(hashKey) == syncData_.end()) { + return -E_NOT_FOUND; + } + data = syncData_[tableName][hashKey]; + return E_OK; +} + +void VirtualRelationalVerSyncDBInterface::EraseSyncData(const std::string &tableName) +{ + if (syncData_.find(tableName) == syncData_.end()) { + return; + } + syncData_.erase(tableName); +} + +int VirtualRelationalVerSyncDBInterface::CreateDistributedDeviceTable(const std::string &device, + const RelationalSyncStrategy &syncStrategy) +{ + return E_OK; +} + +int VirtualRelationalVerSyncDBInterface::RegisterSchemaChangedCallback(const std::function &onSchemaChanged) +{ + return E_OK; +} + +void VirtualRelationalVerSyncDBInterface::SetTableInfo(const TableInfo &tableInfo) +{ + schemaObj_.AddRelationalTable(tableInfo); +} + +int VirtualRelationalVerSyncDBInterface::GetMaxTimestamp(const std::string &tableName, Timestamp ×tamp) const +{ + (void)tableName; + timestamp = 0; + return E_OK; +} + +void ObjectData::PutDataValue(const std::string &fieldName, const DataValue &value) +{ + fieldData[fieldName] = value; +} + +int ObjectData::GetDataValue(const std::string &fieldName, DataValue &value) const +{ + if (fieldData.find(fieldName) == fieldData.end()) { + return -E_NOT_FOUND; + } + value = fieldData[fieldName]; + return E_OK; +} +} #endif \ No newline at end of file diff --git a/services/distributeddataservice/libs/distributeddb/test/unittest/common/syncer/virtual_relational_ver_sync_db_interface.h b/services/distributeddataservice/libs/distributeddb/test/unittest/common/syncer/virtual_relational_ver_sync_db_interface.h index d82d55bbbfa296c843b27f2e21337d5913bf0e76..c2f30c829c2d02287d33fc5dad8ec5b6e8de6a75 100644 --- a/services/distributeddataservice/libs/distributeddb/test/unittest/common/syncer/virtual_relational_ver_sync_db_interface.h +++ b/services/distributeddataservice/libs/distributeddb/test/unittest/common/syncer/virtual_relational_ver_sync_db_interface.h @@ -1,126 +1,126 @@ -/* - * Copyright (c) 2021 Huawei Device Co., Ltd. - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -#ifndef VIRTUAL_RELATIONAL_VER_SYNC_DB_INTERFACE_H -#define VIRTUAL_RELATIONAL_VER_SYNC_DB_INTERFACE_H -#ifdef RELATIONAL_STORE - -#include "data_transformer.h" -#include "relational_db_sync_interface.h" -#include "sqlite_single_ver_continue_token.h" -#include "relational_schema_object.h" - -namespace DistributedDB { -struct ObjectData { -public: - void PutDataValue(const std::string &fieldName, const DataValue &value); - int GetDataValue(const std::string &fieldName, DataValue &value) const; -private: - mutable std::map fieldData; -}; - -struct VirtualRowData { - LogInfo logInfo; - ObjectData objectData; -}; - -class VirtualRelationalVerSyncDBInterface : public RelationalDBSyncInterface { -public: - VirtualRelationalVerSyncDBInterface() = default; - ~VirtualRelationalVerSyncDBInterface() override = default; - - int PutSyncDataWithQuery(const QueryObject &query, const std::vector &entries, - const std::string &deviceName) override; - - int PutLocalData(const std::vector &dataList, const std::string &tableName); - - RelationalSchemaObject GetSchemaInfo() const override; - - int GetDatabaseCreateTimestamp(Timestamp &outTime) const override; - - int GetBatchMetaData(const std::vector &keys, std::vector &entries) const override; - - int PutBatchMetaData(std::vector &entries) override; - - std::vector GetTablesQuery() override; - - int LocalDataChanged(int notifyEvent, std::vector &queryObj) override; - - int GetSyncData(QueryObject &query, const SyncTimeRange &timeRange, - const DataSizeSpecInfo &dataSizeInfo, ContinueToken &continueStmtToken, - std::vector &entries) const override; - - int GetInterfaceType() const override; - - void IncRefCount() override; - - void DecRefCount() override; - - std::vector GetIdentifier() const override; - - void GetMaxTimestamp(Timestamp &stamp) const override; - - // Get the max timestamp of one table. - int GetMaxTimestamp(const std::string &tableName, Timestamp ×tamp) const override; - - int GetMetaData(const Key &key, Value &value) const override; - - int PutMetaData(const Key &key, const Value &value) override; - - int DeleteMetaData(const std::vector &keys) override; - - int DeleteMetaDataByPrefixKey(const Key &keyPrefix) const override; - - int GetAllMetaKeys(std::vector &keys) const override; - - const KvDBProperties &GetDbProperties() const override; - - void SetLocalFieldInfo(const std::vector &localFieldInfo); - - int GetAllSyncData(const std::string &tableName, std::vector &data); - - int GetVirtualSyncData(const std::string &tableName, const std::string &hashKey, VirtualRowData &data); - - int InterceptData(std::vector &entries, - const std::string &sourceID, const std::string &targetID) const override - { - return E_OK; - } - - int CheckAndInitQueryCondition(QueryObject &query) const override - { - return E_OK; - } - - int CreateDistributedDeviceTable(const std::string &device, const RelationalSyncStrategy &syncStrategy) override; - - int RegisterSchemaChangedCallback(const std::function &onSchemaChanged) override; - - void EraseSyncData(const std::string &tableName); - - void SetTableInfo(const TableInfo &tableInfo); - -private: - mutable std::map, std::vector> metadata_; - std::map> syncData_; - mutable std::map> localData_; - std::string schema_; - RelationalSchemaObject schemaObj_; - std::vector localFieldInfo_; - KvDBProperties properties_; - SecurityOption secOption_; -}; -} -#endif +/* + * Copyright (c) 2021 Huawei Device Co., Ltd. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#ifndef VIRTUAL_RELATIONAL_VER_SYNC_DB_INTERFACE_H +#define VIRTUAL_RELATIONAL_VER_SYNC_DB_INTERFACE_H +#ifdef RELATIONAL_STORE + +#include "data_transformer.h" +#include "relational_db_sync_interface.h" +#include "sqlite_single_ver_continue_token.h" +#include "relational_schema_object.h" + +namespace DistributedDB { +struct ObjectData { +public: + void PutDataValue(const std::string &fieldName, const DataValue &value); + int GetDataValue(const std::string &fieldName, DataValue &value) const; +private: + mutable std::map fieldData; +}; + +struct VirtualRowData { + LogInfo logInfo; + ObjectData objectData; +}; + +class VirtualRelationalVerSyncDBInterface : public RelationalDBSyncInterface { +public: + VirtualRelationalVerSyncDBInterface() = default; + ~VirtualRelationalVerSyncDBInterface() override = default; + + int PutSyncDataWithQuery(const QueryObject &query, const std::vector &entries, + const std::string &deviceName) override; + + int PutLocalData(const std::vector &dataList, const std::string &tableName); + + RelationalSchemaObject GetSchemaInfo() const override; + + int GetDatabaseCreateTimestamp(Timestamp &outTime) const override; + + int GetBatchMetaData(const std::vector &keys, std::vector &entries) const override; + + int PutBatchMetaData(std::vector &entries) override; + + std::vector GetTablesQuery() override; + + int LocalDataChanged(int notifyEvent, std::vector &queryObj) override; + + int GetSyncData(QueryObject &query, const SyncTimeRange &timeRange, + const DataSizeSpecInfo &dataSizeInfo, ContinueToken &continueStmtToken, + std::vector &entries) const override; + + int GetInterfaceType() const override; + + void IncRefCount() override; + + void DecRefCount() override; + + std::vector GetIdentifier() const override; + + void GetMaxTimestamp(Timestamp &stamp) const override; + + // Get the max timestamp of one table. + int GetMaxTimestamp(const std::string &tableName, Timestamp ×tamp) const override; + + int GetMetaData(const Key &key, Value &value) const override; + + int PutMetaData(const Key &key, const Value &value) override; + + int DeleteMetaData(const std::vector &keys) override; + + int DeleteMetaDataByPrefixKey(const Key &keyPrefix) const override; + + int GetAllMetaKeys(std::vector &keys) const override; + + const KvDBProperties &GetDbProperties() const override; + + void SetLocalFieldInfo(const std::vector &localFieldInfo); + + int GetAllSyncData(const std::string &tableName, std::vector &data); + + int GetVirtualSyncData(const std::string &tableName, const std::string &hashKey, VirtualRowData &data); + + int InterceptData(std::vector &entries, + const std::string &sourceID, const std::string &targetID) const override + { + return E_OK; + } + + int CheckAndInitQueryCondition(QueryObject &query) const override + { + return E_OK; + } + + int CreateDistributedDeviceTable(const std::string &device, const RelationalSyncStrategy &syncStrategy) override; + + int RegisterSchemaChangedCallback(const std::function &onSchemaChanged) override; + + void EraseSyncData(const std::string &tableName); + + void SetTableInfo(const TableInfo &tableInfo); + +private: + mutable std::map, std::vector> metadata_; + std::map> syncData_; + mutable std::map> localData_; + std::string schema_; + RelationalSchemaObject schemaObj_; + std::vector localFieldInfo_; + KvDBProperties properties_; + SecurityOption secOption_; +}; +} +#endif #endif // VIRTUAL_RELATIONAL_VER_SYNC_DB_INTERFACE_H \ No newline at end of file diff --git a/services/distributeddataservice/libs/distributeddb/test/unittest/common/syncer/virtual_time_sync_communicator.cpp b/services/distributeddataservice/libs/distributeddb/test/unittest/common/syncer/virtual_time_sync_communicator.cpp index fa20e0c2c6581674d2d8195dbc88427a54cd845b..5c42b4261a144e82f75e3b01e9cb004b26c071bf 100644 --- a/services/distributeddataservice/libs/distributeddb/test/unittest/common/syncer/virtual_time_sync_communicator.cpp +++ b/services/distributeddataservice/libs/distributeddb/test/unittest/common/syncer/virtual_time_sync_communicator.cpp @@ -82,13 +82,14 @@ int VirtualTimeSyncCommunicator::GetLocalIdentity(std::string &outTarget) const return 0; } -int VirtualTimeSyncCommunicator::SendMessage(const std::string &dstTarget, const Message *inMsg, SendConfig &config) +int VirtualTimeSyncCommunicator::SendMessage(const std::string &dstTarget, const Message *inMsg, + const SendConfig &config) { return SendMessage(dstTarget, inMsg, config, nullptr); } int VirtualTimeSyncCommunicator::SendMessage(const std::string &dstTarget, const Message *inMsg, - SendConfig &config, const OnSendEnd &onEnd) + const SendConfig &config, const OnSendEnd &onEnd) { if (!isEnable_) { LOGD("[VirtualTimeSyncCommunicator]the VirtualTimeSyncCommunicator disabled!"); diff --git a/services/distributeddataservice/libs/distributeddb/test/unittest/common/syncer/virtual_time_sync_communicator.h b/services/distributeddataservice/libs/distributeddb/test/unittest/common/syncer/virtual_time_sync_communicator.h index 410f587d4b62ca6359ea02c777c37956e4b3a947..e3165a464c9ce9ae7ee7a8cf1b69df4ba7305ee6 100644 --- a/services/distributeddataservice/libs/distributeddb/test/unittest/common/syncer/virtual_time_sync_communicator.h +++ b/services/distributeddataservice/libs/distributeddb/test/unittest/common/syncer/virtual_time_sync_communicator.h @@ -57,8 +57,8 @@ public: // Get local target name for identify self int GetLocalIdentity(std::string &outTarget) const override; - int SendMessage(const std::string &dstTarget, const Message *inMsg, SendConfig &config) override; - int SendMessage(const std::string &dstTarget, const Message *inMsg, SendConfig &config, + int SendMessage(const std::string &dstTarget, const Message *inMsg, const SendConfig &config) override; + int SendMessage(const std::string &dstTarget, const Message *inMsg, const SendConfig &config, const OnSendEnd &onEnd) override; int GetRemoteCommunicatorVersion(const std::string &deviceId, uint16_t &version) const override;