1 Star 0 Fork 0

廖永煌/lyh

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
文件
该仓库未声明开源许可证文件(LICENSE),使用请关注具体项目描述及其代码上游依赖。
克隆/下载
0506_2.diff 128.65 KB
一键复制 编辑 原始数据 按行查看 历史
廖永煌 提交于 2025-05-09 11:43 +08:00 . 1

diff --git a/CMakeLists.txt b/CMakeLists.txt
index a70dc8a01..ff1d2789f 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -321,7 +321,8 @@ endif()
set(DB_PATH ${PROJECT_SOURCE_DIR}/frameworks/libs/distributeddb)
-add_executable(distributed_ut ${DISTRIBUTEDDB_SRC} ${SECUREC_SRC} ${SQLITE_SRC})
+add_executable(distributed_ut ${DISTRIBUTEDDB_SRC} ${SECUREC_SRC} ${SQLITE_SRC}
+ frameworks/libs/distributeddb/test/unittest/common/syncer/distributeddb_relational_multi_user_sync_test.cpp)
target_link_libraries(
distributed_ut
diff --git a/frameworks/libs/distributeddb/communicator/include/icommunicator.h b/frameworks/libs/distributeddb/communicator/include/icommunicator.h
index 4b73ab326..578bec232 100644
--- a/frameworks/libs/distributeddb/communicator/include/icommunicator.h
+++ b/frameworks/libs/distributeddb/communicator/include/icommunicator.h
@@ -26,7 +26,7 @@
namespace DistributedDB {
// inMsg is heap memory, its ownership transfers by calling OnMessageCallback
-using OnMessageCallback = std::function<void(const std::string &srcTarget, Message *inMsg)>;
+using OnMessageCallback = std::function<void(const std::string &srcTarget, const std::string userId, Message *inMsg)>;
constexpr uint32_t SEND_TIME_OUT = 3000; // 3s
struct SendConfig {
diff --git a/frameworks/libs/distributeddb/communicator/src/communicator.cpp b/frameworks/libs/distributeddb/communicator/src/communicator.cpp
index 608adaf91..027424553 100644
--- a/frameworks/libs/distributeddb/communicator/src/communicator.cpp
+++ b/frameworks/libs/distributeddb/communicator/src/communicator.cpp
@@ -144,7 +144,7 @@ int Communicator::SendMessage(const std::string &dstTarget, const Message *inMsg
return errCode;
}
-void Communicator::OnBufferReceive(const std::string &srcTarget, const SerialBuffer *inBuf)
+void Communicator::OnBufferReceive(const std::string &srcTarget, const std::string &userId, const SerialBuffer *inBuf)
{
std::lock_guard<std::mutex> messageHandleLockGuard(messageHandleMutex_);
if (srcTarget.size() != 0 && inBuf != nullptr && onMessageHandle_) {
@@ -166,7 +166,7 @@ void Communicator::OnBufferReceive(const std::string &srcTarget, const SerialBuf
return;
}
LOGI("[Comm][Receive] label=%.3s, srcTarget=%s{private}.", VEC_TO_STR(commLabel_), srcTarget.c_str());
- onMessageHandle_(srcTarget, message);
+ onMessageHandle_(srcTarget, userId, message);
} else {
LOGE("[Comm][Receive] label=%.3s, src.size=%zu or buf or handle invalid.", VEC_TO_STR(commLabel_),
srcTarget.size());
diff --git a/frameworks/libs/distributeddb/communicator/src/communicator.h b/frameworks/libs/distributeddb/communicator/src/communicator.h
index 976ce0ae9..5bee40aaf 100644
--- a/frameworks/libs/distributeddb/communicator/src/communicator.h
+++ b/frameworks/libs/distributeddb/communicator/src/communicator.h
@@ -56,7 +56,7 @@ public:
const OnSendEnd &onEnd) override;
// Call by CommunicatorAggregator directly
- void OnBufferReceive(const std::string &srcTarget, const SerialBuffer *inBuf);
+ void OnBufferReceive(const std::string &srcTarget, const std::string &userId, const SerialBuffer *inBuf);
// Call by CommunicatorAggregator directly
void OnConnectChange(const std::string &target, bool isConnect);
diff --git a/frameworks/libs/distributeddb/communicator/src/communicator_aggregator.cpp b/frameworks/libs/distributeddb/communicator/src/communicator_aggregator.cpp
index ea724de99..ee3bbaf75 100644
--- a/frameworks/libs/distributeddb/communicator/src/communicator_aggregator.cpp
+++ b/frameworks/libs/distributeddb/communicator/src/communicator_aggregator.cpp
@@ -276,7 +276,7 @@ void CommunicatorAggregator::ActivateCommunicator(const LabelType &commLabel, co
// Do Redeliver, the communicator is responsible to deal with the frame
std::list<FrameInfo> framesToRedeliver = retainer_.FetchFramesForSpecificCommunicator(commLabel);
for (auto &entry : framesToRedeliver) {
- commMap_[userId].at(commLabel).first->OnBufferReceive(entry.srcTarget, entry.buffer);
+ commMap_[userId].at(commLabel).first->OnBufferReceive(entry.srcTarget, userId, entry.buffer);
}
}
@@ -749,7 +749,7 @@ int CommunicatorAggregator::TryDeliverAppLayerFrameToCommunicatorNoMutex(const s
{
// Ignore nonactivated communicator, which is regarded as inexistent
if (commMap_[userId].count(toLabel) != 0 && commMap_[userId].at(toLabel).second) {
- commMap_[userId].at(toLabel).first->OnBufferReceive(srcTarget, inFrameBuffer);
+ commMap_[userId].at(toLabel).first->OnBufferReceive(srcTarget, userId, inFrameBuffer);
// Frame handed over to communicator who is responsible to delete it. The frame is deleted here after return.
inFrameBuffer = nullptr;
return E_OK;
@@ -771,7 +771,7 @@ int CommunicatorAggregator::TryDeliverAppLayerFrameToCommunicatorNoMutex(const s
}
}
if (communicator != nullptr && (userId.empty() || isEmpty)) {
- communicator->OnBufferReceive(srcTarget, inFrameBuffer);
+ communicator->OnBufferReceive(srcTarget, userId, inFrameBuffer);
inFrameBuffer = nullptr;
return E_OK;
}
diff --git a/frameworks/libs/distributeddb/interfaces/include/kv_store_nb_delegate.h b/frameworks/libs/distributeddb/interfaces/include/kv_store_nb_delegate.h
index 08870ff23..c39b5205b 100644
--- a/frameworks/libs/distributeddb/interfaces/include/kv_store_nb_delegate.h
+++ b/frameworks/libs/distributeddb/interfaces/include/kv_store_nb_delegate.h
@@ -204,6 +204,8 @@ public:
const std::function<void(const std::map<std::string, DBStatus> &devicesMap)> &onComplete,
const Query &query, bool wait) = 0;
+ DB_API virtual DBStatus Sync(const DeviceSyncParam &param, const KvDeviceSyncOnCompleteCallback &onComplete) = 0;
+
// Sync with device, provides sync count information
DB_API virtual DBStatus Sync(const DeviceSyncOption &option, const DeviceSyncProcessCallback &onProcess) = 0;
diff --git a/frameworks/libs/distributeddb/interfaces/include/relational/relational_store_delegate.h b/frameworks/libs/distributeddb/interfaces/include/relational/relational_store_delegate.h
index b58f83fe2..2370db55d 100644
--- a/frameworks/libs/distributeddb/interfaces/include/relational/relational_store_delegate.h
+++ b/frameworks/libs/distributeddb/interfaces/include/relational/relational_store_delegate.h
@@ -61,6 +61,8 @@ public:
DB_API virtual DBStatus Sync(const std::vector<std::string> &devices, SyncMode mode,
const Query &query, const SyncStatusCallback &onComplete, bool wait) = 0;
+ DB_API virtual DBStatus Sync(const DeviceSyncParam &param, const RdbDeviceSyncOnCompleteCallback &onComplete) = 0;
+
DB_API virtual int32_t GetCloudSyncTaskCount()
{
return 0;
diff --git a/frameworks/libs/distributeddb/interfaces/include/store_types.h b/frameworks/libs/distributeddb/interfaces/include/store_types.h
index a5dd8fd81..9e189b10f 100644
--- a/frameworks/libs/distributeddb/interfaces/include/store_types.h
+++ b/frameworks/libs/distributeddb/interfaces/include/store_types.h
@@ -20,6 +20,7 @@
#include <map>
#include <set>
#include <string>
+#include <utility>
#include "query.h"
#include "types_export.h"
@@ -196,6 +197,29 @@ struct SyncProcess {
struct DeviceSyncOption {
std::vector<std::string> devices;
+ std::string userId;
+ SyncMode mode = SYNC_MODE_PULL_ONLY;
+ Query query; // isQuery must be set to true if the query is set
+ bool isQuery = false;
+ bool isWait = true;
+};
+
+struct DeviceSyncTarget {
+ std::string device;
+ std::string userId;
+
+ DeviceSyncTarget(std::string device, std::string userId) : device(std::move(device)), userId(std::move(userId)) {}
+
+ bool operator<(const DeviceSyncTarget& other) const {
+ if (device == other.device) {
+ return userId < other.userId;
+ }
+ return device < other.device;
+ }
+};
+
+struct DeviceSyncParam {
+ std::vector<DeviceSyncTarget> syncTargets;
SyncMode mode = SYNC_MODE_PULL_ONLY;
Query query; // isQuery must be set to true if the query is set
bool isQuery = false;
@@ -224,6 +248,11 @@ using SyncProcessCallback = std::function<void(const std::map<std::string, SyncP
using DeviceSyncProcessCallback = std::function<void(const std::map<std::string, DeviceSyncProcess> &processMap)>;
+using KvDeviceSyncOnCompleteCallback = std::function<void(const std::map<DeviceSyncTarget, DBStatus> &devicesMap)>;
+
+using RdbDeviceSyncOnCompleteCallback = std::function<void(const std::map<DeviceSyncTarget,
+ std::vector<TableStatus>> &devicesMap)>;
+
struct RemoteCondition {
std::string sql; // The sql statement;
std::vector<std::string> bindArgs; // The bind args.
diff --git a/frameworks/libs/distributeddb/interfaces/src/kv_store_nb_delegate_impl.cpp b/frameworks/libs/distributeddb/interfaces/src/kv_store_nb_delegate_impl.cpp
index e38585b3d..f8bb55213 100644
--- a/frameworks/libs/distributeddb/interfaces/src/kv_store_nb_delegate_impl.cpp
+++ b/frameworks/libs/distributeddb/interfaces/src/kv_store_nb_delegate_impl.cpp
@@ -573,8 +573,12 @@ DBStatus KvStoreNbDelegateImpl::Sync(const std::vector<std::string> &devices, Sy
return NOT_SUPPORT;
}
+ std::vector<DeviceSyncTarget> syncTargets;
+ for (const auto &device : devices) {
+ syncTargets.push_back({device, ""});
+ }
PragmaSync pragmaData(
- devices, mode, [this, onComplete](const std::map<std::string, int> &statuses) {
+ syncTargets, mode, [this, onComplete](const std::map<DeviceSyncTarget, int> &statuses) {
OnSyncComplete(statuses, onComplete);
}, wait);
int errCode = conn_->Pragma(PRAGMA_SYNC_DEVICES, &pragmaData);
@@ -612,8 +616,13 @@ DBStatus KvStoreNbDelegateImpl::Sync(const std::vector<std::string> &devices, Sy
LOGE("not support order by timestamp and query by range");
return NOT_SUPPORT;
}
+
+ std::vector<DeviceSyncTarget> syncTargets;
+ for (const auto &device : devices) {
+ syncTargets.push_back({device, ""});
+ }
PragmaSync pragmaData(
- devices, mode, querySyncObj, [this, onComplete](const std::map<std::string, int> &statuses) {
+ syncTargets, mode, querySyncObj, [this, onComplete](const std::map<DeviceSyncTarget, int> &statuses) {
OnSyncComplete(statuses, onComplete);
}, wait);
int errCode = conn_->Pragma(PRAGMA_SYNC_DEVICES, &pragmaData);
@@ -624,6 +633,42 @@ DBStatus KvStoreNbDelegateImpl::Sync(const std::vector<std::string> &devices, Sy
return OK;
}
+DBStatus KvStoreNbDelegateImpl::Sync(const DeviceSyncParam &param, const KvDeviceSyncOnCompleteCallback &onComplete)
+{
+ if (conn_ == nullptr) {
+ LOGE("%s", INVALID_CONNECTION);
+ return DB_ERROR;
+ }
+ if (param.mode > SYNC_MODE_PUSH_PULL) {
+ LOGE("not support other mode");
+ return NOT_SUPPORT;
+ }
+
+ QuerySyncObject querySyncObj(param.query);
+ if (!querySyncObj.GetRelationTableNames().empty()) {
+ LOGE("check query table names from tables failed!");
+ return NOT_SUPPORT;
+ }
+
+ if (!DBCommon::CheckQueryWithoutMultiTable(param.query)) {
+ LOGE("not support for invalid query");
+ return NOT_SUPPORT;
+ }
+ if (querySyncObj.GetSortType() != SortType::NONE || querySyncObj.IsQueryByRange()) {
+ LOGE("not support order by timestamp and query by range");
+ return NOT_SUPPORT;
+ }
+ PragmaSync pragmaData(param, [this, onComplete](const std::map<DeviceSyncTarget, int> &statuses) {
+ OnSyncComplete(statuses, onComplete);
+ });
+ int errCode = conn_->Pragma(PRAGMA_SYNC_DEVICES, &pragmaData);
+ if (errCode < E_OK) {
+ LOGE("[KvStoreNbDelegate] QuerySync data failed:%d", errCode);
+ return TransferDBErrno(errCode);
+ }
+ return OK;
+}
+
void KvStoreNbDelegateImpl::OnDeviceSyncProcess(const std::map<std::string, DeviceSyncProcess> &processMap,
const DeviceSyncProcessCallback &onProcess) const
{
@@ -1015,13 +1060,26 @@ DBStatus KvStoreNbDelegateImpl::DeleteInner(const IOption &option, const Key &ke
return TransferDBErrno(errCode);
}
-void KvStoreNbDelegateImpl::OnSyncComplete(const std::map<std::string, int> &statuses,
+void KvStoreNbDelegateImpl::OnSyncComplete(const std::map<DeviceSyncTarget, int> &statuses,
const std::function<void(const std::map<std::string, DBStatus> &devicesMap)> &onComplete) const
{
std::map<std::string, DBStatus> result;
for (const auto &pair : statuses) {
DBStatus status = SyncOperation::DBStatusTrans(pair.second);
- result.insert(std::pair<std::string, DBStatus>(pair.first, status));
+ result.insert(std::pair<std::string, DBStatus>(pair.first.device, status));
+ }
+ if (onComplete) {
+ onComplete(result);
+ }
+}
+
+void KvStoreNbDelegateImpl::OnSyncComplete(const std::map<DeviceSyncTarget, int> &statuses,
+ const KvDeviceSyncOnCompleteCallback &onComplete) const
+{
+ std::map<DeviceSyncTarget, DBStatus> result;
+ for (const auto &pair : statuses) {
+ DBStatus status = SyncOperation::DBStatusTrans(pair.second);
+ result.insert(std::pair<DeviceSyncTarget, DBStatus>(pair.first, status));
}
if (onComplete) {
onComplete(result);
@@ -1074,8 +1132,15 @@ DBStatus KvStoreNbDelegateImpl::SubscribeRemoteQuery(const std::vector<std::stri
LOGE("not support order by timestamp and query by range");
return NOT_SUPPORT;
}
- PragmaSync pragmaData(devices, SyncModeType::SUBSCRIBE_QUERY, querySyncObj,
- [this, onComplete](const std::map<std::string, int> &statuses) { OnSyncComplete(statuses, onComplete); }, wait);
+
+ std::vector<DeviceSyncTarget> syncTargets;
+ for (const auto &device : devices) {
+ syncTargets.push_back({device, ""});
+ }
+ PragmaSync pragmaData(syncTargets, SyncModeType::SUBSCRIBE_QUERY, querySyncObj,
+ [this, onComplete](const std::map<DeviceSyncTarget, int> &statuses) {
+ OnSyncComplete(statuses, onComplete);
+ }, wait);
int errCode = conn_->Pragma(PRAGMA_SUBSCRIBE_QUERY, &pragmaData);
if (errCode < E_OK) {
LOGE("[KvStoreNbDelegate] Subscribe remote data with query failed:%d", errCode);
@@ -1098,8 +1163,15 @@ DBStatus KvStoreNbDelegateImpl::UnSubscribeRemoteQuery(const std::vector<std::st
LOGE("not support order by timestamp and query by range");
return NOT_SUPPORT;
}
- PragmaSync pragmaData(devices, SyncModeType::UNSUBSCRIBE_QUERY, querySyncObj,
- [this, onComplete](const std::map<std::string, int> &statuses) { OnSyncComplete(statuses, onComplete); }, wait);
+
+ std::vector<DeviceSyncTarget> syncTargets;
+ for (const auto &device : devices) {
+ syncTargets.push_back({device, ""});
+ }
+ PragmaSync pragmaData(syncTargets, SyncModeType::UNSUBSCRIBE_QUERY, querySyncObj,
+ [this, onComplete](const std::map<DeviceSyncTarget, int> &statuses) {
+ OnSyncComplete(statuses, onComplete);
+ }, wait);
int errCode = conn_->Pragma(PRAGMA_SUBSCRIBE_QUERY, &pragmaData);
if (errCode < E_OK) {
LOGE("[KvStoreNbDelegate] Unsubscribe remote data with query failed:%d", errCode);
diff --git a/frameworks/libs/distributeddb/interfaces/src/kv_store_nb_delegate_impl.h b/frameworks/libs/distributeddb/interfaces/src/kv_store_nb_delegate_impl.h
index e03bc6f3c..2717de01d 100644
--- a/frameworks/libs/distributeddb/interfaces/src/kv_store_nb_delegate_impl.h
+++ b/frameworks/libs/distributeddb/interfaces/src/kv_store_nb_delegate_impl.h
@@ -133,6 +133,8 @@ public:
const std::function<void(const std::map<std::string, DBStatus> &devicesMap)> &onComplete,
const Query &query, bool wait) override;
+ DBStatus Sync(const DeviceSyncParam &param, const KvDeviceSyncOnCompleteCallback &onComplete);
+
// Sync with devices, provides sync count information
DBStatus Sync(const DeviceSyncOption &option, const DeviceSyncProcessCallback &onProcess) override;
@@ -203,8 +205,11 @@ private:
DBStatus DeleteInner(const IOption &option, const Key &key);
DBStatus GetEntriesInner(const IOption &option, const Key &keyPrefix, std::vector<Entry> &entries) const;
- void OnSyncComplete(const std::map<std::string, int> &statuses,
+ void OnSyncComplete(const std::map<DeviceSyncTarget, int> &statuses,
const std::function<void(const std::map<std::string, DBStatus> &devicesMap)> &onComplete) const;
+
+ void OnSyncComplete(const std::map<DeviceSyncTarget, int> &statuses,
+ const KvDeviceSyncOnCompleteCallback &onComplete) const;
void OnDeviceSyncProcess(const std::map<std::string, DeviceSyncProcess> &processMap,
const DeviceSyncProcessCallback &onProcess) const;
diff --git a/frameworks/libs/distributeddb/interfaces/src/relational/relational_store_delegate_impl.cpp b/frameworks/libs/distributeddb/interfaces/src/relational/relational_store_delegate_impl.cpp
index 5cf4ebdb9..a666aec23 100644
--- a/frameworks/libs/distributeddb/interfaces/src/relational/relational_store_delegate_impl.cpp
+++ b/frameworks/libs/distributeddb/interfaces/src/relational/relational_store_delegate_impl.cpp
@@ -109,9 +109,17 @@ DBStatus RelationalStoreDelegateImpl::Sync(const std::vector<std::string> &devic
LOGE("not support query with tables");
return NOT_SUPPORT;
}
- RelationalStoreConnection::SyncInfo syncInfo{devices, mode,
- [this, onComplete](const std::map<std::string, std::vector<TableStatus>> &devicesStatus) {
- OnSyncComplete(devicesStatus, onComplete);
+ std::vector<DeviceSyncTarget> syncTargets;
+ for (const auto &device : devices) {
+ syncTargets.push_back({device, ""});
+ }
+ RelationalStoreConnection::SyncInfo syncInfo{syncTargets, mode,
+ [this, onComplete](const std::map<DeviceSyncTarget, std::vector<TableStatus>> &devicesStatus) {
+ std::map<std::string, std::vector<TableStatus>> devicesSyncStatus;
+ for (const auto &status : devicesStatus) {
+ devicesSyncStatus[status.first.device] = status.second;
+ }
+ OnSyncComplete(devicesSyncStatus, onComplete);
}, query, wait};
int errCode = conn_->SyncToDevice(syncInfo);
if (errCode != E_OK) {
@@ -121,6 +129,35 @@ DBStatus RelationalStoreDelegateImpl::Sync(const std::vector<std::string> &devic
return OK;
}
+DBStatus RelationalStoreDelegateImpl::Sync(const DeviceSyncParam &param,
+ const RdbDeviceSyncOnCompleteCallback &onComplete)
+{
+ if (conn_ == nullptr) {
+ LOGE("Invalid connection for operation!");
+ return DB_ERROR;
+ }
+
+ if (param.mode > SYNC_MODE_PUSH_PULL) {
+ LOGE("not support other mode");
+ return NOT_SUPPORT;
+ }
+
+ if (!param.isQuery || !DBCommon::CheckQueryWithoutMultiTable(param.query)) {
+ LOGE("not set query or not support query with tables");
+ return NOT_SUPPORT;
+ }
+ RelationalStoreConnection::SyncInfo syncInfo{param.syncTargets, param.mode,
+ [this, onComplete](const std::map<DeviceSyncTarget, std::vector<TableStatus>> &devicesStatus) {
+ OnSyncComplete(devicesStatus, onComplete);
+ }, param.query, param.isWait};
+ int errCode = conn_->SyncToDevice(syncInfo);
+ if (errCode != E_OK) {
+ LOGW("[RelationalStore Delegate] sync data to device failed:%d", errCode);
+ return TransferDBErrno(errCode);
+ }
+ return OK;
+}
+
DBStatus RelationalStoreDelegateImpl::RemoveDeviceData(const std::string &device, const std::string &tableName)
{
if (conn_ == nullptr) {
@@ -172,12 +209,29 @@ void RelationalStoreDelegateImpl::OnSyncComplete(const std::map<std::string, std
const SyncStatusCallback &onComplete)
{
std::map<std::string, std::vector<TableStatus>> res;
- for (const auto &[device, tablesStatus] : devicesStatus) {
+ for (const auto &[syncTarget, tablesStatus] : devicesStatus) {
+ for (const auto &tableStatus : tablesStatus) {
+ TableStatus table;
+ table.tableName = tableStatus.tableName;
+ table.status = SyncOperation::DBStatusTrans(tableStatus.status);
+ res[syncTarget].push_back(table);
+ }
+ }
+ if (onComplete) {
+ onComplete(res);
+ }
+}
+
+void RelationalStoreDelegateImpl::OnSyncComplete(const std::map<DeviceSyncTarget, std::vector<TableStatus>> &devicesStatus,
+ const RdbDeviceSyncOnCompleteCallback &onComplete)
+{
+ std::map<DeviceSyncTarget, std::vector<TableStatus>> res;
+ for (const auto &[syncTarget, tablesStatus] : devicesStatus) {
for (const auto &tableStatus : tablesStatus) {
TableStatus table;
table.tableName = tableStatus.tableName;
table.status = SyncOperation::DBStatusTrans(tableStatus.status);
- res[device].push_back(table);
+ res[syncTarget].push_back(table);
}
}
if (onComplete) {
diff --git a/frameworks/libs/distributeddb/interfaces/src/relational/relational_store_delegate_impl.h b/frameworks/libs/distributeddb/interfaces/src/relational/relational_store_delegate_impl.h
index f53b07233..d9faa9e69 100644
--- a/frameworks/libs/distributeddb/interfaces/src/relational/relational_store_delegate_impl.h
+++ b/frameworks/libs/distributeddb/interfaces/src/relational/relational_store_delegate_impl.h
@@ -34,6 +34,8 @@ public:
DBStatus Sync(const std::vector<std::string> &devices, SyncMode mode,
const Query &query, const SyncStatusCallback &onComplete, bool wait) override;
+ DBStatus Sync(const DeviceSyncParam &param, const RdbDeviceSyncOnCompleteCallback &onComplete) override;
+
DBStatus RemoveDeviceDataInner(const std::string &device, ClearMode mode) override;
DBStatus CreateDistributedTableInner(const std::string &tableName, TableSyncType type) override;
@@ -98,6 +100,8 @@ public:
private:
static void OnSyncComplete(const std::map<std::string, std::vector<TableStatus>> &devicesStatus,
const SyncStatusCallback &onComplete);
+ static void OnSyncComplete(const std::map<DeviceSyncTarget, std::vector<TableStatus>> &devicesStatus,
+ const RdbDeviceSyncOnCompleteCallback &onComplete);
#ifdef USE_DISTRIBUTEDDB_CLOUD
DBStatus ClearWatermark(const ClearMetaDataOption &option);
diff --git a/frameworks/libs/distributeddb/storage/include/kvdb_pragma.h b/frameworks/libs/distributeddb/storage/include/kvdb_pragma.h
index dd47e8fd5..48d9b83a3 100644
--- a/frameworks/libs/distributeddb/storage/include/kvdb_pragma.h
+++ b/frameworks/libs/distributeddb/storage/include/kvdb_pragma.h
@@ -56,10 +56,10 @@ enum : int {
};
struct PragmaSync {
- PragmaSync(const std::vector<std::string> &devices, int mode, const QuerySyncObject &query,
- const std::function<void(const std::map<std::string, int> &devicesMap)> &onComplete,
+ PragmaSync(const std::vector<DeviceSyncTarget> &syncTargets, int mode, const QuerySyncObject &query,
+ const std::function<void(const std::map<DeviceSyncTarget, int> &devicesMap)> &onComplete,
bool wait = false)
- : devices_(devices),
+ : syncTargets_(syncTargets),
mode_(mode),
onComplete_(onComplete),
wait_(wait),
@@ -68,10 +68,10 @@ struct PragmaSync {
{
}
- PragmaSync(const std::vector<std::string> &devices, int mode,
- const std::function<void(const std::map<std::string, int> &devicesMap)> &onComplete,
+ PragmaSync(const std::vector<DeviceSyncTarget> &syncTargets, int mode,
+ const std::function<void(const std::map<DeviceSyncTarget, int> &devicesMap)> &onComplete,
bool wait = false)
- : devices_(devices),
+ : syncTargets_(syncTargets),
mode_(mode),
onComplete_(onComplete),
wait_(wait),
@@ -81,8 +81,7 @@ struct PragmaSync {
}
PragmaSync(const DeviceSyncOption &option, const QuerySyncObject &query, const DeviceSyncProcessCallback &onProcess)
- : devices_(option.devices),
- mode_(option.mode),
+ : mode_(option.mode),
wait_(option.isWait),
isQuerySync_(option.isQuery),
onSyncProcess_(onProcess)
@@ -91,21 +90,36 @@ struct PragmaSync {
return;
}
query_ = query;
+ for (const auto &device : option.devices) {
+ syncTargets_.push_back({device, ""});
+ }
}
PragmaSync(const DeviceSyncOption &option, const DeviceSyncProcessCallback &onProcess)
- : devices_(option.devices),
- mode_(option.mode),
+ : mode_(option.mode),
wait_(option.isWait),
isQuerySync_(false),
query_(Query::Select()),
onSyncProcess_(onProcess)
+ {
+ for (const auto &device : option.devices) {
+ syncTargets_.push_back({device, ""});
+ }
+ }
+
+ PragmaSync(const DeviceSyncParam &param,
+ const std::function<void(const std::map<DeviceSyncTarget, int> &devicesMap)> &onComplete)
+ : mode_(param.mode),
+ wait_(param.isWait),
+ isQuerySync_(false),
+ query_(Query::Select()),
+ onComplete_(onComplete)
{
}
- std::vector<std::string> devices_;
+ std::vector<DeviceSyncTarget> syncTargets_;
int mode_;
- std::function<void(const std::map<std::string, int> &devicesMap)> onComplete_;
+ std::function<void(const std::map<DeviceSyncTarget, int> &devicesMap)> onComplete_;
bool wait_;
bool isQuerySync_;
QuerySyncObject query_;
diff --git a/frameworks/libs/distributeddb/storage/include/relational_store_connection.h b/frameworks/libs/distributeddb/storage/include/relational_store_connection.h
index 67bdc3548..86846e369 100644
--- a/frameworks/libs/distributeddb/storage/include/relational_store_connection.h
+++ b/frameworks/libs/distributeddb/storage/include/relational_store_connection.h
@@ -32,9 +32,19 @@ using RelationalObserverAction =
class RelationalStoreConnection : public IConnection, public virtual RefObject {
public:
struct SyncInfo {
- const std::vector<std::string> &devices;
+ SyncInfo(const std::vector<DeviceSyncTarget> &syncTargets, SyncMode mode,
+ const RdbDeviceSyncOnCompleteCallback onComplete, const Query &query, bool wait)
+ : syncTargets(syncTargets),
+ mode(mode),
+ onComplete(onComplete),
+ query(query),
+ wait(wait)
+ {
+ }
+
+ const std::vector<DeviceSyncTarget> syncTargets;
SyncMode mode = SYNC_MODE_PUSH_PULL;
- const SyncStatusCallback onComplete = nullptr;
+ const RdbDeviceSyncOnCompleteCallback onComplete = nullptr;
const Query &query;
bool wait = true;
};
diff --git a/frameworks/libs/distributeddb/storage/src/kv/sync_able_kvdb_connection.cpp b/frameworks/libs/distributeddb/storage/src/kv/sync_able_kvdb_connection.cpp
index 94275ffec..170eb91de 100644
--- a/frameworks/libs/distributeddb/storage/src/kv/sync_able_kvdb_connection.cpp
+++ b/frameworks/libs/distributeddb/storage/src/kv/sync_able_kvdb_connection.cpp
@@ -146,16 +146,16 @@ int SyncAbleKvDBConnection::PragmaSyncAction(const PragmaSync *syncParameter)
}
ISyncer::SyncParma syncParam;
- syncParam.devices = syncParameter->devices_;
+ syncParam.syncTargets = syncParameter->syncTargets_;
syncParam.mode = syncParameter->mode_;
syncParam.wait = syncParameter->wait_;
syncParam.isQuerySync = syncParameter->isQuerySync_;
syncParam.syncQuery = syncParameter->query_;
syncParam.onFinalize = [this]() { DecObjRef(this); };
if (syncParameter->onComplete_) {
- syncParam.onComplete = [this, onComplete = syncParameter->onComplete_, wait = syncParameter->wait_](
- const std::map<std::string, int> &statuses) {
- OnSyncComplete(statuses, onComplete, wait);
+ syncParam.onComplete = [this, onComplete = syncParameter->onComplete_](
+ const std::map<DeviceSyncTarget, int> &statuses) {
+ OnSyncComplete(statuses, onComplete);
};
}
if (syncParameter->onSyncProcess_) {
@@ -207,8 +207,8 @@ int SyncAbleKvDBConnection::EnableAutoSync(bool enable)
return E_OK;
}
-void SyncAbleKvDBConnection::OnSyncComplete(const std::map<std::string, int> &statuses,
- const std::function<void(const std::map<std::string, int> &devicesMap)> &onComplete, bool wait)
+void SyncAbleKvDBConnection::OnSyncComplete(const std::map<DeviceSyncTarget, int> &statuses,
+ const std::function<void(const std::map<DeviceSyncTarget, int> &devicesMap)> &onComplete)
{
AutoLock lockGuard(this);
if (!IsKilled() && onComplete) {
diff --git a/frameworks/libs/distributeddb/storage/src/kv/sync_able_kvdb_connection.h b/frameworks/libs/distributeddb/storage/src/kv/sync_able_kvdb_connection.h
index 4594b3dd9..70adf608e 100644
--- a/frameworks/libs/distributeddb/storage/src/kv/sync_able_kvdb_connection.h
+++ b/frameworks/libs/distributeddb/storage/src/kv/sync_able_kvdb_connection.h
@@ -65,11 +65,11 @@ private:
// If enable is true, it will enable auto sync
int EnableAutoSync(bool enable);
- void OnSyncComplete(const std::map<std::string, int> &statuses,
- const std::function<void(const std::map<std::string, int> &devicesMap)> &onComplete, bool wait);
+ void OnSyncComplete(const std::map<DeviceSyncTarget, int> &statuses,
+ const std::function<void(const std::map<DeviceSyncTarget, int> &devicesMap)> &onComplete);
void OnDeviceSyncProcess(const std::map<std::string, DeviceSyncProcess> &syncRecordMap,
- const DeviceSyncProcessCallback &onProcess);
+ const DeviceSyncProcessCallback &onProcess);
int GetQueuedSyncSize(int *queuedSyncSize) const;
diff --git a/frameworks/libs/distributeddb/storage/src/sqlite/relational/sqlite_relational_store_connection.cpp b/frameworks/libs/distributeddb/storage/src/sqlite/relational/sqlite_relational_store_connection.cpp
index f9db627ac..f6ac30676 100644
--- a/frameworks/libs/distributeddb/storage/src/sqlite/relational/sqlite_relational_store_connection.cpp
+++ b/frameworks/libs/distributeddb/storage/src/sqlite/relational/sqlite_relational_store_connection.cpp
@@ -255,7 +255,7 @@ int SQLiteRelationalStoreConnection::SyncToDevice(SyncInfo &info)
}
ISyncer::SyncParma syncParam;
- syncParam.devices = info.devices;
+ syncParam.syncTargets = info.syncTargets;
syncParam.mode = info.mode;
syncParam.wait = info.wait;
syncParam.isQuerySync = true;
diff --git a/frameworks/libs/distributeddb/syncer/include/isyncer.h b/frameworks/libs/distributeddb/syncer/include/isyncer.h
index 61e0c44c4..83f705f15 100644
--- a/frameworks/libs/distributeddb/syncer/include/isyncer.h
+++ b/frameworks/libs/distributeddb/syncer/include/isyncer.h
@@ -36,9 +36,9 @@ struct SyncerBasicInfo {
class ISyncer {
public:
struct SyncParma {
- std::vector<std::string> devices;
- std::function<void(const std::map<std::string, int> &devicesMap)> onComplete;
- SyncStatusCallback relationOnComplete;
+ std::vector<DeviceSyncTarget> syncTargets;
+ std::function<void(const std::map<DeviceSyncTarget, int> &devicesMap)> onComplete;
+ RdbDeviceSyncOnCompleteCallback relationOnComplete;
std::function<void(void)> onFinalize;
int mode = 0;
bool wait = false;
@@ -61,8 +61,8 @@ public:
// param onComplete: The syncer finish callback. set by caller
// param onFinalize: will be callback when this Sync Operation finalized.
// return a Sync id. It will return a positive value if failed,
- virtual int Sync(const std::vector<std::string> &devices, int mode,
- const std::function<void(const std::map<std::string, int> &)> &onComplete,
+ virtual int Sync(const std::vector<DeviceSyncTarget> &syncTargets, int mode,
+ const std::function<void(const std::map<DeviceSyncTarget, int> &)> &onComplete,
const std::function<void(void)> &onFinalize, bool wait) = 0;
// Sync function. use SyncParma to reduce parameter.
diff --git a/frameworks/libs/distributeddb/syncer/include/syncer_proxy.h b/frameworks/libs/distributeddb/syncer/include/syncer_proxy.h
index 0f6d5bc7d..eea9152d7 100644
--- a/frameworks/libs/distributeddb/syncer/include/syncer_proxy.h
+++ b/frameworks/libs/distributeddb/syncer/include/syncer_proxy.h
@@ -41,8 +41,8 @@ public:
// param onComplete: The syncer finish callback. set by caller
// param onFinalize: will be callback when this Sync Operation finalized.
// return a Sync id. It will return a positive value if failed,
- int Sync(const std::vector<std::string> &devices, int mode,
- const std::function<void(const std::map<std::string, int> &)> &onComplete,
+ int Sync(const std::vector<DeviceSyncTarget> &syncTargets, int mode,
+ const std::function<void(const std::map<DeviceSyncTarget, int> &)> &onComplete,
const std::function<void(void)> &onFinalize, bool wait) override;
// Sync function. use SyncParma to reduce parameter.
diff --git a/frameworks/libs/distributeddb/syncer/src/device/generic_syncer.cpp b/frameworks/libs/distributeddb/syncer/src/device/generic_syncer.cpp
index 772dee8f9..f6ef90aa1 100644
--- a/frameworks/libs/distributeddb/syncer/src/device/generic_syncer.cpp
+++ b/frameworks/libs/distributeddb/syncer/src/device/generic_syncer.cpp
@@ -146,12 +146,12 @@ int GenericSyncer::Close(bool isClosedOperation)
return errCode;
}
-int GenericSyncer::Sync(const std::vector<std::string> &devices, int mode,
- const std::function<void(const std::map<std::string, int> &)> &onComplete,
+int GenericSyncer::Sync(const std::vector<DeviceSyncTarget> &syncTargets, int mode,
+ const std::function<void(const std::map<DeviceSyncTarget, int> &)> &onComplete,
const std::function<void(void)> &onFinalize, bool wait = false)
{
SyncParma param;
- param.devices = devices;
+ param.syncTargets = syncTargets;
param.mode = mode;
param.onComplete = onComplete;
param.onFinalize = onFinalize;
@@ -162,7 +162,11 @@ int GenericSyncer::Sync(const std::vector<std::string> &devices, int mode,
int GenericSyncer::Sync(const InternalSyncParma &param)
{
SyncParma syncParam;
- syncParam.devices = param.devices;
+ std::vector<DeviceSyncTarget> syncTargets;
+ for (const auto &device : param.devices) {
+ syncTargets.push_back({device, ""});
+ }
+ syncParam.syncTargets = syncTargets;
syncParam.mode = param.mode;
syncParam.isQuerySync = param.isQuerySync;
syncParam.syncQuery = param.syncQuery;
@@ -234,8 +238,9 @@ int GenericSyncer::CancelSync(uint32_t syncId)
int GenericSyncer::PrepareSync(const SyncParma &param, uint32_t syncId, uint64_t connectionId)
{
- auto *operation =
- new (std::nothrow) SyncOperation(syncId, param.devices, param.mode, param.onComplete, param.wait);
+ SyncOperation *operation = nullptr;
+ operation = new(std::nothrow) SyncOperation(syncId, param.syncTargets, param.mode, param.onComplete,
+ param.wait);
if (operation == nullptr) {
SubQueuedSyncSize();
return -E_OUT_OF_MEMORY;
@@ -246,8 +251,8 @@ int GenericSyncer::PrepareSync(const SyncParma &param, uint32_t syncId, uint64_t
std::lock_guard<std::mutex> autoLock(syncerLock_);
PerformanceAnalysis::GetInstance()->StepTimeRecordStart(PT_TEST_RECORDS::RECORD_SYNC_TOTAL);
InitSyncOperation(operation, param);
- LOGI("[Syncer] GenerateSyncId %" PRIu32 ", mode = %d, wait = %d, label = %s, devices = %s", syncId, param.mode,
- param.wait, label_.c_str(), GetSyncDevicesStr(param.devices).c_str());
+ LOGI("[Syncer] GenerateSyncId %" PRIu32 ", mode = %d, wait = %d, label = %s, syncTargets = %s",
+ syncId, param.mode, param.wait, label_.c_str(), GetSyncTargetsStr(param.syncTargets).c_str());
engine = syncEngine_;
RefObject::IncObjRef(engine);
}
@@ -536,11 +541,18 @@ int GenericSyncer::SyncConditionCheck(const SyncParma &param, const ISyncEngine
return E_OK;
}
-bool GenericSyncer::IsValidDevices(const std::vector<std::string> &devices) const
+bool GenericSyncer::IsValidDevices(const std::vector<DeviceSyncTarget> &syncTargets) const
{
- if (devices.empty()) {
- LOGE("[Syncer] devices is empty!");
+ if (syncTargets.empty()) {
+ LOGE("[Syncer] syncTargets is empty!");
return false;
+ } else {
+ for (const auto &target : syncTargets) {
+ if (target.device.empty()) {
+ LOGE("[Syncer] there is a device in sync targets is empty!");
+ return false;
+ }
+ }
}
return true;
}
@@ -858,6 +870,21 @@ std::string GenericSyncer::GetSyncDevicesStr(const std::vector<std::string> &dev
return syncDevices.substr(0, syncDevices.size() - 1);
}
+std::string GenericSyncer::GetSyncTargetsStr(const std::vector<DeviceSyncTarget> &syncTargets) const
+{
+ std::string syncTargetsStr;
+ for (const auto &target : syncTargets) {
+ syncTargetsStr += DBCommon::StringMasking(target.device);
+ syncTargetsStr += "-";
+ syncTargetsStr += target.userId;
+ syncTargetsStr += ",";
+ }
+ if (syncTargetsStr.empty()) {
+ return "";
+ }
+ return syncTargetsStr.substr(0, syncTargetsStr.size() - 1);
+}
+
int GenericSyncer::StatusCheck() const
{
if (!initialized_) {
@@ -882,7 +909,7 @@ int GenericSyncer::SyncPreCheck(const SyncParma &param) const
if (errCode != E_OK) {
return errCode;
}
- if (!IsValidDevices(param.devices) || !IsValidMode(param.mode)) { // LCOV_EXCL_BR_LINE
+ if (!IsValidDevices(param.syncTargets) || !IsValidMode(param.mode)) { // LCOV_EXCL_BR_LINE
return -E_INVALID_ARGS;
}
if (IsQueuedManualSyncFull(param.mode, param.wait)) { // LCOV_EXCL_BR_LINE
diff --git a/frameworks/libs/distributeddb/syncer/src/device/generic_syncer.h b/frameworks/libs/distributeddb/syncer/src/device/generic_syncer.h
index 183fbb1fa..1be3d2a99 100644
--- a/frameworks/libs/distributeddb/syncer/src/device/generic_syncer.h
+++ b/frameworks/libs/distributeddb/syncer/src/device/generic_syncer.h
@@ -46,8 +46,8 @@ public:
// param onComplete: The syncer finish callback. set by caller
// param onFinalize: will be callback when this Sync Operation finalized.
// return a Sync id. It will return a positive value if failed,
- int Sync(const std::vector<std::string> &devices, int mode,
- const std::function<void(const std::map<std::string, int> &)> &onComplete,
+ int Sync(const std::vector<DeviceSyncTarget> &syncTargets, int mode,
+ const std::function<void(const std::map<DeviceSyncTarget, int> &)> &onComplete,
const std::function<void(void)> &onFinalize, bool wait) override;
// Sync function. use SyncParma to reduce parameter.
@@ -160,7 +160,7 @@ protected:
virtual int SyncConditionCheck(const SyncParma &param, const ISyncEngine *engine, ISyncInterface *storage) const;
// Check if the devices arg is valid
- bool IsValidDevices(const std::vector<std::string> &devices) const;
+ bool IsValidDevices(const std::vector<DeviceSyncTarget> &syncTargets) const;
// Used Clear all SyncOperations.
// isClosedOperation is false while userChanged
@@ -185,6 +185,8 @@ protected:
std::string GetSyncDevicesStr(const std::vector<std::string> &devices) const;
+ std::string GetSyncTargetsStr(const std::vector<DeviceSyncTarget> &syncTargets) const;
+
void InitSyncOperation(SyncOperation *operation, const SyncParma &param);
int StatusCheck() const;
diff --git a/frameworks/libs/distributeddb/syncer/src/device/isync_engine.h b/frameworks/libs/distributeddb/syncer/src/device/isync_engine.h
index 3859a900d..db7d856a4 100644
--- a/frameworks/libs/distributeddb/syncer/src/device/isync_engine.h
+++ b/frameworks/libs/distributeddb/syncer/src/device/isync_engine.h
@@ -78,7 +78,7 @@ public:
virtual void StopAutoSubscribeTimer() = 0;
// Check if number of subscriptions out of limit
- virtual int SubscribeLimitCheck(const std::vector<std::string> &devices, QuerySyncObject &query) const = 0;
+ virtual int SubscribeLimitCheck(const std::vector<DeviceSyncTarget> &syncTargets, QuerySyncObject &query) const = 0;
// Check if the Sync Engine is active, some times synchronization is not allowed
virtual bool IsEngineActive() const = 0;
diff --git a/frameworks/libs/distributeddb/syncer/src/device/isync_task_context.h b/frameworks/libs/distributeddb/syncer/src/device/isync_task_context.h
index 91ed88936..cb896f707 100644
--- a/frameworks/libs/distributeddb/syncer/src/device/isync_task_context.h
+++ b/frameworks/libs/distributeddb/syncer/src/device/isync_task_context.h
@@ -75,6 +75,8 @@ public:
// Get the current task deviceId.
virtual std::string GetDeviceId() const = 0;
+ // Get the current task userId.
+ virtual std::string GetUserId() const = 0;
virtual void SetTaskExecStatus(int status) = 0;
diff --git a/frameworks/libs/distributeddb/syncer/src/device/singlever/single_ver_data_sync_utils.cpp b/frameworks/libs/distributeddb/syncer/src/device/singlever/single_ver_data_sync_utils.cpp
index 986d841b9..31de1ef74 100644
--- a/frameworks/libs/distributeddb/syncer/src/device/singlever/single_ver_data_sync_utils.cpp
+++ b/frameworks/libs/distributeddb/syncer/src/device/singlever/single_ver_data_sync_utils.cpp
@@ -623,10 +623,11 @@ void SingleVerDataSyncUtils::UpdateSyncProcess(SingleVerSyncTaskContext *context
packet->GetTotalDataCount(), dataSize);
if (packet->GetMode() == SyncModeType::PUSH || packet->GetMode() == SyncModeType::QUERY_PUSH) {
// save total count to sync process
+ DeviceSyncTarget syncTarget = {context->GetDeviceId(), context->GetUserId()};
if (packet->GetTotalDataCount() > 0) {
- context->SetOperationSyncProcessTotal(context->GetDeviceId(), packet->GetTotalDataCount());
+ context->SetOperationSyncProcessTotal(syncTarget, packet->GetTotalDataCount());
}
- context->UpdateOperationFinishedCount(context->GetDeviceId(), static_cast<uint32_t>(dataSize));
+ context->UpdateOperationFinishedCount(syncTarget, static_cast<uint32_t>(dataSize));
}
}
diff --git a/frameworks/libs/distributeddb/syncer/src/device/singlever/single_ver_kv_syncer.cpp b/frameworks/libs/distributeddb/syncer/src/device/singlever/single_ver_kv_syncer.cpp
index 31ac7c4b1..3359745bc 100644
--- a/frameworks/libs/distributeddb/syncer/src/device/singlever/single_ver_kv_syncer.cpp
+++ b/frameworks/libs/distributeddb/syncer/src/device/singlever/single_ver_kv_syncer.cpp
@@ -59,7 +59,11 @@ void SingleVerKVSyncer::EnableAutoSync(bool enable)
LOGI("[Syncer] EnableAutoSync no online devices");
return;
}
- int errCode = Sync(devices, SyncModeType::AUTO_PUSH, nullptr, nullptr, false);
+ std::vector<DeviceSyncTarget> syncTargets;
+ for (const auto &device : devices) {
+ syncTargets.push_back({device, ""});
+ }
+ int errCode = Sync(syncTargets, SyncModeType::AUTO_PUSH, nullptr, nullptr, false);
if (errCode != E_OK) {
LOGE("[Syncer] sync start by EnableAutoSync failed err %d", errCode);
}
@@ -134,11 +138,11 @@ void SingleVerKVSyncer::RemoteDataChanged(const std::string &device)
RefObject::IncObjRef(syncEngine_);
syncInterface_->IncRefCount();
int retCode = RuntimeContext::GetInstance()->ScheduleTask([this, userId, appId, storeId, device] {
- std::vector<std::string> devices;
- devices.push_back(device);
+ std::vector<DeviceSyncTarget> syncTargets;
+ syncTargets.push_back({device, ""});
int errCode = E_OK;
if (RuntimeContext::GetInstance()->IsNeedAutoSync(userId, appId, storeId, device)) {
- errCode = Sync(devices, SyncModeType::AUTO_PUSH, nullptr, nullptr, false);
+ errCode = Sync(syncTargets, SyncModeType::AUTO_PUSH, nullptr, nullptr, false);
}
if (errCode != E_OK) {
LOGE("[SingleVerKVSyncer] sync start by RemoteDataChanged failed err %d", errCode);
@@ -186,11 +190,11 @@ int SingleVerKVSyncer::SyncConditionCheck(const SyncParma &param, const ISyncEng
LOGE("[SingleVerKVSyncer] subscribe query not support limit,offset or orderby");
return -E_NOT_SUPPORT;
}
- if (param.devices.size() > MAX_DEVICES_NUM) {
+ if (param.syncTargets.size() > MAX_DEVICES_NUM) {
LOGE("[SingleVerKVSyncer] devices is overlimit");
return -E_MAX_LIMITS;
}
- return engine->SubscribeLimitCheck(param.devices, query);
+ return engine->SubscribeLimitCheck(param.syncTargets, query);
}
void SingleVerKVSyncer::TriggerSubscribe(const std::string &device, const QuerySyncObject &query)
@@ -204,7 +208,11 @@ void SingleVerKVSyncer::TriggerSubscribe(const std::string &device, const QueryS
std::vector<std::string> devices;
devices.push_back(device);
SyncParma param;
- param.devices = devices;
+ std::vector<DeviceSyncTarget> syncTargets;
+ for (const auto &device : devices) {
+ syncTargets.push_back({device, ""});
+ }
+ param.syncTargets = syncTargets;
param.mode = SyncModeType::AUTO_SUBSCRIBE_QUERY;
param.onComplete = nullptr;
param.onFinalize = nullptr;
@@ -233,7 +241,11 @@ bool SingleVerKVSyncer::TryFullSync(const std::vector<std::string> &devices)
LOGD("[Syncer] autoSync no enable");
return false;
}
- int errCode = Sync(devices, SyncModeType::AUTO_PUSH, nullptr, nullptr, false);
+ std::vector<DeviceSyncTarget> syncTargets;
+ for (const auto &device : devices) {
+ syncTargets.push_back({device, ""});
+ }
+ int errCode = Sync(syncTargets, SyncModeType::AUTO_PUSH, nullptr, nullptr, false);
if (errCode != E_OK) {
LOGE("[Syncer] sync start by RemoteDataChanged failed err %d", errCode);
return false;
diff --git a/frameworks/libs/distributeddb/syncer/src/device/singlever/single_ver_relational_syncer.cpp b/frameworks/libs/distributeddb/syncer/src/device/singlever/single_ver_relational_syncer.cpp
index e392bb459..df639fc74 100644
--- a/frameworks/libs/distributeddb/syncer/src/device/singlever/single_ver_relational_syncer.cpp
+++ b/frameworks/libs/distributeddb/syncer/src/device/singlever/single_ver_relational_syncer.cpp
@@ -83,7 +83,7 @@ int SingleVerRelationalSyncer::GenerateEachSyncTask(const SyncParma &param, uint
LOGI("[SingleVerRelationalSyncer] SubSyncId %" PRIu32 " create by SyncId %" PRIu32 ", hashTableName = %s",
subSyncId, syncId, STR_MASK(DBCommon::TransferStringToHex(hashTableName)));
subParam.syncQuery = table;
- subParam.onComplete = [this, subSyncId, syncId, subParam](const std::map<std::string, int> &devicesMap) {
+ subParam.onComplete = [this, subSyncId, syncId, subParam](const std::map<DeviceSyncTarget, int> &devicesMap) {
DoOnSubSyncComplete(subSyncId, syncId, subParam, devicesMap);
};
{
@@ -103,7 +103,7 @@ int SingleVerRelationalSyncer::GenerateEachSyncTask(const SyncParma &param, uint
}
void SingleVerRelationalSyncer::DoOnSubSyncComplete(const uint32_t subSyncId, const uint32_t syncId,
- const SyncParma &param, const std::map<std::string, int> &devicesMap)
+ const SyncParma &param, const std::map<DeviceSyncTarget, int> &devicesMap)
{
bool allFinish = true;
{
@@ -138,8 +138,8 @@ void SingleVerRelationalSyncer::DoOnComplete(const SyncParma &param, uint32_t sy
if (!param.relationOnComplete) {
return;
}
- std::map<std::string, std::vector<TableStatus>> syncRes;
- std::map<std::string, std::vector<TableStatus>> tmpMap;
+ std::map<DeviceSyncTarget, std::vector<TableStatus>> syncRes;
+ std::map<DeviceSyncTarget, std::vector<TableStatus>> tmpMap;
{
std::lock_guard<std::mutex> lockGuard(syncMapLock_);
tmpMap = resMap_[syncId];
diff --git a/frameworks/libs/distributeddb/syncer/src/device/singlever/single_ver_relational_syncer.h b/frameworks/libs/distributeddb/syncer/src/device/singlever/single_ver_relational_syncer.h
index df637af8c..a67f4fe2a 100644
--- a/frameworks/libs/distributeddb/syncer/src/device/singlever/single_ver_relational_syncer.h
+++ b/frameworks/libs/distributeddb/syncer/src/device/singlever/single_ver_relational_syncer.h
@@ -46,7 +46,7 @@ private:
void DoOnComplete(const SyncParma &param, uint32_t syncId);
void DoOnSubSyncComplete(const uint32_t subSyncId, const uint32_t syncId,
- const SyncParma &param, const std::map<std::string, int> &devicesMap);
+ const SyncParma &param, const std::map<DeviceSyncTarget, int> &devicesMap);
void SchemaChangeCallback();
@@ -56,7 +56,7 @@ private:
mutable std::mutex syncMapLock_;
std::map<uint32_t, std::set<uint32_t>> fullSyncIdMap_;
- std::map<uint32_t, std::map<std::string, std::vector<TableStatus>>> resMap_;
+ std::map<uint32_t, std::map<DeviceSyncTarget, std::vector<TableStatus>>> resMap_;
};
}
#endif
diff --git a/frameworks/libs/distributeddb/syncer/src/device/singlever/single_ver_sync_engine.cpp b/frameworks/libs/distributeddb/syncer/src/device/singlever/single_ver_sync_engine.cpp
index c5254c5af..a78f9b3e0 100644
--- a/frameworks/libs/distributeddb/syncer/src/device/singlever/single_ver_sync_engine.cpp
+++ b/frameworks/libs/distributeddb/syncer/src/device/singlever/single_ver_sync_engine.cpp
@@ -52,10 +52,12 @@ void SingleVerSyncEngine::EnableClearRemoteStaleData(bool enable)
LOGI("[SingleVerSyncEngine][EnableClearRemoteStaleData] enabled %d", enable);
needClearRemoteStaleData_ = enable;
std::unique_lock<std::mutex> lock(contextMapLock_);
- for (auto &iter : syncTaskContextMap_) {
- auto context = static_cast<SingleVerSyncTaskContext *>(iter.second);
- if (context != nullptr) { // LCOV_EXCL_BR_LINE
- context->EnableClearRemoteStaleData(enable);
+ for (auto &deviceIter : syncTaskContextMap_) {
+ for (auto &userIdIter : deviceIter.second) {
+ auto context = static_cast<SingleVerSyncTaskContext *>(userIdIter.second);
+ if (context != nullptr) { // LCOV_EXCL_BR_LINE
+ context->EnableClearRemoteStaleData(enable);
+ }
}
}
}
diff --git a/frameworks/libs/distributeddb/syncer/src/device/singlever/single_ver_sync_task_context.cpp b/frameworks/libs/distributeddb/syncer/src/device/singlever/single_ver_sync_task_context.cpp
index 9ef639894..f4591bc94 100644
--- a/frameworks/libs/distributeddb/syncer/src/device/singlever/single_ver_sync_task_context.cpp
+++ b/frameworks/libs/distributeddb/syncer/src/device/singlever/single_ver_sync_task_context.cpp
@@ -117,7 +117,7 @@ int SingleVerSyncTaskContext::AddSyncOperation(SyncOperation *operation)
});
if (iter != requestTargetQueue_.end()) {
static_cast<SingleVerSyncTarget *>(*iter)->SetEndWaterMark(timeHelper_->GetTime());
- operation->SetStatus(deviceId_, SyncOperation::OP_FINISHED_ALL);
+ operation->SetStatus({deviceId_, userId_}, SyncOperation::OP_FINISHED_ALL);
return E_OK;
}
}
@@ -231,7 +231,7 @@ void SingleVerSyncTaskContext::Abort(int status)
{
std::lock_guard<std::mutex> lock(operationLock_);
if (syncOperation_ != nullptr) {
- syncOperation_->SetStatus(deviceId_, status, GetCommErrCode());
+ syncOperation_->SetStatus({deviceId_, userId_}, status, GetCommErrCode());
if ((status >= SyncOperation::OP_FINISHED_ALL)) {
UnlockObj();
if (syncOperation_->CheckIsAllFinished()) {
@@ -279,11 +279,8 @@ void SingleVerSyncTaskContext::ClearAllSyncTask()
continue; // not exit this scene
}
LOGI("[SingleVerSyncTaskContext] killing syncId=%d,dev=%s", tmpOperation->GetSyncId(), STR_MASK(deviceId_));
- if (target->IsAutoSync()) {
- tmpOperation->SetStatus(deviceId_, SyncOperation::OP_FINISHED_ALL);
- } else {
- tmpOperation->SetStatus(deviceId_, SyncOperation::OP_COMM_ABNORMAL);
- }
+ int status = target->IsAutoSync() ? SyncOperation::OP_FINISHED_ALL : SyncOperation::OP_COMM_ABNORMAL;
+ tmpOperation->SetStatus({deviceId_, userId_}, status);
if (tmpOperation->CheckIsAllFinished()) {
tmpOperation->Finished();
}
@@ -599,19 +596,19 @@ void SingleVerSyncTaskContext::StopFeedDogForGetData()
stateMachine_->StopFeedDogForGetData();
}
-void SingleVerSyncTaskContext::UpdateOperationFinishedCount(const std::string &deviceId, uint32_t count)
+void SingleVerSyncTaskContext::UpdateOperationFinishedCount(const DeviceSyncTarget &syncTarget, uint32_t count)
{
std::lock_guard<std::mutex> lock(operationLock_);
if (syncOperation_ != nullptr) {
- syncOperation_->UpdateFinishedCount(deviceId, count);
+ syncOperation_->UpdateFinishedCount(syncTarget, count);
}
}
-void SingleVerSyncTaskContext::SetOperationSyncProcessTotal(const std::string &deviceId, uint32_t total)
+void SingleVerSyncTaskContext::SetOperationSyncProcessTotal(const DeviceSyncTarget &syncTarget, uint32_t total)
{
std::lock_guard<std::mutex> lock(operationLock_);
if (syncOperation_ != nullptr) {
- syncOperation_->SetSyncProcessTotal(deviceId, total);
+ syncOperation_->SetSyncProcessTotal(syncTarget, total);
}
}
diff --git a/frameworks/libs/distributeddb/syncer/src/device/singlever/single_ver_sync_task_context.h b/frameworks/libs/distributeddb/syncer/src/device/singlever/single_ver_sync_task_context.h
index 0cb30f516..c0d5a9c72 100644
--- a/frameworks/libs/distributeddb/syncer/src/device/singlever/single_ver_sync_task_context.h
+++ b/frameworks/libs/distributeddb/syncer/src/device/singlever/single_ver_sync_task_context.h
@@ -137,8 +137,8 @@ public:
void StartFeedDogForGetData(uint32_t sessionId);
void StopFeedDogForGetData();
- void UpdateOperationFinishedCount(const std::string &deviceId, uint32_t count);
- void SetOperationSyncProcessTotal(const std::string &deviceId, uint32_t total);
+ void UpdateOperationFinishedCount(const DeviceSyncTarget &syncTarget, uint32_t count);
+ void SetOperationSyncProcessTotal(const DeviceSyncTarget &syncTarget, uint32_t total);
void SetInitWaterMark(WaterMark waterMark);
WaterMark GetInitWaterMark() const;
diff --git a/frameworks/libs/distributeddb/syncer/src/device/subscribe_manager.cpp b/frameworks/libs/distributeddb/syncer/src/device/subscribe_manager.cpp
index eb6db37e5..635e8b9b4 100644
--- a/frameworks/libs/distributeddb/syncer/src/device/subscribe_manager.cpp
+++ b/frameworks/libs/distributeddb/syncer/src/device/subscribe_manager.cpp
@@ -201,12 +201,12 @@ void SubscribeManager::GetRemoteSubscribeQueryIds(const std::string &device,
}
}
-int SubscribeManager::LocalSubscribeLimitCheck(const std::vector<std::string> &devices, QuerySyncObject &query) const
+int SubscribeManager::LocalSubscribeLimitCheck(const std::vector<DeviceSyncTarget> &syncTargets, QuerySyncObject &query) const
{
std::shared_lock<std::shared_mutex> lock(localSubscribeMapLock_);
size_t devNum = localSubscribeMap_.size();
- for (const auto &device : devices) {
- if (localSubscribeMap_.find(device) != localSubscribeMap_.end()) {
+ for (const auto &syncTarget : syncTargets) {
+ if (localSubscribeMap_.find(syncTarget.device) != localSubscribeMap_.end()) {
continue;
}
devNum++;
diff --git a/frameworks/libs/distributeddb/syncer/src/device/subscribe_manager.h b/frameworks/libs/distributeddb/syncer/src/device/subscribe_manager.h
index bfbbfcd10..428a3df1b 100644
--- a/frameworks/libs/distributeddb/syncer/src/device/subscribe_manager.h
+++ b/frameworks/libs/distributeddb/syncer/src/device/subscribe_manager.h
@@ -84,7 +84,7 @@ public:
bool IsLastRemoteContainSubscribe(const std::string &device, const std::string &queryId) const;
- int LocalSubscribeLimitCheck(const std::vector<std::string> &devices, QuerySyncObject &query) const;
+ int LocalSubscribeLimitCheck(const std::vector<DeviceSyncTarget> &syncTargets, QuerySyncObject &query) const;
bool IsQueryExistSubscribe(const std::string &queryId) const;
private:
diff --git a/frameworks/libs/distributeddb/syncer/src/device/sync_engine.cpp b/frameworks/libs/distributeddb/syncer/src/device/sync_engine.cpp
index 5d4a60e55..e5ef1457f 100644
--- a/frameworks/libs/distributeddb/syncer/src/device/sync_engine.cpp
+++ b/frameworks/libs/distributeddb/syncer/src/device/sync_engine.cpp
@@ -115,9 +115,11 @@ int SyncEngine::Close()
// Clear SyncContexts
{
std::unique_lock<std::mutex> lock(contextMapLock_);
- for (auto &iter : syncTaskContextMap_) {
- decContext.push_back(iter.second);
- iter.second = nullptr;
+ for (auto &deviceIter : syncTaskContextMap_) {
+ for (auto &userIdIter : deviceIter.second) {
+ decContext.push_back(userIdIter.second);
+ userIdIter.second = nullptr;
+ }
}
syncTaskContextMap_.clear();
}
@@ -163,22 +165,53 @@ int SyncEngine::AddSyncOperation(SyncOperation *operation)
return -E_INVALID_ARGS;
}
- std::vector<std::string> devices = operation->GetDevices();
+ if (operation->GetSyncTargets().empty()) {
+ return AddSyncOperationForDevices(operation);
+ }
+ return AddSyncOperationForSyncTargets(operation);
+}
+
+int SyncEngine::AddSyncOperationForDevices(SyncOperation *operation)
+{
+ std::vector<DeviceSyncTarget> syncTargets = operation->GetSyncTargets();
std::string localDeviceId;
int errCode = GetLocalDeviceId(localDeviceId);
- for (const auto &deviceId : devices) {
+ for (const auto &syncTarget : syncTargets) {
if (errCode != E_OK) {
- operation->SetStatus(deviceId, errCode == -E_BUSY ?
+ operation->SetStatus(syncTarget, errCode == -E_BUSY ?
SyncOperation::OP_BUSY_FAILURE : SyncOperation::OP_FAILED);
continue;
}
- if (!CheckDeviceIdValid(deviceId, localDeviceId)) {
- operation->SetStatus(deviceId, SyncOperation::OP_INVALID_ARGS);
+ if (!CheckDeviceIdValid(syncTarget.device, localDeviceId)) {
+ operation->SetStatus(syncTarget, SyncOperation::OP_INVALID_ARGS);
continue;
}
- operation->SetStatus(deviceId, SyncOperation::OP_WAITING);
- if (AddSyncOperForContext(deviceId, operation) != E_OK) {
- operation->SetStatus(deviceId, SyncOperation::OP_FAILED);
+ operation->SetStatus(syncTarget, SyncOperation::OP_WAITING);
+ if (AddSyncOperForContext(syncTarget.device, operation, syncTarget.userId) != E_OK) {
+ operation->SetStatus(syncTarget, SyncOperation::OP_FAILED);
+ }
+ }
+ return E_OK;
+}
+
+int SyncEngine::AddSyncOperationForSyncTargets(SyncOperation *operation)
+{
+ std::vector<DeviceSyncTarget> syncTargets = operation->GetSyncTargets();
+ std::string localDeviceId;
+ int errCode = GetLocalDeviceId(localDeviceId);
+ for (const auto &syncTarget : syncTargets) {
+ if (errCode != E_OK) {
+ operation->SetStatus(syncTarget, errCode == -E_BUSY ?
+ SyncOperation::OP_BUSY_FAILURE : SyncOperation::OP_FAILED);
+ continue;
+ }
+ if (!CheckDeviceIdValid(syncTarget.device, localDeviceId)) {
+ operation->SetStatus(syncTarget, SyncOperation::OP_INVALID_ARGS);
+ continue;
+ }
+ operation->SetStatus(syncTarget, SyncOperation::OP_WAITING);
+ if (AddSyncOperForContext(syncTarget.device, operation, syncTarget.userId) != E_OK) {
+ operation->SetStatus(syncTarget, SyncOperation::OP_FAILED);
}
}
return E_OK;
@@ -187,10 +220,12 @@ int SyncEngine::AddSyncOperation(SyncOperation *operation)
void SyncEngine::RemoveSyncOperation(int syncId)
{
std::lock_guard<std::mutex> lock(contextMapLock_);
- for (auto &iter : syncTaskContextMap_) {
- ISyncTaskContext *context = iter.second;
- if (context != nullptr) {
- context->RemoveSyncOperation(syncId);
+ for (auto &deviceIter : syncTaskContextMap_) {
+ for (auto &userIdIter : deviceIter.second) {
+ ISyncTaskContext *context = userIdIter.second;
+ if (context != nullptr) {
+ context->RemoveSyncOperation(syncId);
+ }
}
}
}
@@ -297,7 +332,8 @@ int SyncEngine::InitComunicator(const ISyncInterface *syncInterface)
}
errCode = communicator_->RegOnMessageCallback(
- [this](const std::string &targetDev, Message *inMsg) { MessageReciveCallback(targetDev, inMsg); }, []() {});
+ [this](const std::string &targetDev, const std::string &userId, Message *inMsg)
+ { MessageReciveCallback(targetDev, userId, inMsg); }, []() {});
if (errCode != E_OK) {
LOGE("[SyncEngine] SyncRequestCallback register failed! err = %d", errCode);
communicatorAggregator->ReleaseCommunicator(communicator_, GetUserId(syncInterface));
@@ -320,16 +356,16 @@ int SyncEngine::InitComunicator(const ISyncInterface *syncInterface)
return errCode;
}
-int SyncEngine::AddSyncOperForContext(const std::string &deviceId, SyncOperation *operation)
+int SyncEngine::AddSyncOperForContext(const std::string &deviceId, SyncOperation *operation, const std::string &userId)
{
int errCode = E_OK;
ISyncTaskContext *context = nullptr;
{
std::lock_guard<std::mutex> lock(contextMapLock_);
- context = FindSyncTaskContext(deviceId);
+ context = FindSyncTaskContext(deviceId, userId);
if (context == nullptr) {
if (!IsKilled()) {
- context = GetSyncTaskContext(deviceId, errCode);
+ context = GetSyncTaskContext(deviceId, userId, errCode);
}
if (context == nullptr) {
return errCode;
@@ -350,8 +386,8 @@ int SyncEngine::AddSyncOperForContext(const std::string &deviceId, SyncOperation
return errCode;
}
-void SyncEngine::MessageReciveCallbackTask(ISyncTaskContext *context, const ICommunicator *communicator,
- Message *inMsg)
+void SyncEngine::MessageReciveCallbackTask(ISyncTaskContext *context, const std::string &userId,
+ const ICommunicator *communicator, Message *inMsg)
{
std::string deviceId = context->GetDeviceId();
@@ -372,10 +408,11 @@ void SyncEngine::MessageReciveCallbackTask(ISyncTaskContext *context, const ICom
delete inMsg;
inMsg = nullptr;
MSG_CALLBACK_OUT_NOT_DEL:
- ScheduleTaskOut(context, communicator);
+ ScheduleTaskOut(context, userId, communicator);
}
-void SyncEngine::RemoteDataChangedTask(ISyncTaskContext *context, const ICommunicator *communicator, Message *inMsg)
+void SyncEngine::RemoteDataChangedTask(ISyncTaskContext *context, const std::string &userId,
+ const ICommunicator *communicator, Message *inMsg)
{
std::string deviceId = context->GetDeviceId();
if (onRemoteDataChanged_ && deviceManager_->IsDeviceOnline(deviceId)) {
@@ -385,18 +422,18 @@ void SyncEngine::RemoteDataChangedTask(ISyncTaskContext *context, const ICommuni
}
delete inMsg;
inMsg = nullptr;
- ScheduleTaskOut(context, communicator);
+ ScheduleTaskOut(context, userId, communicator);
}
-void SyncEngine::ScheduleTaskOut(ISyncTaskContext *context, const ICommunicator *communicator)
+void SyncEngine::ScheduleTaskOut(ISyncTaskContext *context, const std::string &userId, const ICommunicator *communicator)
{
- (void)DealMsgUtilQueueEmpty();
+ (void)DealMsgUtilQueueEmpty(userId);
DecExecTaskCount();
RefObject::DecObjRef(communicator);
RefObject::DecObjRef(context);
}
-int SyncEngine::DealMsgUtilQueueEmpty()
+int SyncEngine::DealMsgUtilQueueEmpty(const std::string &userId)
{
if (!isActive_) {
return -E_BUSY; // db is closing just return
@@ -417,11 +454,11 @@ int SyncEngine::DealMsgUtilQueueEmpty()
// it will deal with the first message in queue, we should increase object reference counts and sure that resources
// could be prevented from destroying by other threads.
do {
- ISyncTaskContext *nextContext = GetContextForMsg(inMsg->GetTarget(), errCode);
+ ISyncTaskContext *nextContext = GetContextForMsg(inMsg->GetTarget(), userId, errCode);
if (errCode != E_OK) {
break;
}
- errCode = ScheduleDealMsg(nextContext, inMsg);
+ errCode = ScheduleDealMsg(nextContext, userId, inMsg);
if (errCode != E_OK) {
RefObject::DecObjRef(nextContext);
}
@@ -434,12 +471,12 @@ int SyncEngine::DealMsgUtilQueueEmpty()
return errCode;
}
-ISyncTaskContext *SyncEngine::GetContextForMsg(const std::string &targetDev, int &errCode)
+ISyncTaskContext *SyncEngine::GetContextForMsg(const std::string &targetDev, const std::string &userId, int &errCode)
{
ISyncTaskContext *context = nullptr;
{
std::lock_guard<std::mutex> lock(contextMapLock_);
- context = FindSyncTaskContext(targetDev);
+ context = FindSyncTaskContext(targetDev, userId);
if (context != nullptr) { // LCOV_EXCL_BR_LINE
if (context->IsKilled()) {
errCode = -E_OBJ_IS_KILLED;
@@ -450,7 +487,7 @@ ISyncTaskContext *SyncEngine::GetContextForMsg(const std::string &targetDev, int
errCode = -E_OBJ_IS_KILLED;
return nullptr;
}
- context = GetSyncTaskContext(targetDev, errCode);
+ context = GetSyncTaskContext(targetDev, userId, errCode);
if (context == nullptr) {
return nullptr;
}
@@ -461,7 +498,7 @@ ISyncTaskContext *SyncEngine::GetContextForMsg(const std::string &targetDev, int
return context;
}
-int SyncEngine::ScheduleDealMsg(ISyncTaskContext *context, Message *inMsg)
+int SyncEngine::ScheduleDealMsg(ISyncTaskContext *context, const std::string &userId, Message *inMsg)
{
if (inMsg == nullptr) {
LOGE("[SyncEngine] MessageReciveCallback inMsg is null!");
@@ -477,10 +514,11 @@ int SyncEngine::ScheduleDealMsg(ISyncTaskContext *context, Message *inMsg)
int errCode = E_OK;
// deal remote local data changed message
if (inMsg->GetMessageId() == LOCAL_DATA_CHANGED) {
- RemoteDataChangedTask(context, comProxy, inMsg);
+ RemoteDataChangedTask(context, userId, comProxy, inMsg);
} else {
errCode = RuntimeContext::GetInstance()->ScheduleTask(
- [this, context, comProxy, inMsg] { MessageReciveCallbackTask(context, comProxy, inMsg); });
+ [this, context, userId, comProxy, inMsg]
+ { MessageReciveCallbackTask(context, userId, comProxy, inMsg); });
}
if (errCode != E_OK) {
@@ -490,10 +528,10 @@ int SyncEngine::ScheduleDealMsg(ISyncTaskContext *context, Message *inMsg)
return errCode;
}
-void SyncEngine::MessageReciveCallback(const std::string &targetDev, Message *inMsg)
+void SyncEngine::MessageReciveCallback(const std::string &targetDev, const std::string &userId, Message *inMsg)
{
IncExecTaskCount();
- int errCode = MessageReciveCallbackInner(targetDev, inMsg);
+ int errCode = MessageReciveCallbackInner(targetDev, userId, inMsg);
if (errCode != E_OK) {
if (inMsg != nullptr) {
delete inMsg;
@@ -504,7 +542,7 @@ void SyncEngine::MessageReciveCallback(const std::string &targetDev, Message *in
}
}
-int SyncEngine::MessageReciveCallbackInner(const std::string &targetDev, Message *inMsg)
+int SyncEngine::MessageReciveCallbackInner(const std::string &targetDev, const std::string &userId, Message *inMsg)
{
if (targetDev.empty() || inMsg == nullptr) {
LOGE("[SyncEngine][MessageReciveCallback] from a invalid device or inMsg is null ");
@@ -544,12 +582,12 @@ int SyncEngine::MessageReciveCallbackInner(const std::string &targetDev, Message
}
int errCode = E_OK;
- ISyncTaskContext *nextContext = GetContextForMsg(targetDev, errCode);
+ ISyncTaskContext *nextContext = GetContextForMsg(targetDev, userId, errCode);
if (errCode != E_OK) {
return errCode;
}
LOGD("[SyncEngine] MessageReciveCallback MSG ID = %d", inMsg->GetMessageId());
- return ScheduleDealMsg(nextContext, inMsg);
+ return ScheduleDealMsg(nextContext, userId, inMsg);
}
void SyncEngine::PutMsgIntoQueue(const std::string &targetDev, Message *inMsg, int msgSize)
@@ -598,34 +636,41 @@ int SyncEngine::GetMsgSize(const Message *inMsg) const
}
}
-ISyncTaskContext *SyncEngine::FindSyncTaskContext(const std::string &deviceId)
+ISyncTaskContext *SyncEngine::FindSyncTaskContext(const std::string &deviceId, const std::string &userId)
{
- auto iter = syncTaskContextMap_.find(deviceId);
- if (iter != syncTaskContextMap_.end()) {
- ISyncTaskContext *context = iter->second;
- return context;
+ auto deviceIter = syncTaskContextMap_.find(deviceId);
+ if (deviceIter != syncTaskContextMap_.end()) {
+ auto userIdIter = deviceIter->second.find(userId);
+ if (userIdIter != deviceIter->second.end()) {
+ ISyncTaskContext *context = userIdIter->second;
+ return context;
+ }
}
return nullptr;
}
-ISyncTaskContext *SyncEngine::GetSyncTaskContextAndInc(const std::string &deviceId)
+std::map<std::string, ISyncTaskContext *> SyncEngine::GetSyncTaskContextAndInc(const std::string &deviceId)
{
- ISyncTaskContext *context = nullptr;
+ std::map<std::string, ISyncTaskContext *> contexts;
std::lock_guard<std::mutex> lock(contextMapLock_);
- context = FindSyncTaskContext(deviceId);
- if (context == nullptr) {
+ if (syncTaskContextMap_.find(deviceId) == syncTaskContextMap_.end()) {
LOGI("[SyncEngine] dev=%s, context is null, no need to clear sync operation", STR_MASK(deviceId));
- return nullptr;
+ return contexts;
}
- if (context->IsKilled()) { // LCOV_EXCL_BR_LINE
- LOGI("[SyncEngine] context is killing");
- return nullptr;
+ for (const auto &context : syncTaskContextMap_[deviceId]) {
+ if (context.second == nullptr) {
+ continue;
+ }
+ if (context.second->IsKilled()) {
+ LOGI("[SyncEngine] context[%s] is killing", context.first.c_str());
+ }
+ RefObject::IncObjRef(context.second);
+ contexts[context.first] = context.second;
}
- RefObject::IncObjRef(context);
- return context;
+ return contexts;
}
-ISyncTaskContext *SyncEngine::GetSyncTaskContext(const std::string &deviceId, int &errCode)
+ISyncTaskContext *SyncEngine::GetSyncTaskContext(const std::string &deviceId, const std::string &userId, int &errCode)
{
auto storage = GetAndIncSyncInterface();
if (storage == nullptr) {
@@ -647,7 +692,7 @@ ISyncTaskContext *SyncEngine::GetSyncTaskContext(const std::string &deviceId, in
context = nullptr;
return nullptr;
}
- syncTaskContextMap_.insert(std::pair<std::string, ISyncTaskContext *>(deviceId, context));
+ syncTaskContextMap_[deviceId].insert(std::pair<std::string, ISyncTaskContext *>(userId, context));
// IncRef for SyncEngine to make sure SyncEngine is valid when context access
RefObject::IncObjRef(this);
context->OnLastRef([this, deviceId, storage]() {
@@ -759,10 +804,12 @@ void SyncEngine::SetSyncRetry(bool isRetry)
isSyncRetry_ = isRetry;
LOGI("[SyncEngine] SetSyncRetry:%d ok", isRetry);
std::lock_guard<std::mutex> lock(contextMapLock_);
- for (auto &iter : syncTaskContextMap_) {
- ISyncTaskContext *context = iter.second;
- if (context != nullptr) { // LCOV_EXCL_BR_LINE
- context->SetSyncRetry(isRetry);
+ for (auto &deviceIter : syncTaskContextMap_) {
+ for (auto &userIdEntry : deviceIter.second) {
+ ISyncTaskContext *context = userIdEntry.second;
+ if (context != nullptr) { // LCOV_EXCL_BR_LINE
+ context->SetSyncRetry(isRetry);
+ }
}
}
}
@@ -860,7 +907,7 @@ void SyncEngine::OfflineHandleByDevice(const std::string &deviceId, ISyncInterfa
static_cast<SyncGenericInterface *>(storage)->GetDBInfo(dbInfo);
RuntimeContext::GetInstance()->RemoveRemoteSubscribe(dbInfo, deviceId);
// get context and Inc context if context is not nullptr
- ISyncTaskContext *context = GetSyncTaskContextAndInc(deviceId);
+ std::map<std::string, ISyncTaskContext *> contexts = GetSyncTaskContextAndInc(deviceId);
{
std::lock_guard<std::mutex> lock(communicatorProxyLock_);
if (communicatorProxy_ == nullptr) {
@@ -868,25 +915,31 @@ void SyncEngine::OfflineHandleByDevice(const std::string &deviceId, ISyncInterfa
}
if (communicatorProxy_->IsDeviceOnline(deviceId)) { // LCOV_EXCL_BR_LINE
LOGI("[SyncEngine] target dev=%s is online, no need to clear task.", STR_MASK(deviceId));
- RefObject::DecObjRef(context);
+ for (const auto &context : contexts) {
+ RefObject::DecObjRef(context.second);
+ }
return;
}
}
// means device is offline, clear local subscribe
subManager_->ClearLocalSubscribeQuery(deviceId);
// clear sync task
- if (context != nullptr) {
- context->ClearAllSyncTask();
- RefObject::DecObjRef(context);
+ for (const auto &context : contexts) {
+ if (context.second != nullptr) {
+ context.second->ClearAllSyncTask();
+ RefObject::DecObjRef(context.second);
+ }
}
}
void SyncEngine::ClearAllSyncTaskByDevice(const std::string &deviceId)
{
- ISyncTaskContext *context = GetSyncTaskContextAndInc(deviceId);
- if (context != nullptr) {
- context->ClearAllSyncTask();
- RefObject::DecObjRef(context);
+ std::map<std::string, ISyncTaskContext *> contexts = GetSyncTaskContextAndInc(deviceId);
+ for (const auto &context : contexts) {
+ if (context.second != nullptr) {
+ context.second->ClearAllSyncTask();
+ RefObject::DecObjRef(context.second);
+ }
}
}
@@ -932,7 +985,8 @@ ICommunicator *SyncEngine::AllocCommunicator(const std::string &identifier, int
}
errCode = communicator->RegOnMessageCallback(
- [this](const std::string &targetDev, Message *inMsg) { MessageReciveCallback(targetDev, inMsg); }, []() {});
+ [this](const std::string &targetDev, const std::string &userId, Message *inMsg)
+ { MessageReciveCallback(targetDev, userId, inMsg); }, []() {});
if (errCode != E_OK) {
LOGE("[SyncEngine] SyncRequestCallback register failed in SetEqualIdentifier! err = %d", errCode);
communicatorAggregator->ReleaseCommunicator(communicator, userId);
@@ -1030,9 +1084,9 @@ void SyncEngine::StopAutoSubscribeTimer()
{
}
-int SyncEngine::SubscribeLimitCheck(const std::vector<std::string> &devices, QuerySyncObject &query) const
+int SyncEngine::SubscribeLimitCheck(const std::vector<DeviceSyncTarget> &syncTargets, QuerySyncObject &query) const
{
- return subManager_->LocalSubscribeLimitCheck(devices, query);
+ return subManager_->LocalSubscribeLimitCheck(syncTargets, query);
}
@@ -1065,13 +1119,15 @@ void SyncEngine::SchemaChange()
std::vector<ISyncTaskContext *> tmpContextVec;
{
std::lock_guard<std::mutex> lock(contextMapLock_);
- for (const auto &entry : syncTaskContextMap_) { // LCOV_EXCL_BR_LINE
- auto context = entry.second;
- if (context == nullptr || context->IsKilled()) {
- continue;
+ for (const auto &deviceEntry : syncTaskContextMap_) { // LCOV_EXCL_BR_LINE
+ for (const auto &userIdEntry : deviceEntry.second) {
+ auto context = userIdEntry.second;
+ if (context == nullptr || context->IsKilled()) {
+ continue;
+ }
+ RefObject::IncObjRef(context);
+ tmpContextVec.push_back(context);
}
- RefObject::IncObjRef(context);
- tmpContextVec.push_back(context);
}
}
for (const auto &entryContext : tmpContextVec) {
@@ -1112,9 +1168,11 @@ void SyncEngine::Dump(int fd)
DBDumpHelper::Dump(fd, "\t]\n\tcontext info [\n");
// dump context info
std::lock_guard<std::mutex> autoLock(contextMapLock_);
- for (const auto &entry : syncTaskContextMap_) {
- if (entry.second != nullptr) {
- entry.second->Dump(fd);
+ for (const auto &deviceEntry : syncTaskContextMap_) {
+ for (const auto &userIdEntry : deviceEntry.second) {
+ if (userIdEntry.second != nullptr) {
+ userIdEntry.second->Dump(fd);
+ }
}
}
DBDumpHelper::Dump(fd, "\t]\n\n");
@@ -1198,17 +1256,19 @@ void SyncEngine::AbortMachineIfNeed(uint32_t syncId)
std::vector<ISyncTaskContext *> abortContexts;
{
std::lock_guard<std::mutex> lock(contextMapLock_);
- for (const auto &entry : syncTaskContextMap_) {
- auto context = entry.second;
- if (context == nullptr || context->IsKilled()) { // LCOV_EXCL_BR_LINE
- continue;
- }
- RefObject::IncObjRef(context);
- if (context->GetSyncId() == syncId) {
+ for (const auto &deviceEntry : syncTaskContextMap_) {
+ for (const auto &userIdEntry : deviceEntry.second) {
+ auto context = userIdEntry.second;
+ if (context == nullptr || context->IsKilled()) { // LCOV_EXCL_BR_LINE
+ continue;
+ }
RefObject::IncObjRef(context);
- abortContexts.push_back(context);
+ if (context->GetSyncId() == syncId) {
+ RefObject::IncObjRef(context);
+ abortContexts.push_back(context);
+ }
+ RefObject::DecObjRef(context);
}
- RefObject::DecObjRef(context);
}
}
for (const auto &abortContext : abortContexts) {
@@ -1276,9 +1336,11 @@ void SyncEngine::TimeChange()
{
// copy context
std::lock_guard<std::mutex> lock(contextMapLock_);
- for (const auto &iter : syncTaskContextMap_) {
- RefObject::IncObjRef(iter.second);
- decContext.push_back(iter.second);
+ for (const auto &deviceIter : syncTaskContextMap_) {
+ for (const auto &userIdIter : deviceIter.second) {
+ RefObject::IncObjRef(userIdIter.second);
+ decContext.push_back(userIdIter.second);
+ }
}
}
for (auto &iter : decContext) {
@@ -1293,9 +1355,11 @@ int32_t SyncEngine::GetResponseTaskCount()
{
// copy context
std::lock_guard<std::mutex> lock(contextMapLock_);
- for (const auto &iter : syncTaskContextMap_) {
- RefObject::IncObjRef(iter.second);
- decContext.push_back(iter.second);
+ for (const auto &deviceIter : syncTaskContextMap_) {
+ for (const auto &userIdIter : deviceIter.second) {
+ RefObject::IncObjRef(userIdIter.second);
+ decContext.push_back(userIdIter.second);
+ }
}
}
int32_t taskCount = 0;
diff --git a/frameworks/libs/distributeddb/syncer/src/device/sync_engine.h b/frameworks/libs/distributeddb/syncer/src/device/sync_engine.h
index 78c931922..b1696900d 100644
--- a/frameworks/libs/distributeddb/syncer/src/device/sync_engine.h
+++ b/frameworks/libs/distributeddb/syncer/src/device/sync_engine.h
@@ -114,7 +114,7 @@ public:
// used by SingleVerSyncer when remote/local db closed
void StopAutoSubscribeTimer() override;
- int SubscribeLimitCheck(const std::vector<std::string> &devices, QuerySyncObject &query) const override;
+ int SubscribeLimitCheck(const std::vector<DeviceSyncTarget> &syncTargets, QuerySyncObject &query) const override;
bool IsEngineActive() const override;
@@ -142,8 +142,8 @@ protected:
virtual ISyncTaskContext *CreateSyncTaskContext(const ISyncInterface &syncInterface) = 0;
// Find SyncTaskContext from the map
- ISyncTaskContext *FindSyncTaskContext(const std::string &deviceId);
- ISyncTaskContext *GetSyncTaskContextAndInc(const std::string &deviceId);
+ ISyncTaskContext *FindSyncTaskContext(const std::string &deviceId, const std::string &userId);
+ std::map<std::string, ISyncTaskContext *> GetSyncTaskContextAndInc(const std::string &deviceId);
void GetQueryAutoSyncParam(const std::string &device, const QuerySyncObject &query, InternalSyncParma &outParam);
void GetSubscribeSyncParam(const std::string &device, const QuerySyncObject &query, InternalSyncParma &outParam);
@@ -151,12 +151,15 @@ protected:
ISyncInterface *GetAndIncSyncInterface();
void SetSyncInterface(ISyncInterface *syncInterface);
- ISyncTaskContext *GetSyncTaskContext(const std::string &deviceId, int &errCode);
+ ISyncTaskContext *GetSyncTaskContext(const std::string &deviceId, const std::string &userId, int &errCode);
+
+ int AddSyncOperationForDevices(SyncOperation *operation);
+ int AddSyncOperationForSyncTargets(SyncOperation *operation);
std::mutex storageMutex_;
ISyncInterface *syncInterface_;
// Used to store all send sync task infos (such as pull sync response, and push sync request)
- std::map<std::string, ISyncTaskContext *> syncTaskContextMap_;
+ std::map<std::string, std::map<std::string, ISyncTaskContext *>> syncTaskContextMap_;
std::mutex contextMapLock_;
std::shared_ptr<SubscribeManager> subManager_;
std::function<void(const InternalSyncParma &param)> queryAutoSyncCallback_;
@@ -171,20 +174,22 @@ private:
int InitComunicator(const ISyncInterface *syncInterface);
// Add the sync task info to the map.
- int AddSyncOperForContext(const std::string &deviceId, SyncOperation *operation);
+ int AddSyncOperForContext(const std::string &deviceId, SyncOperation *operation, const std::string &userId);
// Sync Request CallbackTask run at a sub thread.
- void MessageReciveCallbackTask(ISyncTaskContext *context, const ICommunicator *communicator, Message *inMsg);
+ void MessageReciveCallbackTask(ISyncTaskContext *context, const std::string &userId,
+ const ICommunicator *communicator, Message *inMsg);
- void RemoteDataChangedTask(ISyncTaskContext *context, const ICommunicator *communicator, Message *inMsg);
+ void RemoteDataChangedTask(ISyncTaskContext *context, const std::string &userId, const ICommunicator *communicator,
+ Message *inMsg);
- void ScheduleTaskOut(ISyncTaskContext *context, const ICommunicator *communicator);
+ void ScheduleTaskOut(ISyncTaskContext *context, const std::string &userId, const ICommunicator *communicator);
// wrapper of MessageReciveCallbackTask
- void MessageReciveCallback(const std::string &targetDev, Message *inMsg);
+ void MessageReciveCallback(const std::string &targetDev, const std::string &userId, Message *inMsg);
// Sync Request Callback
- int MessageReciveCallbackInner(const std::string &targetDev, Message *inMsg);
+ int MessageReciveCallbackInner(const std::string &targetDev, const std::string &userId, Message *inMsg);
// Exec the given SyncTarget. and callback onComplete.
int ExecSyncTask(ISyncTaskContext *context);
@@ -196,12 +201,12 @@ private:
int GetMsgSize(const Message *inMsg) const;
// Do not run MessageReceiveCallbackTask until msgQueue is empty
- int DealMsgUtilQueueEmpty();
+ int DealMsgUtilQueueEmpty(const std::string &userId);
// Handle message in order.
- int ScheduleDealMsg(ISyncTaskContext *context, Message *inMsg);
+ int ScheduleDealMsg(ISyncTaskContext *context, const std::string &userId, Message *inMsg);
- ISyncTaskContext *GetContextForMsg(const std::string &targetDev, int &errCode);
+ ISyncTaskContext *GetContextForMsg(const std::string &targetDev, const std::string &userId, int &errCode);
ICommunicator *AllocCommunicator(const std::string &identifier, int &errCode, std::string userId = "");
diff --git a/frameworks/libs/distributeddb/syncer/src/device/sync_task_context.cpp b/frameworks/libs/distributeddb/syncer/src/device/sync_task_context.cpp
index 4fd93de50..aa48c934c 100644
--- a/frameworks/libs/distributeddb/syncer/src/device/sync_task_context.cpp
+++ b/frameworks/libs/distributeddb/syncer/src/device/sync_task_context.cpp
@@ -121,7 +121,7 @@ void SyncTaskContext::SetOperationStatus(int status)
}
int finalStatus = status;
- int operationStatus = syncOperation_->GetStatus(deviceId_);
+ int operationStatus = syncOperation_->GetStatus({deviceId_, userId_});
if (status == SyncOperation::OP_SEND_FINISHED && operationStatus == SyncOperation::OP_RECV_FINISHED) {
if (GetTaskErrCode() == -E_EKEYREVOKED) { // LCOV_EXCL_BR_LINE
finalStatus = SyncOperation::OP_EKEYREVOKED_FAILURE;
@@ -135,7 +135,7 @@ void SyncTaskContext::SetOperationStatus(int status)
finalStatus = SyncOperation::OP_FINISHED_ALL;
}
}
- syncOperation_->SetStatus(deviceId_, finalStatus);
+ syncOperation_->SetStatus({deviceId_, userId_}, finalStatus);
if (finalStatus >= SyncOperation::OP_FINISHED_ALL) {
SaveLastPushTaskExecStatus(finalStatus);
}
@@ -215,7 +215,7 @@ int SyncTaskContext::GetOperationStatus() const
if (syncOperation_ == nullptr) {
return SyncOperation::OP_FINISHED_ALL;
}
- return syncOperation_->GetStatus(deviceId_);
+ return syncOperation_->GetStatus({deviceId_, userId_});
}
void SyncTaskContext::SetMode(int mode)
@@ -285,6 +285,12 @@ std::string SyncTaskContext::GetDeviceId() const
return deviceId_;
}
+// Get the current task deviceId.
+std::string SyncTaskContext::GetUserId() const
+{
+ return userId_;
+}
+
void SyncTaskContext::SetTaskExecStatus(int status)
{
taskExecStatus_ = status;
diff --git a/frameworks/libs/distributeddb/syncer/src/device/sync_task_context.h b/frameworks/libs/distributeddb/syncer/src/device/sync_task_context.h
index 42d53e7dc..1ac06e362 100644
--- a/frameworks/libs/distributeddb/syncer/src/device/sync_task_context.h
+++ b/frameworks/libs/distributeddb/syncer/src/device/sync_task_context.h
@@ -77,6 +77,8 @@ public:
// Get the current task deviceId.
std::string GetDeviceId() const override;
+ // Get the current task userId.
+ std::string GetUserId() const override;
// Set the sync task queue exec status
void SetTaskExecStatus(int status) override;
@@ -260,6 +262,7 @@ protected:
volatile int status_;
volatile int taskExecStatus_;
std::string deviceId_;
+ std::string userId_;
std::string syncActionName_;
ISyncInterface *syncInterface_;
ICommunicator *communicator_;
diff --git a/frameworks/libs/distributeddb/syncer/src/device/syncer_proxy.cpp b/frameworks/libs/distributeddb/syncer/src/device/syncer_proxy.cpp
index 0c8d6fd62..facc73813 100644
--- a/frameworks/libs/distributeddb/syncer/src/device/syncer_proxy.cpp
+++ b/frameworks/libs/distributeddb/syncer/src/device/syncer_proxy.cpp
@@ -54,14 +54,14 @@ int SyncerProxy::Close(bool isClosedOperation)
return syncer_->Close(isClosedOperation);
}
-int SyncerProxy::Sync(const std::vector<std::string> &devices, int mode,
- const std::function<void(const std::map<std::string, int> &)> &onComplete,
+int SyncerProxy::Sync(const std::vector<DeviceSyncTarget> &syncTargets, int mode,
+ const std::function<void(const std::map<DeviceSyncTarget, int> &)> &onComplete,
const std::function<void(void)> &onFinalize, bool wait)
{
if (syncer_ == nullptr) { // LCOV_EXCL_BR_LINE
return -E_NOT_INIT;
}
- return syncer_->Sync(devices, mode, onComplete, onFinalize, wait);
+ return syncer_->Sync(syncTargets, mode, onComplete, onFinalize, wait);
}
int SyncerProxy::Sync(const SyncParma &parma, uint64_t connectionId)
diff --git a/frameworks/libs/distributeddb/syncer/src/sync_operation.cpp b/frameworks/libs/distributeddb/syncer/src/sync_operation.cpp
index 0e68d5fe6..73b3d662c 100644
--- a/frameworks/libs/distributeddb/syncer/src/sync_operation.cpp
+++ b/frameworks/libs/distributeddb/syncer/src/sync_operation.cpp
@@ -20,9 +20,9 @@
#include "performance_analysis.h"
namespace DistributedDB {
-SyncOperation::SyncOperation(uint32_t syncId, const std::vector<std::string> &devices,
- int mode, const UserCallback &userCallback, bool isBlockSync)
- : devices_(devices),
+SyncOperation::SyncOperation(uint32_t syncId, const std::vector<DeviceSyncTarget> &syncTargets, int mode,
+ const UserCallback &userCallback, bool isBlockSync)
+ : syncTargets_(syncTargets),
syncId_(syncId),
mode_(mode),
userCallback_(userCallback),
@@ -46,15 +46,15 @@ SyncOperation::~SyncOperation()
int SyncOperation::Initialize()
{
LOGD("[SyncOperation] Init SyncOperation id:%d.", syncId_);
- std::map<std::string, DeviceSyncProcess> tempSyncProcessMap;
+ std::map<DeviceSyncTarget, DeviceSyncProcess> tempSyncProcessMap;
{
AutoLock lockGuard(this);
- for (const std::string &deviceId : devices_) {
- statuses_.insert(std::pair<std::string, int>(deviceId, OP_WAITING));
+ for (const DeviceSyncTarget &syncTarget : syncTargets_) {
+ statuses_.insert(std::pair<DeviceSyncTarget, int>(syncTarget, OP_WAITING));
DeviceSyncProcess processInfo;
processInfo.errCode = static_cast<DBStatus>(OP_WAITING);
processInfo.syncId = syncId_;
- syncProcessMap_.insert(std::pair<std::string, DeviceSyncProcess>(deviceId, processInfo));
+ syncProcessMap_.insert(std::pair<DeviceSyncTarget, DeviceSyncProcess>(syncTarget, processInfo));
}
if (mode_ == AUTO_PUSH) {
@@ -89,9 +89,10 @@ void SyncOperation::SetOnSyncFinished(const OnSyncFinished &callback)
onFinished_ = callback;
}
-void SyncOperation::SetStatus(const std::string &deviceId, int status, int commErrCode)
+void SyncOperation::SetStatus(const DeviceSyncTarget &syncTarget, int status, int commErrCode)
{
- LOGD("[SyncOperation] SetStatus dev %s{private} status %d commErrCode %d", deviceId.c_str(), status, commErrCode);
+ LOGD("[SyncOperation] SetStatus dev %s{private} user %s status %d commErrCode %d", syncTarget.device.c_str(),
+ syncTarget.userId.c_str(), status, commErrCode);
AutoLock lockGuard(this);
if (IsKilled()) {
LOGE("[SyncOperation] SetStatus failed, the SyncOperation has been killed!");
@@ -103,12 +104,15 @@ void SyncOperation::SetStatus(const std::string &deviceId, int status, int commE
}
if (userSyncProcessCallback_) {
- if (syncProcessMap_[deviceId].errCode < static_cast<DBStatus>(OP_FINISHED_ALL)) {
- syncProcessMap_[deviceId].errCode = static_cast<DBStatus>(status);
+ if (syncProcessMap_.find(syncTarget) == syncProcessMap_.end()) {
+ LOGW("[SyncOperation] Not found dev %s{private} user %s in sync process!", syncTarget.device.c_str(),
+ syncTarget.userId.c_str());
+ } else if (syncProcessMap_[syncTarget].errCode < static_cast<DBStatus>(OP_FINISHED_ALL)) {
+ syncProcessMap_[syncTarget].errCode = static_cast<DBStatus>(status);
}
}
- auto iter = statuses_.find(deviceId);
+ auto iter = statuses_.find(syncTarget);
if (iter != statuses_.end()) {
if (iter->second >= OP_FINISHED_ALL) {
return;
@@ -117,7 +121,7 @@ void SyncOperation::SetStatus(const std::string &deviceId, int status, int commE
if (((status != OP_COMM_ABNORMAL) && (status != OP_TIMEOUT)) || (commErrCode == E_OK)) {
return;
}
- commErrCodeMap_.insert(std::pair<std::string, int>(deviceId, commErrCode));
+ commErrCodeMap_.insert(std::pair<DeviceSyncTarget, int>(syncTarget, commErrCode));
}
}
@@ -139,12 +143,18 @@ void SyncOperation::SetUnfinishedDevStatus(int status)
}
item.second = status;
}
+ for (auto &item : statuses_) {
+ if (item.second >= OP_FINISHED_ALL) {
+ continue;
+ }
+ item.second = status;
+ }
}
-int SyncOperation::GetStatus(const std::string &deviceId) const
+int SyncOperation::GetStatus(const DeviceSyncTarget &syncTarget) const
{
AutoLock lockGuard(this);
- auto iter = statuses_.find(deviceId);
+ auto iter = statuses_.find(syncTarget);
if (iter != statuses_.end()) {
return iter->second;
}
@@ -161,14 +171,13 @@ int SyncOperation::GetMode() const
return mode_;
}
-void SyncOperation::ReplaceCommErrCode(std::map<std::string, int> &finishStatus)
+void SyncOperation::ReplaceCommErrCode(std::map<DeviceSyncTarget, int> &finishStatus)
{
for (auto &item : finishStatus) {
if ((item.second != OP_COMM_ABNORMAL) && (item.second != OP_TIMEOUT)) {
continue;
}
- std::string deviceId = item.first;
- auto iter = commErrCodeMap_.find(deviceId);
+ auto iter = commErrCodeMap_.find(item.first);
if (iter != commErrCodeMap_.end()) {
item.second = iter->second;
}
@@ -177,8 +186,8 @@ void SyncOperation::ReplaceCommErrCode(std::map<std::string, int> &finishStatus)
void SyncOperation::Finished()
{
- std::map<std::string, int> tmpStatus;
- std::map<std::string, DeviceSyncProcess> tmpProcessMap;
+ std::map<DeviceSyncTarget, int> tmpStatus;
+ std::map<DeviceSyncTarget, DeviceSyncProcess> tmpProcessMap;
{
AutoLock lockGuard(this);
if (IsKilled() || isFinished_) {
@@ -223,7 +232,16 @@ void SyncOperation::Finished()
const std::vector<std::string> &SyncOperation::GetDevices() const
{
- return devices_;
+ std::vector<std::string> devices;
+ for (const auto &syncTarget : syncTargets_) {
+ devices.push_back(syncTarget.device);
+ }
+ return devices;
+}
+
+const std::vector<DeviceSyncTarget> &SyncOperation::GetSyncTargets() const
+{
+ return syncTargets_;
}
void SyncOperation::WaitIfNeed()
@@ -277,14 +295,18 @@ void SyncOperation::SetSyncProcessCallFun(DeviceSyncProcessCallback callBack)
}
}
-void SyncOperation::ExeSyncProcessCallFun(const std::map<std::string, DeviceSyncProcess> &syncProcessMap)
+void SyncOperation::ExeSyncProcessCallFun(const std::map<DeviceSyncTarget, DeviceSyncProcess> &syncProcessMap)
{
+ std::map<std::string, DeviceSyncProcess> deviceSyncProcessMap;
+ for (const auto &process : syncProcessMap) {
+ deviceSyncProcessMap[process.first.device] = process.second;
+ }
if (IsBlockSync()) {
- userSyncProcessCallback_(syncProcessMap);
+ userSyncProcessCallback_(deviceSyncProcessMap);
} else {
RefObject::IncObjRef(this);
- int errCode = RuntimeContext::GetInstance()->ScheduleQueuedTask(identifier_, [this, syncProcessMap] {
- userSyncProcessCallback_(syncProcessMap);
+ int errCode = RuntimeContext::GetInstance()->ScheduleQueuedTask(identifier_, [this, deviceSyncProcessMap] {
+ userSyncProcessCallback_(deviceSyncProcessMap);
RefObject::DecObjRef(this);
});
if (errCode != E_OK) {
@@ -294,24 +316,25 @@ void SyncOperation::ExeSyncProcessCallFun(const std::map<std::string, DeviceSync
}
}
-void SyncOperation::UpdateFinishedCount(const std::string &deviceId, uint32_t count)
+void SyncOperation::UpdateFinishedCount(const DeviceSyncTarget &syncTarget, uint32_t count)
{
if (this->userSyncProcessCallback_) {
- std::map<std::string, DeviceSyncProcess> tmpMap;
+ std::map<DeviceSyncTarget, DeviceSyncProcess> tmpMap;
{
AutoLock lockGuard(this);
if (IsKilled()) {
return;
}
- LOGD("[UpdateFinishedCount] deviceId %s{private} count %u", deviceId.c_str(), count);
- this->syncProcessMap_[deviceId].pullInfo.finishedCount += count;
+ LOGD("[UpdateFinishedCount] deviceId %s{private} user %s count %u", syncTarget.device.c_str(),
+ syncTarget.userId.c_str(), count);
+ this->syncProcessMap_[syncTarget].pullInfo.finishedCount += count;
tmpMap = this->syncProcessMap_;
}
ExeSyncProcessCallFun(tmpMap);
}
}
-void SyncOperation::SetSyncProcessTotal(const std::string &deviceId, uint32_t total)
+void SyncOperation::SetSyncProcessTotal(const DeviceSyncTarget &syncTarget, uint32_t total)
{
if (this->userSyncProcessCallback_) {
{
@@ -319,8 +342,9 @@ void SyncOperation::SetSyncProcessTotal(const std::string &deviceId, uint32_t to
if (IsKilled()) {
return;
}
- LOGD("[SetSyncProcessTotal] total=%u, syncId=%u, deviceId=%s{private}", total, syncId_, deviceId.c_str());
- this->syncProcessMap_[deviceId].pullInfo.total = total;
+ LOGD("[SetSyncProcessTotal] total=%u, syncId=%u, deviceId=%s{private}, userId=%s", total, syncId_,
+ syncTarget.device.c_str(), syncTarget.userId.c_str());
+ this->syncProcessMap_[syncTarget].pullInfo.total = total;
}
}
}
@@ -471,11 +495,12 @@ ProcessStatus SyncOperation::DBStatusTransProcess(int operationStatus)
return result == std::end(syncOperationProcessStatus) ? FINISHED : result->proStatus;
}
-std::string SyncOperation::GetFinishDetailMsg(const std::map<std::string, int> &finishStatus)
+std::string SyncOperation::GetFinishDetailMsg(const std::map<DeviceSyncTarget, int> &finishStatus)
{
std::string msg = "Sync detail is:";
- for (const auto &[dev, status]: finishStatus) {
- msg += "dev=" + DBCommon::StringMasking(dev);
+ for (const auto &[syncTarget, status]: finishStatus) {
+ msg += "dev=" + DBCommon::StringMasking(syncTarget.device);
+ msg += " user=" + syncTarget.userId;
if ((status > static_cast<int>(OP_FINISHED_ALL)) || (status < E_OK)) {
msg += " sync failed, reason is " + std::to_string(status);
} else {
diff --git a/frameworks/libs/distributeddb/syncer/src/sync_operation.h b/frameworks/libs/distributeddb/syncer/src/sync_operation.h
index f3f00a5de..d867d6e7e 100644
--- a/frameworks/libs/distributeddb/syncer/src/sync_operation.h
+++ b/frameworks/libs/distributeddb/syncer/src/sync_operation.h
@@ -59,11 +59,11 @@ public:
OP_NOTADB_OR_CORRUPTED,
};
- using UserCallback = std::function<void(std::map<std::string, int>)>;
+ using UserCallback = std::function<void(std::map<DeviceSyncTarget, int>)>;
using OnSyncFinished = std::function<void(int)>;
using OnSyncFinalize = std::function<void(void)>;
- SyncOperation(uint32_t syncId, const std::vector<std::string> &devices, int mode,
+ SyncOperation(uint32_t syncId, const std::vector<DeviceSyncTarget> &syncTargets, int mode,
const UserCallback &userCallback, bool isBlockSync);
DISABLE_COPY_ASSIGN_MOVE(SyncOperation);
@@ -78,7 +78,7 @@ public:
void SetOnSyncFinished(const OnSyncFinished &callback);
// Set the sync status, running or finished
- void SetStatus(const std::string &deviceId, int status, int commErrCode = E_OK);
+ void SetStatus(const DeviceSyncTarget &syncTarget, int status, int commErrCode = E_OK);
// Set the unfinished devices sync status, running or finished
void SetUnfinishedDevStatus(int status);
@@ -87,7 +87,7 @@ public:
void SetIdentifier(const std::vector<uint8_t> &identifier);
// Get the sync status, running or finished
- int GetStatus(const std::string &deviceId) const;
+ int GetStatus(const DeviceSyncTarget &syncTarget) const;
// Get the sync id.
uint32_t GetSyncId() const;
@@ -101,6 +101,8 @@ public:
// Get the deviceId of this sync status
const std::vector<std::string> &GetDevices() const;
+ const std::vector<DeviceSyncTarget> &GetSyncTargets() const;
+
// Wait if it's a block sync
void WaitIfNeed();
@@ -137,9 +139,9 @@ public:
void SetSyncProcessCallFun(DeviceSyncProcessCallback callBack);
- void SetSyncProcessTotal(const std::string &deviceId, uint32_t total);
+ void SetSyncProcessTotal(const DeviceSyncTarget &syncTarget, uint32_t total);
- void UpdateFinishedCount(const std::string &deviceId, uint32_t count);
+ void UpdateFinishedCount(const DeviceSyncTarget &syncTarget, uint32_t count);
protected:
virtual ~SyncOperation();
@@ -151,12 +153,12 @@ private:
// called by destruction
void Finalize();
- static std::string GetFinishDetailMsg(const std::map<std::string, int> &finishStatus);
+ static std::string GetFinishDetailMsg(const std::map<DeviceSyncTarget, int> &finishStatus);
- void ReplaceCommErrCode(std::map<std::string, int> &finishStatus);
+ void ReplaceCommErrCode(std::map<DeviceSyncTarget, int> &finishStatus);
// The device list
- const std::vector<std::string> devices_;
+ const std::vector<DeviceSyncTarget> syncTargets_;
// The Syncid
uint32_t syncId_;
@@ -177,10 +179,10 @@ private:
DeviceSyncProcessCallback userSyncProcessCallback_;
// The device id we sync with
- std::map<std::string, int> statuses_;
+ std::map<DeviceSyncTarget, int> statuses_;
// passthrough errCode
- std::map<std::string, int> commErrCodeMap_;
+ std::map<DeviceSyncTarget, int> commErrCodeMap_;
// Is this operation is a block sync
volatile bool isBlockSync_;
@@ -204,12 +206,12 @@ private:
std::string identifier_;
// The device id we syncProcess with
- std::map<std::string, DeviceSyncProcess> syncProcessMap_;
+ std::map<DeviceSyncTarget, DeviceSyncProcess> syncProcessMap_;
// Can be cancelled
bool canCancel_ = false;
- void ExeSyncProcessCallFun(const std::map<std::string, DeviceSyncProcess> &syncProcessMap);
+ void ExeSyncProcessCallFun(const std::map<DeviceSyncTarget, DeviceSyncProcess> &syncProcessMap);
};
} // namespace DistributedDB
diff --git a/frameworks/libs/distributeddb/test/unittest/common/communicator/distributeddb_communicator_deep_test.cpp b/frameworks/libs/distributeddb/test/unittest/common/communicator/distributeddb_communicator_deep_test.cpp
index 9b0b725ac..79e18610e 100644
--- a/frameworks/libs/distributeddb/test/unittest/common/communicator/distributeddb_communicator_deep_test.cpp
+++ b/frameworks/libs/distributeddb/test/unittest/common/communicator/distributeddb_communicator_deep_test.cpp
@@ -143,11 +143,13 @@ HWTEST_F(DistributedDBCommunicatorDeepTest, WaitAndRetrySend001, TestSize.Level2
{
// Preset
Message *msgForBB = nullptr;
- g_commBB->RegOnMessageCallback([&msgForBB](const std::string &srcTarget, Message *inMsg) {
+ g_commBB->RegOnMessageCallback([&msgForBB](const std::string &srcTarget, const std::string &userId,
+ Message *inMsg) {
msgForBB = inMsg;
}, nullptr);
Message *msgForCA = nullptr;
- g_commCA->RegOnMessageCallback([&msgForCA](const std::string &srcTarget, Message *inMsg) {
+ g_commCA->RegOnMessageCallback([&msgForCA](const std::string &srcTarget, const std::string &userId,
+ Message *inMsg) {
msgForCA = inMsg;
}, nullptr);
@@ -329,7 +331,8 @@ HWTEST_F(DistributedDBCommunicatorDeepTest, Fragment001, TestSize.Level2)
{
// Preset
Message *recvMsgForBB = nullptr;
- g_commBB->RegOnMessageCallback([&recvMsgForBB](const std::string &srcTarget, Message *inMsg) {
+ g_commBB->RegOnMessageCallback([&recvMsgForBB](const std::string &srcTarget, const std::string &userId,
+ Message *inMsg) {
recvMsgForBB = inMsg;
}, nullptr);
@@ -384,7 +387,8 @@ HWTEST_F(DistributedDBCommunicatorDeepTest, Fragment002, TestSize.Level2)
{
// Preset
Message *recvMsgForCC = nullptr;
- g_commCC->RegOnMessageCallback([&recvMsgForCC](const std::string &srcTarget, Message *inMsg) {
+ g_commCC->RegOnMessageCallback([&recvMsgForCC](const std::string &srcTarget, const std::string &userId,
+ Message *inMsg) {
recvMsgForCC = inMsg;
}, nullptr);
@@ -450,7 +454,8 @@ HWTEST_F(DistributedDBCommunicatorDeepTest, Fragment003, TestSize.Level3)
{
// Preset
std::atomic<int> count {0};
- OnMessageCallback callback = [&count](const std::string &srcTarget, Message *inMsg) {
+ OnMessageCallback callback = [&count](const std::string &srcTarget, const std::string &userId,
+ Message *inMsg) {
delete inMsg;
inMsg = nullptr;
count.fetch_add(1, std::memory_order_seq_cst);
@@ -516,7 +521,8 @@ HWTEST_F(DistributedDBCommunicatorDeepTest, Fragment004, TestSize.Level2)
* @tc.steps: step1. connect device A with device B
*/
Message *recvMsgForBB = nullptr;
- g_commBB->RegOnMessageCallback([&recvMsgForBB](const std::string &srcTarget, Message *inMsg) {
+ g_commBB->RegOnMessageCallback([&recvMsgForBB](const std::string &srcTarget, const std::string &userId,
+ Message *inMsg) {
recvMsgForBB = inMsg;
}, nullptr);
AdapterStub::ConnectAdapterStub(g_envDeviceA.adapterHandle, g_envDeviceB.adapterHandle);
diff --git a/frameworks/libs/distributeddb/test/unittest/common/communicator/distributeddb_communicator_send_receive_test.cpp b/frameworks/libs/distributeddb/test/unittest/common/communicator/distributeddb_communicator_send_receive_test.cpp
index b3f7e8ff0..af5b17868 100644
--- a/frameworks/libs/distributeddb/test/unittest/common/communicator/distributeddb_communicator_send_receive_test.cpp
+++ b/frameworks/libs/distributeddb/test/unittest/common/communicator/distributeddb_communicator_send_receive_test.cpp
@@ -135,7 +135,8 @@ static void CheckRecvMessage(Message *recvMsg, bool isEmpty, uint32_t msgId, uin
string srcTargetFor##src##label; \
Message *recvMsgFor##src##label = nullptr; \
g_comm##src##label->RegOnMessageCallback( \
- [&srcTargetFor##src##label, &recvMsgFor##src##label](const std::string &srcTarget, Message *inMsg) { \
+ [&srcTargetFor##src##label, &recvMsgFor##src##label](const std::string &srcTarget, const std::string &userId, \
+ Message *inMsg) { \
srcTargetFor##src##label = srcTarget; \
recvMsgFor##src##label = inMsg; \
}, nullptr);
@@ -527,7 +528,8 @@ HWTEST_F(DistributedDBCommunicatorSendReceiveTest, ReceiveCheck001, TestSize.Lev
{
// Preset
int recvCount = 0;
- g_commAA->RegOnMessageCallback([&recvCount](const std::string &srcTarget, Message *inMsg) {
+ g_commAA->RegOnMessageCallback([&recvCount](const std::string &srcTarget, const std::string &userId,
+ Message *inMsg) {
recvCount++;
if (inMsg != nullptr) {
delete inMsg;
@@ -588,7 +590,8 @@ HWTEST_F(DistributedDBCommunicatorSendReceiveTest, ReceiveCheck002, TestSize.Lev
{
// Preset
int recvCount = 0;
- g_commAA->RegOnMessageCallback([&recvCount](const std::string &srcTarget, Message *inMsg) {
+ g_commAA->RegOnMessageCallback([&recvCount](const std::string &srcTarget, const std::string &userId,
+ Message *inMsg) {
recvCount++;
if (inMsg != nullptr) {
delete inMsg;
diff --git a/frameworks/libs/distributeddb/test/unittest/common/communicator/distributeddb_communicator_test.cpp b/frameworks/libs/distributeddb/test/unittest/common/communicator/distributeddb_communicator_test.cpp
index 8cda98e64..028d4e7bb 100644
--- a/frameworks/libs/distributeddb/test/unittest/common/communicator/distributeddb_communicator_test.cpp
+++ b/frameworks/libs/distributeddb/test/unittest/common/communicator/distributeddb_communicator_test.cpp
@@ -628,7 +628,8 @@ HWTEST_F(DistributedDBCommunicatorTest, ReportCommunicatorNotFound001, TestSize.
ICommunicator *commBA = g_envDeviceB.commAggrHandle->AllocCommunicator(LABEL_A, errCode);
ASSERT_NE(commBA, nullptr);
Message *recvMsgForBA = nullptr;
- commBA->RegOnMessageCallback([&recvMsgForBA](const std::string &srcTarget, Message *inMsg) {
+ commBA->RegOnMessageCallback([&recvMsgForBA](const std::string &srcTarget, const std::string &userId,
+ Message *inMsg) {
recvMsgForBA = inMsg;
}, nullptr);
commBA->Activate(USER_ID_1);
@@ -670,7 +671,8 @@ HWTEST_F(DistributedDBCommunicatorTest, ReportCommunicatorNotFound001, TestSize.
string srcTargetFor##src##label; \
Message *recvMsgFor##src##label = nullptr; \
comm##src##label->RegOnMessageCallback( \
- [&srcTargetFor##src##label, &recvMsgFor##src##label](const std::string &srcTarget, Message *inMsg) { \
+ [&srcTargetFor##src##label, &recvMsgFor##src##label](const std::string &srcTarget, const std::string &userId, \
+ Message *inMsg) { \
srcTargetFor##src##label = srcTarget; \
recvMsgFor##src##label = inMsg; \
}, nullptr);
@@ -812,7 +814,8 @@ HWTEST_F(DistributedDBCommunicatorTest, ReDeliverMessage002, TestSize.Level1)
ICommunicator *commBA = g_envDeviceB.commAggrHandle->AllocCommunicator(LABEL_A, errCode);
ASSERT_NE(commBA, nullptr);
std::vector<std::pair<std::string, Message *>> msgCallbackForBA;
- commBA->RegOnMessageCallback([&msgCallbackForBA](const std::string &srcTarget, Message *inMsg) {
+ commBA->RegOnMessageCallback([&msgCallbackForBA](const std::string &srcTarget, const std::string &userId,\
+ Message *inMsg) {
msgCallbackForBA.push_back({srcTarget, inMsg});
}, nullptr);
commBA->Activate();
diff --git a/frameworks/libs/distributeddb/test/unittest/common/syncer/distributeddb_mock_sync_module_test.cpp b/frameworks/libs/distributeddb/test/unittest/common/syncer/distributeddb_mock_sync_module_test.cpp
index 827a7b8e5..f24ceda49 100644
--- a/frameworks/libs/distributeddb/test/unittest/common/syncer/distributeddb_mock_sync_module_test.cpp
+++ b/frameworks/libs/distributeddb/test/unittest/common/syncer/distributeddb_mock_sync_module_test.cpp
@@ -1186,7 +1186,7 @@ HWTEST_F(DistributedDBMockSyncModuleTest, SyncLifeTest006, TestSize.Level1)
EXPECT_EQ(syncer->Initialize(syncDBInterface, true), E_OK);
virtualCommunicatorAggregator->OnlineDevice(DEVICE_B);
std::thread writeThread([syncer, &DEVICE_B]() {
- EXPECT_EQ(syncer->Sync({DEVICE_B}, PUSH_AND_PULL, nullptr, nullptr, true), E_OK);
+ EXPECT_EQ(syncer->Sync({{DEVICE_B, ""}}, PUSH_AND_PULL, nullptr, nullptr, true), E_OK);
});
std::thread closeThread([syncer, &syncDBInterface]() {
std::this_thread::sleep_for(std::chrono::milliseconds(1));
@@ -1318,11 +1318,11 @@ HWTEST_F(DistributedDBMockSyncModuleTest, SyncEngineTest002, TestSize.Level1)
/**
* @tc.steps: step2. add sync operation for DEVICE_A and DEVICE_B. It will create two context for A and B
*/
- std::vector<std::string> devices = {
- "DEVICES_A", "DEVICES_B"
+ std::vector<DeviceSyncTarget> syncTargets = {
+ {"DEVICES_A", ""}, {"DEVICES_B", ""}
};
const int syncId = 1;
- auto operation = new (std::nothrow) SyncOperation(syncId, devices, 0, nullptr, false);
+ auto operation = new (std::nothrow) SyncOperation(syncId, syncTargets, 0, nullptr, false);
if (operation != nullptr) {
enginePtr->AddSyncOperation(operation);
}
@@ -1356,16 +1356,16 @@ HWTEST_F(DistributedDBMockSyncModuleTest, SyncEngineTest003, TestSize.Level1)
{
auto *enginePtr = new (std::nothrow) MockSyncEngine();
ASSERT_NE(enginePtr, nullptr);
- std::vector<std::string> devices = {
- "DEVICES_A", "DEVICES_B"
+ std::vector<DeviceSyncTarget> syncTargets = {
+ {"DEVICES_A", ""}, {"DEVICES_B", ""}
};
const int syncId = 1;
- auto operation = new (std::nothrow) SyncOperation(syncId, devices, 0, nullptr, true);
+ auto operation = new (std::nothrow) SyncOperation(syncId, syncTargets, 0, nullptr, true);
ASSERT_NE(operation, nullptr);
operation->Initialize();
enginePtr->AddSyncOperation(operation);
- for (const auto &device: devices) {
- EXPECT_EQ(operation->GetStatus(device), static_cast<int>(SyncOperation::OP_BUSY_FAILURE));
+ for (const auto &syncTarget : syncTargets) {
+ EXPECT_EQ(operation->GetStatus(syncTarget), static_cast<int>(SyncOperation::OP_BUSY_FAILURE));
}
RefObject::KillAndDecObjRef(operation);
RefObject::KillAndDecObjRef(enginePtr);
@@ -1383,7 +1383,7 @@ HWTEST_F(DistributedDBMockSyncModuleTest, SyncEngineTest004, TestSize.Level0)
auto *enginePtr = new (std::nothrow) MockSyncEngine();
ASSERT_NE(enginePtr, nullptr);
int errCode = E_OK;
- auto *context = enginePtr->CallGetSyncTaskContext("dev", errCode);
+ auto *context = enginePtr->CallGetSyncTaskContext("dev", "", errCode);
EXPECT_EQ(context, nullptr);
EXPECT_EQ(errCode, -E_INVALID_DB);
RefObject::KillAndDecObjRef(enginePtr);
@@ -1720,7 +1720,8 @@ HWTEST_F(DistributedDBMockSyncModuleTest, SyncTaskContextCheck002, TestSize.Leve
*/
auto syncTaskContext = new(std::nothrow) MockSyncTaskContext();
ASSERT_NE(syncTaskContext, nullptr);
- auto operation = new SyncOperation(1u, {}, static_cast<int>(SyncModeType::QUERY_PUSH), nullptr, false);
+ std::vector<DeviceSyncTarget> syncTargets = {};
+ auto operation = new SyncOperation(1u, syncTargets, static_cast<int>(SyncModeType::QUERY_PUSH), nullptr, false);
ASSERT_NE(operation, nullptr);
QuerySyncObject querySyncObject;
operation->SetQuery(querySyncObject);
diff --git a/frameworks/libs/distributeddb/test/unittest/common/syncer/distributeddb_relational_ver_p2p_sync_test.cpp b/frameworks/libs/distributeddb/test/unittest/common/syncer/distributeddb_relational_ver_p2p_sync_test.cpp
index 4c79a098c..fbfa831e1 100644
--- a/frameworks/libs/distributeddb/test/unittest/common/syncer/distributeddb_relational_ver_p2p_sync_test.cpp
+++ b/frameworks/libs/distributeddb/test/unittest/common/syncer/distributeddb_relational_ver_p2p_sync_test.cpp
@@ -1274,7 +1274,7 @@ HWTEST_F(DistributedDBRelationalVerP2PSyncTest, AutoLaunchSync002, TestSize.Leve
* @tc.steps: step4. Call sync expect sync fail
*/
Query query = Query::Select(g_tableName);
- SyncOperation::UserCallback callBack = [](const std::map<std::string, int> &statusMap) {
+ SyncOperation::UserCallback callBack = [](const std::map<DeviceSyncTarget, int> &statusMap) {
for (const auto &entry : statusMap) {
EXPECT_EQ(entry.second, -E_NOT_FOUND);
}
@@ -1316,7 +1316,7 @@ HWTEST_F(DistributedDBRelationalVerP2PSyncTest, AutoLaunchSync003, TestSize.Leve
* @tc.steps: step4. Call sync expect sync fail
*/
Query query = Query::Select(g_tableName);
- SyncOperation::UserCallback callBack = [](const std::map<std::string, int> &statusMap) {
+ SyncOperation::UserCallback callBack = [](const std::map<DeviceSyncTarget, int> &statusMap) {
for (const auto &entry : statusMap) {
EXPECT_EQ(entry.second, -E_NOT_FOUND);
}
@@ -1504,9 +1504,9 @@ HWTEST_F(DistributedDBRelationalVerP2PSyncTest, AbilitySync004, TestSize.Level1)
Query query = Query::Select(g_tableName);
int res = DB_ERROR;
- auto callBack = [&res](std::map<std::string, int> resMap) {
- if (resMap.find("real_device") != resMap.end()) {
- res = resMap["real_device"];
+ auto callBack = [&res](std::map<DeviceSyncTarget, int> resMap) {
+ if (resMap.find({"real_device", ""}) != resMap.end()) {
+ res = resMap[{"real_device", ""}];
}
};
EXPECT_EQ(g_deviceB->GenericVirtualDevice::Sync(DistributedDB::SYNC_MODE_PULL_ONLY, query, callBack, true), E_OK);
@@ -2933,7 +2933,8 @@ HWTEST_F(DistributedDBRelationalVerP2PSyncTest, AutoLaunchSyncAfterRekey_002, Te
*/
HWTEST_F(DistributedDBRelationalVerP2PSyncTest, SyncTargetTest001, TestSize.Level1) {
MockSyncTaskContext syncTaskContext;
- SyncOperation *operation = new SyncOperation(1, {}, 0, nullptr, false);
+ std::vector<DeviceSyncTarget> syncTargets = {};
+ SyncOperation *operation = new SyncOperation(1, syncTargets, 0, nullptr, false);
EXPECT_NE(operation, nullptr);
std::thread addTarget([&syncTaskContext, &operation]() {
auto *newTarget = new (std::nothrow) SingleVerSyncTarget;
diff --git a/frameworks/libs/distributeddb/test/unittest/common/syncer/distributeddb_single_ver_p2p_complex_sync_test.cpp b/frameworks/libs/distributeddb/test/unittest/common/syncer/distributeddb_single_ver_p2p_complex_sync_test.cpp
index b2f6fd4c9..73b78654f 100644
--- a/frameworks/libs/distributeddb/test/unittest/common/syncer/distributeddb_single_ver_p2p_complex_sync_test.cpp
+++ b/frameworks/libs/distributeddb/test/unittest/common/syncer/distributeddb_single_ver_p2p_complex_sync_test.cpp
@@ -468,14 +468,15 @@ HWTEST_F(DistributedDBSingleVerP2PComplexSyncTest, SametimeSync002, TestSize.Lev
* @tc.expected: step3. sync should return OP_FINISHED_ALL.
*/
std::this_thread::sleep_for(std::chrono::milliseconds(100));
- std::map<std::string, int> virtualResult;
+ std::map<DeviceSyncTarget, int> virtualResult;
g_deviceB->Sync(DistributedDB::SYNC_MODE_PULL_ONLY, query,
- [&virtualResult](const std::map<std::string, int> &map) {
+ [&virtualResult](const std::map<DeviceSyncTarget, int> &map) {
virtualResult = map;
}, true);
EXPECT_TRUE(status == OK);
ASSERT_EQ(virtualResult.size(), devices.size());
- EXPECT_EQ(virtualResult[DEVICE_A], SyncOperation::OP_FINISHED_ALL);
+ int deviceAStatus = virtualResult[{DEVICE_A, ""}];
+ EXPECT_EQ(deviceAStatus, SyncOperation::OP_FINISHED_ALL);
g_communicatorAggregator->RegOnDispatch(nullptr);
subThread.join();
}
diff --git a/frameworks/libs/distributeddb/test/unittest/common/syncer/distributeddb_single_ver_p2p_subscribe_sync_test.cpp b/frameworks/libs/distributeddb/test/unittest/common/syncer/distributeddb_single_ver_p2p_subscribe_sync_test.cpp
index bb9b2db60..df2db904a 100644
--- a/frameworks/libs/distributeddb/test/unittest/common/syncer/distributeddb_single_ver_p2p_subscribe_sync_test.cpp
+++ b/frameworks/libs/distributeddb/test/unittest/common/syncer/distributeddb_single_ver_p2p_subscribe_sync_test.cpp
@@ -718,8 +718,11 @@ HWTEST_F(DistributedDBSingleVerP2PSubscribeSyncTest, SubscribeManager006, TestSi
ASSERT_TRUE(subManager.ReserveRemoteSubscribeQuery(DEVICE_A, queryCommonObj) == E_OK);
ASSERT_TRUE(subManager.ActiveRemoteSubscribeQuery(DEVICE_A, queryCommonObj) == E_OK);
EXPECT_EQ(subManager.IsLastRemoteContainSubscribe(DEVICE_A, queryId), false);
- deviceAQueies.push_back(DEVICE_A);
- EXPECT_EQ(subManager.LocalSubscribeLimitCheck(deviceAQueies, queryCommonObj), E_OK);
+ std::vector<DeviceSyncTarget> deviceAUserQueies;
+ for (const auto &device : deviceAQueies) {
+ deviceAUserQueies.push_back({device, ""});
+ }
+ EXPECT_EQ(subManager.LocalSubscribeLimitCheck(deviceAUserQueies, queryCommonObj), E_OK);
/**
* @tc.steps: step4. add MAX_DEVICES_NUM device, then call LocalSubscribeLimitCheck
@@ -728,7 +731,11 @@ HWTEST_F(DistributedDBSingleVerP2PSubscribeSyncTest, SubscribeManager006, TestSi
for (size_t i = 0 ; i < MAX_DEVICES_NUM; i++) {
deviceAQueies.push_back("device_" + std::to_string(i));
}
- EXPECT_EQ(subManager.LocalSubscribeLimitCheck(deviceAQueies, queryCommonObj), -E_MAX_LIMITS);
+ std::vector<DeviceSyncTarget> targets;
+ for (const auto &device : deviceAQueies) {
+ targets.push_back({device, ""});
+ }
+ EXPECT_EQ(subManager.LocalSubscribeLimitCheck(targets, queryCommonObj), -E_MAX_LIMITS);
}
/**
@@ -1180,9 +1187,10 @@ HWTEST_F(DistributedDBSingleVerP2PSubscribeSyncTest, SubscribeSync009, TestSize.
* @tc.steps: step3. 32 unsubscribe
*/
LOGI("============step 3============");
- SyncOperation::UserCallback callback = [](std::map<std::string, int> res) {
+ SyncOperation::UserCallback callback = [](std::map<DeviceSyncTarget, int> res) {
ASSERT_EQ(res.size(), 1u);
- EXPECT_EQ(res["real_device"], SyncOperation::OP_FINISHED_ALL);
+ DeviceSyncTarget target = {"real_device", ""};
+ EXPECT_EQ(res[target], SyncOperation::OP_FINISHED_ALL);
};
for (const auto &dev: devices) {
dev->UnSubscribe(QuerySyncObject(query), true, 1, callback); // sync id is 1
diff --git a/frameworks/libs/distributeddb/test/unittest/common/syncer/distributeddb_single_ver_p2p_sync_check_test.cpp b/frameworks/libs/distributeddb/test/unittest/common/syncer/distributeddb_single_ver_p2p_sync_check_test.cpp
index 19a066635..5acb15f7f 100644
--- a/frameworks/libs/distributeddb/test/unittest/common/syncer/distributeddb_single_ver_p2p_sync_check_test.cpp
+++ b/frameworks/libs/distributeddb/test/unittest/common/syncer/distributeddb_single_ver_p2p_sync_check_test.cpp
@@ -1946,18 +1946,19 @@ HWTEST_F(DistributedDBSingleVerP2PSyncCheckTest, GetDataNotify001, TestSize.Leve
* @tc.expected: step2. sync should return OK. onComplete should be called, deviceB sync TIME_OUT.
*/
std::map<std::string, DBStatus> result;
- std::map<std::string, int> virtualRes;
+ std::map<DeviceSyncTarget, int> virtualRes;
status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PULL_ONLY, result, true);
EXPECT_EQ(status, OK);
EXPECT_EQ(result.size(), devices.size());
EXPECT_EQ(result[DEVICE_B], TIME_OUT);
std::this_thread::sleep_for(std::chrono::seconds(TEN_SECONDS));
Query query = Query::Select();
- g_deviceB->Sync(SYNC_MODE_PUSH_ONLY, query, [&virtualRes](std::map<std::string, int> resMap) {
+ g_deviceB->Sync(SYNC_MODE_PUSH_ONLY, query, [&virtualRes](std::map<DeviceSyncTarget, int> resMap) {
virtualRes = std::move(resMap);
}, true);
EXPECT_EQ(virtualRes.size(), devices.size());
- EXPECT_EQ(virtualRes[DEVICE_A], static_cast<int>(SyncOperation::OP_TIMEOUT));
+ DeviceSyncTarget target = {DEVICE_A, ""};
+ EXPECT_EQ(virtualRes[target], static_cast<int>(SyncOperation::OP_TIMEOUT));
std::this_thread::sleep_for(std::chrono::seconds(TEN_SECONDS));
/**
@@ -1973,11 +1974,11 @@ HWTEST_F(DistributedDBSingleVerP2PSyncCheckTest, GetDataNotify001, TestSize.Leve
EXPECT_EQ(result.size(), devices.size());
EXPECT_EQ(result[DEVICE_B], OK);
std::this_thread::sleep_for(std::chrono::seconds(TEN_SECONDS));
- g_deviceB->Sync(SYNC_MODE_PUSH_ONLY, query, [&virtualRes](std::map<std::string, int> resMap) {
+ g_deviceB->Sync(SYNC_MODE_PUSH_ONLY, query, [&virtualRes](std::map<DeviceSyncTarget, int> resMap) {
virtualRes = std::move(resMap);
}, true);
EXPECT_EQ(virtualRes.size(), devices.size());
- EXPECT_EQ(virtualRes[DEVICE_A], static_cast<int>(SyncOperation::OP_FINISHED_ALL));
+ EXPECT_EQ(virtualRes[target], static_cast<int>(SyncOperation::OP_FINISHED_ALL));
g_deviceB->DelayGetSyncData(0);
}
@@ -2013,9 +2014,9 @@ HWTEST_F(DistributedDBSingleVerP2PSyncCheckTest, GetDataNotify002, TestSize.Leve
* @tc.steps: step3. deviceB call sync and wait
*/
std::thread asyncThread([]() {
- std::map<std::string, int> virtualRes;
+ std::map<DeviceSyncTarget, int> virtualRes;
Query query = Query::Select();
- g_deviceB->Sync(SYNC_MODE_PUSH_ONLY, query, [&virtualRes](std::map<std::string, int> resMap) {
+ g_deviceB->Sync(SYNC_MODE_PUSH_ONLY, query, [&virtualRes](std::map<DeviceSyncTarget, int> resMap) {
virtualRes = std::move(resMap);
}, true);
});
diff --git a/frameworks/libs/distributeddb/test/unittest/common/syncer/generic_virtual_device.cpp b/frameworks/libs/distributeddb/test/unittest/common/syncer/generic_virtual_device.cpp
index b68cae6e7..23494c5f6 100644
--- a/frameworks/libs/distributeddb/test/unittest/common/syncer/generic_virtual_device.cpp
+++ b/frameworks/libs/distributeddb/test/unittest/common/syncer/generic_virtual_device.cpp
@@ -109,7 +109,8 @@ int GenericVirtualDevice::Initialize(VirtualCommunicatorAggregator *comAggregato
return -E_OUT_OF_MEMORY;
}
communicateHandle_->RegOnMessageCallback(
- std::bind(&GenericVirtualDevice::MessageCallback, this, std::placeholders::_1, std::placeholders::_2), []() {});
+ std::bind(&GenericVirtualDevice::MessageCallback, this, std::placeholders::_1, std::placeholders::_2,
+ std::placeholders::_3), []() {});
context_->Initialize(remoteDeviceId_, storage_, metadata_, communicateHandle_);
context_->SetRetryStatus(SyncTaskContext::NO_NEED_RETRY);
context_->RegOnSyncTask(std::bind(&GenericVirtualDevice::StartResponseTask, this));
@@ -132,7 +133,7 @@ std::string GenericVirtualDevice::GetDeviceId() const
return deviceId_;
}
-int GenericVirtualDevice::MessageCallback(const std::string &deviceId, Message *inMsg)
+int GenericVirtualDevice::MessageCallback(const std::string &deviceId, const std::string &userId, Message *inMsg)
{
if (inMsg->GetMessageId() == LOCAL_DATA_CHANGED) {
if (onRemoteDataChanged_) {
@@ -236,7 +237,7 @@ void GenericVirtualDevice::OnDeviceSyncProcess(const std::map<std::string, Devic
int GenericVirtualDevice::Sync(const DeviceSyncOption &option, const DeviceSyncProcessCallback &onProcess)
{
- auto operation = new (std::nothrow) SyncOperation(1, {remoteDeviceId_}, option.mode, nullptr, option.isWait);
+ auto operation = new (std::nothrow) SyncOperation(1, {{remoteDeviceId_, ""}}, option.mode, nullptr, option.isWait);
if (operation == nullptr) {
return -E_OUT_OF_MEMORY;
}
@@ -264,7 +265,7 @@ int GenericVirtualDevice::Sync(const DeviceSyncOption &option, const DeviceSyncP
int GenericVirtualDevice::Sync(SyncMode mode, bool wait)
{
- auto operation = new (std::nothrow) SyncOperation(1, {remoteDeviceId_}, mode, nullptr, wait);
+ auto operation = new (std::nothrow) SyncOperation(1, {{remoteDeviceId_, ""}}, mode, nullptr, wait);
if (operation == nullptr) {
return -E_OUT_OF_MEMORY;
}
@@ -286,7 +287,7 @@ int GenericVirtualDevice::Sync(SyncMode mode, const Query &query, bool wait)
int GenericVirtualDevice::Sync(SyncMode mode, const Query &query,
const SyncOperation::UserCallback &callBack, bool wait)
{
- auto operation = new (std::nothrow) SyncOperation(1, {remoteDeviceId_}, mode, callBack, wait);
+ auto operation = new (std::nothrow) SyncOperation(1, {{remoteDeviceId_, ""}}, mode, callBack, wait);
if (operation == nullptr) {
return -E_OUT_OF_MEMORY;
}
diff --git a/frameworks/libs/distributeddb/test/unittest/common/syncer/generic_virtual_device.h b/frameworks/libs/distributeddb/test/unittest/common/syncer/generic_virtual_device.h
index 462dd6948..3c6976d63 100644
--- a/frameworks/libs/distributeddb/test/unittest/common/syncer/generic_virtual_device.h
+++ b/frameworks/libs/distributeddb/test/unittest/common/syncer/generic_virtual_device.h
@@ -34,7 +34,7 @@ public:
int Initialize(VirtualCommunicatorAggregator *comAggregator, ISyncInterface *syncInterface);
void SetDeviceId(const std::string &deviceId);
std::string GetDeviceId() const;
- int MessageCallback(const std::string &deviceId, Message *inMsg);
+ int MessageCallback(const std::string &deviceId, const std::string &userId, Message *inMsg);
void OnRemoteDataChanged(const std::function<void(const std::string &)> &callback);
void Online();
void Offline();
diff --git a/frameworks/libs/distributeddb/test/unittest/common/syncer/kv_virtual_device.cpp b/frameworks/libs/distributeddb/test/unittest/common/syncer/kv_virtual_device.cpp
index c2b58e7b8..43d327079 100644
--- a/frameworks/libs/distributeddb/test/unittest/common/syncer/kv_virtual_device.cpp
+++ b/frameworks/libs/distributeddb/test/unittest/common/syncer/kv_virtual_device.cpp
@@ -97,7 +97,7 @@ int KvVirtualDevice::Commit()
int KvVirtualDevice::Subscribe(QuerySyncObject query, bool wait, int id)
{
- auto operation = new (std::nothrow) SyncOperation(id, {remoteDeviceId_}, SUBSCRIBE_QUERY, nullptr, wait);
+ auto operation = new (std::nothrow) SyncOperation(id, {{remoteDeviceId_, ""}}, SUBSCRIBE_QUERY, nullptr, wait);
if (operation == nullptr) {
return -E_OUT_OF_MEMORY;
}
@@ -120,7 +120,7 @@ int KvVirtualDevice::UnSubscribe(QuerySyncObject query, bool wait, int id)
int KvVirtualDevice::UnSubscribe(const QuerySyncObject &query, bool wait, int id,
const SyncOperation::UserCallback &callback)
{
- auto operation = new (std::nothrow) SyncOperation(id, {remoteDeviceId_}, UNSUBSCRIBE_QUERY, callback, wait);
+ auto operation = new (std::nothrow) SyncOperation(id, {{remoteDeviceId_, ""}}, UNSUBSCRIBE_QUERY, callback, wait);
if (operation == nullptr) {
return -E_OUT_OF_MEMORY;
}
diff --git a/frameworks/libs/distributeddb/test/unittest/common/syncer/mock_sync_engine.h b/frameworks/libs/distributeddb/test/unittest/common/syncer/mock_sync_engine.h
index 1545ea636..fa6f8f0df 100644
--- a/frameworks/libs/distributeddb/test/unittest/common/syncer/mock_sync_engine.h
+++ b/frameworks/libs/distributeddb/test/unittest/common/syncer/mock_sync_engine.h
@@ -29,9 +29,9 @@ public:
subManager_ = std::make_shared<SubscribeManager>();
}
- ISyncTaskContext *CallGetSyncTaskContext(const std::string &deviceId, int &errCode)
+ ISyncTaskContext *CallGetSyncTaskContext(const std::string &deviceId, const std::string &userId, int &errCode)
{
- return SyncEngine::GetSyncTaskContext(deviceId, errCode);
+ return SyncEngine::GetSyncTaskContext(deviceId, userId, errCode);
}
};
} // namespace DistributedDB
diff --git a/frameworks/libs/distributeddb/test/unittest/common/syncer/virtual_communicator.cpp b/frameworks/libs/distributeddb/test/unittest/common/syncer/virtual_communicator.cpp
index 9d6df07de..f2b85cd0e 100644
--- a/frameworks/libs/distributeddb/test/unittest/common/syncer/virtual_communicator.cpp
+++ b/frameworks/libs/distributeddb/test/unittest/common/syncer/virtual_communicator.cpp
@@ -97,7 +97,7 @@ void VirtualCommunicator::CallbackOnMessage(const std::string &srcTarget, Messag
std::lock_guard<std::mutex> lock(onMessageLock_);
if (isEnable_ && onMessage_ && (srcTarget != deviceId_) && ((inMsg->GetMessageId() != dropMsgId_) ||
(dropMsgTimes_ == 0))) {
- onMessage_(srcTarget, inMsg);
+ onMessage_(srcTarget, "", inMsg);
} else {
LOGD("drop msg from dev=%s, localDev=%s", srcTarget.c_str(), deviceId_.c_str());
if (dropMsgTimes_ > 0) {
Loading...
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
1
https://gitee.com/liao-yonghuang/lyh.git
git@gitee.com:liao-yonghuang/lyh.git
liao-yonghuang
lyh
lyh
master

搜索帮助