1 Star 0 Fork 0

廖永煌/lyh

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
文件
该仓库未声明开源许可证文件(LICENSE),使用请关注具体项目描述及其代码上游依赖。
克隆/下载
0506.diff 88.95 KB
一键复制 编辑 原始数据 按行查看 历史
廖永煌 提交于 2025-05-08 16:11 +08:00 . 1
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191119211931194119511961197119811991200120112021203120412051206120712081209121012111212121312141215121612171218121912201221122212231224122512261227122812291230123112321233123412351236123712381239124012411242124312441245124612471248124912501251125212531254125512561257125812591260126112621263126412651266126712681269127012711272127312741275127612771278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177717781779178017811782178317841785178617871788178917901791179217931794179517961797179817991800180118021803180418051806180718081809181018111812181318141815181618171818181918201821182218231824182518261827182818291830183118321833183418351836183718381839184018411842184318441845184618471848184918501851185218531854185518561857185818591860186118621863186418651866186718681869187018711872187318741875187618771878187918801881188218831884188518861887188818891890189118921893189418951896189718981899190019011902190319041905190619071908190919101911191219131914191519161917
diff --git a/CMakeLists.txt b/CMakeLists.txt
index a70dc8a01..ff1d2789f 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -321,7 +321,8 @@ endif()
set(DB_PATH ${PROJECT_SOURCE_DIR}/frameworks/libs/distributeddb)
-add_executable(distributed_ut ${DISTRIBUTEDDB_SRC} ${SECUREC_SRC} ${SQLITE_SRC})
+add_executable(distributed_ut ${DISTRIBUTEDDB_SRC} ${SECUREC_SRC} ${SQLITE_SRC}
+ frameworks/libs/distributeddb/test/unittest/common/syncer/distributeddb_relational_multi_user_sync_test.cpp)
target_link_libraries(
distributed_ut
diff --git a/frameworks/libs/distributeddb/communicator/include/icommunicator.h b/frameworks/libs/distributeddb/communicator/include/icommunicator.h
index 4b73ab326..578bec232 100644
--- a/frameworks/libs/distributeddb/communicator/include/icommunicator.h
+++ b/frameworks/libs/distributeddb/communicator/include/icommunicator.h
@@ -26,7 +26,7 @@
namespace DistributedDB {
// inMsg is heap memory, its ownership transfers by calling OnMessageCallback
-using OnMessageCallback = std::function<void(const std::string &srcTarget, Message *inMsg)>;
+using OnMessageCallback = std::function<void(const std::string &srcTarget, const std::string userId, Message *inMsg)>;
constexpr uint32_t SEND_TIME_OUT = 3000; // 3s
struct SendConfig {
diff --git a/frameworks/libs/distributeddb/communicator/src/communicator.cpp b/frameworks/libs/distributeddb/communicator/src/communicator.cpp
index 608adaf91..027424553 100644
--- a/frameworks/libs/distributeddb/communicator/src/communicator.cpp
+++ b/frameworks/libs/distributeddb/communicator/src/communicator.cpp
@@ -144,7 +144,7 @@ int Communicator::SendMessage(const std::string &dstTarget, const Message *inMsg
return errCode;
}
-void Communicator::OnBufferReceive(const std::string &srcTarget, const SerialBuffer *inBuf)
+void Communicator::OnBufferReceive(const std::string &srcTarget, const std::string &userId, const SerialBuffer *inBuf)
{
std::lock_guard<std::mutex> messageHandleLockGuard(messageHandleMutex_);
if (srcTarget.size() != 0 && inBuf != nullptr && onMessageHandle_) {
@@ -166,7 +166,7 @@ void Communicator::OnBufferReceive(const std::string &srcTarget, const SerialBuf
return;
}
LOGI("[Comm][Receive] label=%.3s, srcTarget=%s{private}.", VEC_TO_STR(commLabel_), srcTarget.c_str());
- onMessageHandle_(srcTarget, message);
+ onMessageHandle_(srcTarget, userId, message);
} else {
LOGE("[Comm][Receive] label=%.3s, src.size=%zu or buf or handle invalid.", VEC_TO_STR(commLabel_),
srcTarget.size());
diff --git a/frameworks/libs/distributeddb/communicator/src/communicator.h b/frameworks/libs/distributeddb/communicator/src/communicator.h
index 976ce0ae9..5bee40aaf 100644
--- a/frameworks/libs/distributeddb/communicator/src/communicator.h
+++ b/frameworks/libs/distributeddb/communicator/src/communicator.h
@@ -56,7 +56,7 @@ public:
const OnSendEnd &onEnd) override;
// Call by CommunicatorAggregator directly
- void OnBufferReceive(const std::string &srcTarget, const SerialBuffer *inBuf);
+ void OnBufferReceive(const std::string &srcTarget, const std::string &userId, const SerialBuffer *inBuf);
// Call by CommunicatorAggregator directly
void OnConnectChange(const std::string &target, bool isConnect);
diff --git a/frameworks/libs/distributeddb/communicator/src/communicator_aggregator.cpp b/frameworks/libs/distributeddb/communicator/src/communicator_aggregator.cpp
index ea724de99..ee3bbaf75 100644
--- a/frameworks/libs/distributeddb/communicator/src/communicator_aggregator.cpp
+++ b/frameworks/libs/distributeddb/communicator/src/communicator_aggregator.cpp
@@ -276,7 +276,7 @@ void CommunicatorAggregator::ActivateCommunicator(const LabelType &commLabel, co
// Do Redeliver, the communicator is responsible to deal with the frame
std::list<FrameInfo> framesToRedeliver = retainer_.FetchFramesForSpecificCommunicator(commLabel);
for (auto &entry : framesToRedeliver) {
- commMap_[userId].at(commLabel).first->OnBufferReceive(entry.srcTarget, entry.buffer);
+ commMap_[userId].at(commLabel).first->OnBufferReceive(entry.srcTarget, userId, entry.buffer);
}
}
@@ -749,7 +749,7 @@ int CommunicatorAggregator::TryDeliverAppLayerFrameToCommunicatorNoMutex(const s
{
// Ignore nonactivated communicator, which is regarded as inexistent
if (commMap_[userId].count(toLabel) != 0 && commMap_[userId].at(toLabel).second) {
- commMap_[userId].at(toLabel).first->OnBufferReceive(srcTarget, inFrameBuffer);
+ commMap_[userId].at(toLabel).first->OnBufferReceive(srcTarget, userId, inFrameBuffer);
// Frame handed over to communicator who is responsible to delete it. The frame is deleted here after return.
inFrameBuffer = nullptr;
return E_OK;
@@ -771,7 +771,7 @@ int CommunicatorAggregator::TryDeliverAppLayerFrameToCommunicatorNoMutex(const s
}
}
if (communicator != nullptr && (userId.empty() || isEmpty)) {
- communicator->OnBufferReceive(srcTarget, inFrameBuffer);
+ communicator->OnBufferReceive(srcTarget, userId, inFrameBuffer);
inFrameBuffer = nullptr;
return E_OK;
}
diff --git a/frameworks/libs/distributeddb/interfaces/include/kv_store_nb_delegate.h b/frameworks/libs/distributeddb/interfaces/include/kv_store_nb_delegate.h
index 08870ff23..c39b5205b 100644
--- a/frameworks/libs/distributeddb/interfaces/include/kv_store_nb_delegate.h
+++ b/frameworks/libs/distributeddb/interfaces/include/kv_store_nb_delegate.h
@@ -204,6 +204,8 @@ public:
const std::function<void(const std::map<std::string, DBStatus> &devicesMap)> &onComplete,
const Query &query, bool wait) = 0;
+ DB_API virtual DBStatus Sync(const DeviceSyncParam &param, const KvDeviceSyncOnCompleteCallback &onComplete) = 0;
+
// Sync with device, provides sync count information
DB_API virtual DBStatus Sync(const DeviceSyncOption &option, const DeviceSyncProcessCallback &onProcess) = 0;
diff --git a/frameworks/libs/distributeddb/interfaces/include/relational/relational_store_delegate.h b/frameworks/libs/distributeddb/interfaces/include/relational/relational_store_delegate.h
index b58f83fe2..2370db55d 100644
--- a/frameworks/libs/distributeddb/interfaces/include/relational/relational_store_delegate.h
+++ b/frameworks/libs/distributeddb/interfaces/include/relational/relational_store_delegate.h
@@ -61,6 +61,8 @@ public:
DB_API virtual DBStatus Sync(const std::vector<std::string> &devices, SyncMode mode,
const Query &query, const SyncStatusCallback &onComplete, bool wait) = 0;
+ DB_API virtual DBStatus Sync(const DeviceSyncParam &param, const RdbDeviceSyncOnCompleteCallback &onComplete) = 0;
+
DB_API virtual int32_t GetCloudSyncTaskCount()
{
return 0;
diff --git a/frameworks/libs/distributeddb/interfaces/include/store_types.h b/frameworks/libs/distributeddb/interfaces/include/store_types.h
index a5dd8fd81..9e189b10f 100644
--- a/frameworks/libs/distributeddb/interfaces/include/store_types.h
+++ b/frameworks/libs/distributeddb/interfaces/include/store_types.h
@@ -20,6 +20,7 @@
#include <map>
#include <set>
#include <string>
+#include <utility>
#include "query.h"
#include "types_export.h"
@@ -196,6 +197,29 @@ struct SyncProcess {
struct DeviceSyncOption {
std::vector<std::string> devices;
+ std::string userId;
+ SyncMode mode = SYNC_MODE_PULL_ONLY;
+ Query query; // isQuery must be set to true if the query is set
+ bool isQuery = false;
+ bool isWait = true;
+};
+
+struct DeviceSyncTarget {
+ std::string device;
+ std::string userId;
+
+ DeviceSyncTarget(std::string device, std::string userId) : device(std::move(device)), userId(std::move(userId)) {}
+
+ bool operator<(const DeviceSyncTarget& other) const {
+ if (device == other.device) {
+ return userId < other.userId;
+ }
+ return device < other.device;
+ }
+};
+
+struct DeviceSyncParam {
+ std::vector<DeviceSyncTarget> syncTargets;
SyncMode mode = SYNC_MODE_PULL_ONLY;
Query query; // isQuery must be set to true if the query is set
bool isQuery = false;
@@ -224,6 +248,11 @@ using SyncProcessCallback = std::function<void(const std::map<std::string, SyncP
using DeviceSyncProcessCallback = std::function<void(const std::map<std::string, DeviceSyncProcess> &processMap)>;
+using KvDeviceSyncOnCompleteCallback = std::function<void(const std::map<DeviceSyncTarget, DBStatus> &devicesMap)>;
+
+using RdbDeviceSyncOnCompleteCallback = std::function<void(const std::map<DeviceSyncTarget,
+ std::vector<TableStatus>> &devicesMap)>;
+
struct RemoteCondition {
std::string sql; // The sql statement;
std::vector<std::string> bindArgs; // The bind args.
diff --git a/frameworks/libs/distributeddb/interfaces/src/kv_store_nb_delegate_impl.cpp b/frameworks/libs/distributeddb/interfaces/src/kv_store_nb_delegate_impl.cpp
index e38585b3d..916650947 100644
--- a/frameworks/libs/distributeddb/interfaces/src/kv_store_nb_delegate_impl.cpp
+++ b/frameworks/libs/distributeddb/interfaces/src/kv_store_nb_delegate_impl.cpp
@@ -624,6 +624,42 @@ DBStatus KvStoreNbDelegateImpl::Sync(const std::vector<std::string> &devices, Sy
return OK;
}
+DBStatus KvStoreNbDelegateImpl::Sync(const DeviceSyncParam &param, const KvDeviceSyncOnCompleteCallback &onComplete)
+{
+ if (conn_ == nullptr) {
+ LOGE("%s", INVALID_CONNECTION);
+ return DB_ERROR;
+ }
+ if (param.mode > SYNC_MODE_PUSH_PULL) {
+ LOGE("not support other mode");
+ return NOT_SUPPORT;
+ }
+
+ QuerySyncObject querySyncObj(param.query);
+ if (!querySyncObj.GetRelationTableNames().empty()) {
+ LOGE("check query table names from tables failed!");
+ return NOT_SUPPORT;
+ }
+
+ if (!DBCommon::CheckQueryWithoutMultiTable(param.query)) {
+ LOGE("not support for invalid query");
+ return NOT_SUPPORT;
+ }
+ if (querySyncObj.GetSortType() != SortType::NONE || querySyncObj.IsQueryByRange()) {
+ LOGE("not support order by timestamp and query by range");
+ return NOT_SUPPORT;
+ }
+ PragmaSync pragmaData(param, [this, onComplete](const std::map<DeviceSyncTarget, int> &statuses) {
+ OnSyncComplete(statuses, onComplete);
+ });
+ int errCode = conn_->Pragma(PRAGMA_SYNC_DEVICES, &pragmaData);
+ if (errCode < E_OK) {
+ LOGE("[KvStoreNbDelegate] QuerySync data failed:%d", errCode);
+ return TransferDBErrno(errCode);
+ }
+ return OK;
+}
+
void KvStoreNbDelegateImpl::OnDeviceSyncProcess(const std::map<std::string, DeviceSyncProcess> &processMap,
const DeviceSyncProcessCallback &onProcess) const
{
@@ -1028,6 +1064,19 @@ void KvStoreNbDelegateImpl::OnSyncComplete(const std::map<std::string, int> &sta
}
}
+void KvStoreNbDelegateImpl::OnSyncComplete(const std::map<DeviceSyncTarget, int> &statuses,
+ const KvDeviceSyncOnCompleteCallback &onComplete) const
+{
+ std::map<DeviceSyncTarget, DBStatus> result;
+ for (const auto &pair : statuses) {
+ DBStatus status = SyncOperation::DBStatusTrans(pair.second);
+ result.insert(std::pair<DeviceSyncTarget, DBStatus>(pair.first, status));
+ }
+ if (onComplete) {
+ onComplete(result);
+ }
+}
+
DBStatus KvStoreNbDelegateImpl::SetEqualIdentifier(const std::string &identifier,
const std::vector<std::string> &targets)
{
diff --git a/frameworks/libs/distributeddb/interfaces/src/kv_store_nb_delegate_impl.h b/frameworks/libs/distributeddb/interfaces/src/kv_store_nb_delegate_impl.h
index e03bc6f3c..ea45512cb 100644
--- a/frameworks/libs/distributeddb/interfaces/src/kv_store_nb_delegate_impl.h
+++ b/frameworks/libs/distributeddb/interfaces/src/kv_store_nb_delegate_impl.h
@@ -133,6 +133,8 @@ public:
const std::function<void(const std::map<std::string, DBStatus> &devicesMap)> &onComplete,
const Query &query, bool wait) override;
+ DBStatus Sync(const DeviceSyncParam &param, const KvDeviceSyncOnCompleteCallback &onComplete);
+
// Sync with devices, provides sync count information
DBStatus Sync(const DeviceSyncOption &option, const DeviceSyncProcessCallback &onProcess) override;
@@ -205,6 +207,9 @@ private:
void OnSyncComplete(const std::map<std::string, int> &statuses,
const std::function<void(const std::map<std::string, DBStatus> &devicesMap)> &onComplete) const;
+
+ void OnSyncComplete(const std::map<DeviceSyncTarget, int> &statuses,
+ const KvDeviceSyncOnCompleteCallback &onComplete) const;
void OnDeviceSyncProcess(const std::map<std::string, DeviceSyncProcess> &processMap,
const DeviceSyncProcessCallback &onProcess) const;
diff --git a/frameworks/libs/distributeddb/interfaces/src/relational/relational_store_delegate_impl.cpp b/frameworks/libs/distributeddb/interfaces/src/relational/relational_store_delegate_impl.cpp
index 5cf4ebdb9..989f86d1d 100644
--- a/frameworks/libs/distributeddb/interfaces/src/relational/relational_store_delegate_impl.cpp
+++ b/frameworks/libs/distributeddb/interfaces/src/relational/relational_store_delegate_impl.cpp
@@ -121,6 +121,35 @@ DBStatus RelationalStoreDelegateImpl::Sync(const std::vector<std::string> &devic
return OK;
}
+DBStatus RelationalStoreDelegateImpl::Sync(const DeviceSyncParam &param,
+ const RdbDeviceSyncOnCompleteCallback &onComplete)
+{
+ if (conn_ == nullptr) {
+ LOGE("Invalid connection for operation!");
+ return DB_ERROR;
+ }
+
+ if (param.mode > SYNC_MODE_PUSH_PULL) {
+ LOGE("not support other mode");
+ return NOT_SUPPORT;
+ }
+
+ if (!param.isQuery || !DBCommon::CheckQueryWithoutMultiTable(param.query)) {
+ LOGE("not set query or not support query with tables");
+ return NOT_SUPPORT;
+ }
+ RelationalStoreConnection::SyncInfo syncInfo{param.syncTargets, param.mode,
+ [this, onComplete](const std::map<DeviceSyncTarget, std::vector<TableStatus>> &devicesStatus) {
+ OnSyncComplete(devicesStatus, onComplete);
+ }, param.query, param.isWait};
+ int errCode = conn_->SyncToDevice(syncInfo);
+ if (errCode != E_OK) {
+ LOGW("[RelationalStore Delegate] sync data to device failed:%d", errCode);
+ return TransferDBErrno(errCode);
+ }
+ return OK;
+}
+
DBStatus RelationalStoreDelegateImpl::RemoveDeviceData(const std::string &device, const std::string &tableName)
{
if (conn_ == nullptr) {
@@ -185,6 +214,23 @@ void RelationalStoreDelegateImpl::OnSyncComplete(const std::map<std::string, std
}
}
+void RelationalStoreDelegateImpl::OnSyncComplete(const std::map<DeviceSyncTarget, std::vector<TableStatus>> &devicesStatus,
+ const RdbDeviceSyncOnCompleteCallback &onComplete)
+{
+ std::map<DeviceSyncTarget, std::vector<TableStatus>> res;
+ for (const auto &[syncTarget, tablesStatus] : devicesStatus) {
+ for (const auto &tableStatus : tablesStatus) {
+ TableStatus table;
+ table.tableName = tableStatus.tableName;
+ table.status = SyncOperation::DBStatusTrans(tableStatus.status);
+ res[syncTarget].push_back(table);
+ }
+ }
+ if (onComplete) {
+ onComplete(res);
+ }
+}
+
DBStatus RelationalStoreDelegateImpl::RemoteQuery(const std::string &device, const RemoteCondition &condition,
uint64_t timeout, std::shared_ptr<ResultSet> &result)
{
diff --git a/frameworks/libs/distributeddb/interfaces/src/relational/relational_store_delegate_impl.h b/frameworks/libs/distributeddb/interfaces/src/relational/relational_store_delegate_impl.h
index f53b07233..d9faa9e69 100644
--- a/frameworks/libs/distributeddb/interfaces/src/relational/relational_store_delegate_impl.h
+++ b/frameworks/libs/distributeddb/interfaces/src/relational/relational_store_delegate_impl.h
@@ -34,6 +34,8 @@ public:
DBStatus Sync(const std::vector<std::string> &devices, SyncMode mode,
const Query &query, const SyncStatusCallback &onComplete, bool wait) override;
+ DBStatus Sync(const DeviceSyncParam &param, const RdbDeviceSyncOnCompleteCallback &onComplete) override;
+
DBStatus RemoveDeviceDataInner(const std::string &device, ClearMode mode) override;
DBStatus CreateDistributedTableInner(const std::string &tableName, TableSyncType type) override;
@@ -98,6 +100,8 @@ public:
private:
static void OnSyncComplete(const std::map<std::string, std::vector<TableStatus>> &devicesStatus,
const SyncStatusCallback &onComplete);
+ static void OnSyncComplete(const std::map<DeviceSyncTarget, std::vector<TableStatus>> &devicesStatus,
+ const RdbDeviceSyncOnCompleteCallback &onComplete);
#ifdef USE_DISTRIBUTEDDB_CLOUD
DBStatus ClearWatermark(const ClearMetaDataOption &option);
diff --git a/frameworks/libs/distributeddb/storage/include/kvdb_pragma.h b/frameworks/libs/distributeddb/storage/include/kvdb_pragma.h
index dd47e8fd5..44b912839 100644
--- a/frameworks/libs/distributeddb/storage/include/kvdb_pragma.h
+++ b/frameworks/libs/distributeddb/storage/include/kvdb_pragma.h
@@ -103,9 +103,22 @@ struct PragmaSync {
{
}
+ PragmaSync(const DeviceSyncParam &param,
+ const std::function<void(const std::map<DeviceSyncTarget, int> &devicesMap)> &onComplete)
+ : syncTargets_(param.syncTargets),
+ mode_(param.mode),
+ wait_(param.isWait),
+ isQuerySync_(param.isQuery),
+ query_(param.query),
+ onCompleteV2_(onComplete)
+ {
+ }
+
std::vector<std::string> devices_;
+ std::vector<DeviceSyncTarget> syncTargets_;
int mode_;
std::function<void(const std::map<std::string, int> &devicesMap)> onComplete_;
+ std::function<void(const std::map<DeviceSyncTarget, int> &devicesMap)> onCompleteV2_;
bool wait_;
bool isQuerySync_;
QuerySyncObject query_;
diff --git a/frameworks/libs/distributeddb/storage/include/relational_store_connection.h b/frameworks/libs/distributeddb/storage/include/relational_store_connection.h
index 67bdc3548..72f6c05a1 100644
--- a/frameworks/libs/distributeddb/storage/include/relational_store_connection.h
+++ b/frameworks/libs/distributeddb/storage/include/relational_store_connection.h
@@ -32,9 +32,31 @@ using RelationalObserverAction =
class RelationalStoreConnection : public IConnection, public virtual RefObject {
public:
struct SyncInfo {
- const std::vector<std::string> &devices;
+ SyncInfo(const std::vector<std::string> &devices, SyncMode mode, const SyncStatusCallback onComplete,
+ const Query &query, bool wait)
+ : devices(devices),
+ mode(mode),
+ onComplete(onComplete),
+ query(query),
+ wait(wait)
+ {
+ }
+
+ SyncInfo(const std::vector<DeviceSyncTarget> &syncTargets, SyncMode mode,
+ const RdbDeviceSyncOnCompleteCallback onComplete, const Query &query, bool wait)
+ : syncTargets(syncTargets),
+ mode(mode),
+ onCompleteV2(onComplete),
+ query(query),
+ wait(wait)
+ {
+ }
+
+ const std::vector<std::string> devices;
+ const std::vector<DeviceSyncTarget> syncTargets;
SyncMode mode = SYNC_MODE_PUSH_PULL;
const SyncStatusCallback onComplete = nullptr;
+ const RdbDeviceSyncOnCompleteCallback onCompleteV2 = nullptr;
const Query &query;
bool wait = true;
};
diff --git a/frameworks/libs/distributeddb/storage/src/kv/sync_able_kvdb_connection.cpp b/frameworks/libs/distributeddb/storage/src/kv/sync_able_kvdb_connection.cpp
index 94275ffec..15c5e17ab 100644
--- a/frameworks/libs/distributeddb/storage/src/kv/sync_able_kvdb_connection.cpp
+++ b/frameworks/libs/distributeddb/storage/src/kv/sync_able_kvdb_connection.cpp
@@ -146,7 +146,11 @@ int SyncAbleKvDBConnection::PragmaSyncAction(const PragmaSync *syncParameter)
}
ISyncer::SyncParma syncParam;
- syncParam.devices = syncParameter->devices_;
+ if (syncParameter->syncTargets_.empty()) {
+ syncParam.devices = syncParameter->devices_;
+ } else {
+ syncParam.syncTargets = syncParameter->syncTargets_;
+ }
syncParam.mode = syncParameter->mode_;
syncParam.wait = syncParameter->wait_;
syncParam.isQuerySync = syncParameter->isQuerySync_;
@@ -164,6 +168,12 @@ int SyncAbleKvDBConnection::PragmaSyncAction(const PragmaSync *syncParameter)
OnDeviceSyncProcess(syncRecordMap, onSyncProcess);
};
}
+ if (syncParameter->onCompleteV2_) {
+ syncParam.onCompleteV2 = [this, onComplete = syncParameter->onCompleteV2_](
+ const std::map<DeviceSyncTarget, int> &statuses) {
+ OnSyncComplete(statuses, onComplete);
+ };
+ }
int errCode = kvDB->Sync(syncParam, GetConnectionId());
if (errCode != E_OK) {
@@ -224,6 +234,23 @@ void SyncAbleKvDBConnection::OnSyncComplete(const std::map<std::string, int> &st
}
}
+void SyncAbleKvDBConnection::OnSyncComplete(const std::map<DeviceSyncTarget, int> &statuses,
+ const std::function<void(const std::map<DeviceSyncTarget, int> &devicesMap)> &onComplete)
+{
+ AutoLock lockGuard(this);
+ if (!IsKilled() && onComplete) {
+ // Drop the lock before invoking the callback.
+ // Do pragma-sync again in the prev sync callback is supported.
+ UnlockObj();
+ // The connection may be closed after UnlockObj().
+ // RACE: 'KillObj()' against 'onComplete()'.
+ if (!IsKilled()) {
+ onComplete(statuses);
+ }
+ LockObj();
+ }
+}
+
void SyncAbleKvDBConnection::OnDeviceSyncProcess(const std::map<std::string, DeviceSyncProcess> &syncRecordMap,
const DeviceSyncProcessCallback &onProcess)
{
diff --git a/frameworks/libs/distributeddb/storage/src/kv/sync_able_kvdb_connection.h b/frameworks/libs/distributeddb/storage/src/kv/sync_able_kvdb_connection.h
index 4594b3dd9..cc625b827 100644
--- a/frameworks/libs/distributeddb/storage/src/kv/sync_able_kvdb_connection.h
+++ b/frameworks/libs/distributeddb/storage/src/kv/sync_able_kvdb_connection.h
@@ -68,8 +68,12 @@ private:
void OnSyncComplete(const std::map<std::string, int> &statuses,
const std::function<void(const std::map<std::string, int> &devicesMap)> &onComplete, bool wait);
+ void OnSyncComplete(const std::map<DeviceSyncTarget, int> &statuses,
+ const std::function<void(const std::map<DeviceSyncTarget, int> &devicesMap)> &onComplete);
+
+
void OnDeviceSyncProcess(const std::map<std::string, DeviceSyncProcess> &syncRecordMap,
- const DeviceSyncProcessCallback &onProcess);
+ const DeviceSyncProcessCallback &onProcess);
int GetQueuedSyncSize(int *queuedSyncSize) const;
diff --git a/frameworks/libs/distributeddb/storage/src/sqlite/relational/sqlite_relational_store_connection.cpp b/frameworks/libs/distributeddb/storage/src/sqlite/relational/sqlite_relational_store_connection.cpp
index f9db627ac..449798278 100644
--- a/frameworks/libs/distributeddb/storage/src/sqlite/relational/sqlite_relational_store_connection.cpp
+++ b/frameworks/libs/distributeddb/storage/src/sqlite/relational/sqlite_relational_store_connection.cpp
@@ -255,11 +255,19 @@ int SQLiteRelationalStoreConnection::SyncToDevice(SyncInfo &info)
}
ISyncer::SyncParma syncParam;
- syncParam.devices = info.devices;
+ if (info.syncTargets.empty()) {
+ syncParam.devices = info.devices;
+ } else {
+ syncParam.syncTargets = info.syncTargets;
+ }
syncParam.mode = info.mode;
syncParam.wait = info.wait;
syncParam.isQuerySync = true;
- syncParam.relationOnComplete = info.onComplete;
+ if (info.onCompleteV2 != nullptr) {
+ syncParam.relationOnCompleteV2 = info.onCompleteV2;
+ } else {
+ syncParam.relationOnComplete = info.onComplete;
+ }
syncParam.syncQuery = QuerySyncObject(info.query);
if (syncParam.syncQuery.GetSortType() != SortType::NONE) {
LOGE("not support order by timestamp, type: %d", static_cast<int>(syncParam.syncQuery.GetSortType()));
diff --git a/frameworks/libs/distributeddb/syncer/include/isyncer.h b/frameworks/libs/distributeddb/syncer/include/isyncer.h
index 61e0c44c4..03b1f895a 100644
--- a/frameworks/libs/distributeddb/syncer/include/isyncer.h
+++ b/frameworks/libs/distributeddb/syncer/include/isyncer.h
@@ -37,8 +37,11 @@ class ISyncer {
public:
struct SyncParma {
std::vector<std::string> devices;
+ std::vector<DeviceSyncTarget> syncTargets;
std::function<void(const std::map<std::string, int> &devicesMap)> onComplete;
+ std::function<void(const std::map<DeviceSyncTarget, int> &devicesMap)> onCompleteV2;
SyncStatusCallback relationOnComplete;
+ RdbDeviceSyncOnCompleteCallback relationOnCompleteV2;
std::function<void(void)> onFinalize;
int mode = 0;
bool wait = false;
diff --git a/frameworks/libs/distributeddb/syncer/src/device/generic_syncer.cpp b/frameworks/libs/distributeddb/syncer/src/device/generic_syncer.cpp
index 772dee8f9..63990d7a7 100644
--- a/frameworks/libs/distributeddb/syncer/src/device/generic_syncer.cpp
+++ b/frameworks/libs/distributeddb/syncer/src/device/generic_syncer.cpp
@@ -234,8 +234,14 @@ int GenericSyncer::CancelSync(uint32_t syncId)
int GenericSyncer::PrepareSync(const SyncParma &param, uint32_t syncId, uint64_t connectionId)
{
- auto *operation =
- new (std::nothrow) SyncOperation(syncId, param.devices, param.mode, param.onComplete, param.wait);
+ SyncOperation *operation = nullptr;
+ if (param.syncTargets.empty()) {
+ operation = new(std::nothrow) SyncOperation(syncId, param.devices, param.mode, param.onComplete,
+ param.wait);
+ } else {
+ operation = new(std::nothrow) SyncOperation(syncId, param.syncTargets, param.mode, param.onCompleteV2,
+ param.wait);
+ }
if (operation == nullptr) {
SubQueuedSyncSize();
return -E_OUT_OF_MEMORY;
@@ -246,8 +252,9 @@ int GenericSyncer::PrepareSync(const SyncParma &param, uint32_t syncId, uint64_t
std::lock_guard<std::mutex> autoLock(syncerLock_);
PerformanceAnalysis::GetInstance()->StepTimeRecordStart(PT_TEST_RECORDS::RECORD_SYNC_TOTAL);
InitSyncOperation(operation, param);
- LOGI("[Syncer] GenerateSyncId %" PRIu32 ", mode = %d, wait = %d, label = %s, devices = %s", syncId, param.mode,
- param.wait, label_.c_str(), GetSyncDevicesStr(param.devices).c_str());
+ LOGI("[Syncer] GenerateSyncId %" PRIu32 ", mode = %d, wait = %d, label = %s, devices = %s, syncTargets = %s",
+ syncId, param.mode, param.wait, label_.c_str(), GetSyncDevicesStr(param.devices).c_str(),
+ GetSyncTargetsStr(param.syncTargets).c_str());
engine = syncEngine_;
RefObject::IncObjRef(engine);
}
@@ -536,11 +543,21 @@ int GenericSyncer::SyncConditionCheck(const SyncParma &param, const ISyncEngine
return E_OK;
}
-bool GenericSyncer::IsValidDevices(const std::vector<std::string> &devices) const
+bool GenericSyncer::IsValidDevices(const std::vector<std::string> &devices,
+ const std::vector<DeviceSyncTarget> &syncTargets) const
{
- if (devices.empty()) {
- LOGE("[Syncer] devices is empty!");
- return false;
+ if (syncTargets.empty()) {
+ if (devices.empty()) {
+ LOGE("[Syncer] devices is empty!");
+ return false;
+ }
+ } else {
+ for (const auto &target : syncTargets) {
+ if (target.device.empty()) {
+ LOGE("[Syncer] there is a device in sync targets is empty!");
+ return false;
+ }
+ }
}
return true;
}
@@ -858,6 +875,21 @@ std::string GenericSyncer::GetSyncDevicesStr(const std::vector<std::string> &dev
return syncDevices.substr(0, syncDevices.size() - 1);
}
+std::string GenericSyncer::GetSyncTargetsStr(const std::vector<DeviceSyncTarget> &syncTargets) const
+{
+ std::string syncTargetsStr;
+ for (const auto &target : syncTargets) {
+ syncTargetsStr += DBCommon::StringMasking(target.device);
+ syncTargetsStr += "-";
+ syncTargetsStr += target.userId;
+ syncTargetsStr += ",";
+ }
+ if (syncTargetsStr.empty()) {
+ return "";
+ }
+ return syncTargetsStr.substr(0, syncTargetsStr.size() - 1);
+}
+
int GenericSyncer::StatusCheck() const
{
if (!initialized_) {
@@ -882,7 +914,7 @@ int GenericSyncer::SyncPreCheck(const SyncParma &param) const
if (errCode != E_OK) {
return errCode;
}
- if (!IsValidDevices(param.devices) || !IsValidMode(param.mode)) { // LCOV_EXCL_BR_LINE
+ if (!IsValidDevices(param.devices, param.syncTargets) || !IsValidMode(param.mode)) { // LCOV_EXCL_BR_LINE
return -E_INVALID_ARGS;
}
if (IsQueuedManualSyncFull(param.mode, param.wait)) { // LCOV_EXCL_BR_LINE
diff --git a/frameworks/libs/distributeddb/syncer/src/device/generic_syncer.h b/frameworks/libs/distributeddb/syncer/src/device/generic_syncer.h
index 183fbb1fa..ef5b1820a 100644
--- a/frameworks/libs/distributeddb/syncer/src/device/generic_syncer.h
+++ b/frameworks/libs/distributeddb/syncer/src/device/generic_syncer.h
@@ -160,7 +160,8 @@ protected:
virtual int SyncConditionCheck(const SyncParma &param, const ISyncEngine *engine, ISyncInterface *storage) const;
// Check if the devices arg is valid
- bool IsValidDevices(const std::vector<std::string> &devices) const;
+ bool IsValidDevices(const std::vector<std::string> &devices,
+ const std::vector<DeviceSyncTarget> &syncTargets) const;
// Used Clear all SyncOperations.
// isClosedOperation is false while userChanged
@@ -185,6 +186,8 @@ protected:
std::string GetSyncDevicesStr(const std::vector<std::string> &devices) const;
+ std::string GetSyncTargetsStr(const std::vector<DeviceSyncTarget> &syncTargets) const;
+
void InitSyncOperation(SyncOperation *operation, const SyncParma &param);
int StatusCheck() const;
diff --git a/frameworks/libs/distributeddb/syncer/src/device/singlever/single_ver_relational_syncer.cpp b/frameworks/libs/distributeddb/syncer/src/device/singlever/single_ver_relational_syncer.cpp
index e392bb459..247f55a9f 100644
--- a/frameworks/libs/distributeddb/syncer/src/device/singlever/single_ver_relational_syncer.cpp
+++ b/frameworks/libs/distributeddb/syncer/src/device/singlever/single_ver_relational_syncer.cpp
@@ -86,6 +86,10 @@ int SingleVerRelationalSyncer::GenerateEachSyncTask(const SyncParma &param, uint
subParam.onComplete = [this, subSyncId, syncId, subParam](const std::map<std::string, int> &devicesMap) {
DoOnSubSyncComplete(subSyncId, syncId, subParam, devicesMap);
};
+ subParam.onCompleteV2 = [this, subSyncId, syncId, subParam]
+ (const std::map<DeviceSyncTarget, int> &devicesMap) {
+ DoOnSubSyncCompleteV2(subSyncId, syncId, subParam, devicesMap);
+ };
{
std::lock_guard<std::mutex> lockGuard(syncMapLock_);
fullSyncIdMap_[syncId].insert(subSyncId);
@@ -123,6 +127,27 @@ void SingleVerRelationalSyncer::DoOnSubSyncComplete(const uint32_t subSyncId, co
}
}
+void SingleVerRelationalSyncer::DoOnSubSyncCompleteV2(const uint32_t subSyncId, const uint32_t syncId,
+ const SyncParma &param, const std::map<DeviceSyncTarget, int> &devicesMap)
+{
+ bool allFinish = true;
+ {
+ std::lock_guard<std::mutex> lockGuard(syncMapLock_);
+ fullSyncIdMap_[syncId].erase(subSyncId);
+ allFinish = fullSyncIdMap_[syncId].empty();
+ TableStatus tableStatus;
+ tableStatus.tableName = param.syncQuery.GetRelationTableName();
+ for (const auto &item : devicesMap) {
+ tableStatus.status = static_cast<DBStatus>(item.second);
+ resMapV2_[syncId][item.first].push_back(tableStatus);
+ }
+ }
+ // block sync do callback in sync function
+ if (allFinish && !param.wait) {
+ DoOnComplete(param, syncId);
+ }
+}
+
void SingleVerRelationalSyncer::DoRollBack(std::set<uint32_t> &subSyncIdSet)
{
for (const auto &removeId : subSyncIdSet) {
@@ -135,6 +160,10 @@ void SingleVerRelationalSyncer::DoRollBack(std::set<uint32_t> &subSyncIdSet)
void SingleVerRelationalSyncer::DoOnComplete(const SyncParma &param, uint32_t syncId)
{
+ if (param.relationOnCompleteV2) {
+ DoOnCompleteV2(param, syncId);
+ return;
+ }
if (!param.relationOnComplete) {
return;
}
@@ -158,6 +187,31 @@ void SingleVerRelationalSyncer::DoOnComplete(const SyncParma &param, uint32_t sy
}
}
+void SingleVerRelationalSyncer::DoOnCompleteV2(const SyncParma &param, uint32_t syncId)
+{
+ if (!param.relationOnCompleteV2) {
+ return;
+ }
+ std::map<DeviceSyncTarget, std::vector<TableStatus>> syncRes;
+ std::map<DeviceSyncTarget, std::vector<TableStatus>> tmpMap;
+ {
+ std::lock_guard<std::mutex> lockGuard(syncMapLock_);
+ tmpMap = resMapV2_[syncId];
+ }
+ for (const auto &devicesRes : tmpMap) {
+ for (const auto &tableRes : devicesRes.second) {
+ syncRes[devicesRes.first].push_back(
+ {tableRes.tableName, static_cast<DBStatus>(tableRes.status)});
+ }
+ }
+ param.relationOnCompleteV2(syncRes);
+ {
+ std::lock_guard<std::mutex> lockGuard(syncMapLock_);
+ resMapV2_.erase(syncId);
+ fullSyncIdMap_.erase(syncId);
+ }
+}
+
void SingleVerRelationalSyncer::EnableAutoSync(bool enable)
{
(void)enable;
diff --git a/frameworks/libs/distributeddb/syncer/src/device/singlever/single_ver_relational_syncer.h b/frameworks/libs/distributeddb/syncer/src/device/singlever/single_ver_relational_syncer.h
index df637af8c..792376be5 100644
--- a/frameworks/libs/distributeddb/syncer/src/device/singlever/single_ver_relational_syncer.h
+++ b/frameworks/libs/distributeddb/syncer/src/device/singlever/single_ver_relational_syncer.h
@@ -45,8 +45,11 @@ private:
void DoRollBack(std::set<uint32_t> &subSyncIdSet);
void DoOnComplete(const SyncParma &param, uint32_t syncId);
+ void DoOnCompleteV2(const SyncParma &param, uint32_t syncId);
void DoOnSubSyncComplete(const uint32_t subSyncId, const uint32_t syncId,
const SyncParma &param, const std::map<std::string, int> &devicesMap);
+ void DoOnSubSyncCompleteV2(const uint32_t subSyncId, const uint32_t syncId,
+ const SyncParma &param, const std::map<DeviceSyncTarget, int> &devicesMap);
void SchemaChangeCallback();
@@ -57,6 +60,7 @@ private:
mutable std::mutex syncMapLock_;
std::map<uint32_t, std::set<uint32_t>> fullSyncIdMap_;
std::map<uint32_t, std::map<std::string, std::vector<TableStatus>>> resMap_;
+ std::map<uint32_t, std::map<DeviceSyncTarget, std::vector<TableStatus>>> resMapV2_;
};
}
#endif
diff --git a/frameworks/libs/distributeddb/syncer/src/device/singlever/single_ver_sync_engine.cpp b/frameworks/libs/distributeddb/syncer/src/device/singlever/single_ver_sync_engine.cpp
index c5254c5af..a78f9b3e0 100644
--- a/frameworks/libs/distributeddb/syncer/src/device/singlever/single_ver_sync_engine.cpp
+++ b/frameworks/libs/distributeddb/syncer/src/device/singlever/single_ver_sync_engine.cpp
@@ -52,10 +52,12 @@ void SingleVerSyncEngine::EnableClearRemoteStaleData(bool enable)
LOGI("[SingleVerSyncEngine][EnableClearRemoteStaleData] enabled %d", enable);
needClearRemoteStaleData_ = enable;
std::unique_lock<std::mutex> lock(contextMapLock_);
- for (auto &iter : syncTaskContextMap_) {
- auto context = static_cast<SingleVerSyncTaskContext *>(iter.second);
- if (context != nullptr) { // LCOV_EXCL_BR_LINE
- context->EnableClearRemoteStaleData(enable);
+ for (auto &deviceIter : syncTaskContextMap_) {
+ for (auto &userIdIter : deviceIter.second) {
+ auto context = static_cast<SingleVerSyncTaskContext *>(userIdIter.second);
+ if (context != nullptr) { // LCOV_EXCL_BR_LINE
+ context->EnableClearRemoteStaleData(enable);
+ }
}
}
}
diff --git a/frameworks/libs/distributeddb/syncer/src/device/singlever/single_ver_sync_task_context.cpp b/frameworks/libs/distributeddb/syncer/src/device/singlever/single_ver_sync_task_context.cpp
index 9ef639894..dcf0658d9 100644
--- a/frameworks/libs/distributeddb/syncer/src/device/singlever/single_ver_sync_task_context.cpp
+++ b/frameworks/libs/distributeddb/syncer/src/device/singlever/single_ver_sync_task_context.cpp
@@ -117,7 +117,11 @@ int SingleVerSyncTaskContext::AddSyncOperation(SyncOperation *operation)
});
if (iter != requestTargetQueue_.end()) {
static_cast<SingleVerSyncTarget *>(*iter)->SetEndWaterMark(timeHelper_->GetTime());
- operation->SetStatus(deviceId_, SyncOperation::OP_FINISHED_ALL);
+ if (userId_.empty()) {
+ operation->SetStatus(deviceId_, SyncOperation::OP_FINISHED_ALL);
+ } else {
+ operation->SetStatus({deviceId_, userId_}, SyncOperation::OP_FINISHED_ALL);
+ }
return E_OK;
}
}
@@ -231,7 +235,11 @@ void SingleVerSyncTaskContext::Abort(int status)
{
std::lock_guard<std::mutex> lock(operationLock_);
if (syncOperation_ != nullptr) {
- syncOperation_->SetStatus(deviceId_, status, GetCommErrCode());
+ if (userId_.empty()) {
+ syncOperation_->SetStatus(deviceId_, status, GetCommErrCode());
+ } else {
+ syncOperation_->SetStatus({deviceId_, userId_}, status, GetCommErrCode());
+ }
if ((status >= SyncOperation::OP_FINISHED_ALL)) {
UnlockObj();
if (syncOperation_->CheckIsAllFinished()) {
@@ -279,10 +287,11 @@ void SingleVerSyncTaskContext::ClearAllSyncTask()
continue; // not exit this scene
}
LOGI("[SingleVerSyncTaskContext] killing syncId=%d,dev=%s", tmpOperation->GetSyncId(), STR_MASK(deviceId_));
- if (target->IsAutoSync()) {
- tmpOperation->SetStatus(deviceId_, SyncOperation::OP_FINISHED_ALL);
+ int status = target->IsAutoSync() ? SyncOperation::OP_FINISHED_ALL : SyncOperation::OP_COMM_ABNORMAL;
+ if (userId_.empty()) {
+ tmpOperation->SetStatus(deviceId_, status);
} else {
- tmpOperation->SetStatus(deviceId_, SyncOperation::OP_COMM_ABNORMAL);
+ tmpOperation->SetStatus({deviceId_, userId_}, status);
}
if (tmpOperation->CheckIsAllFinished()) {
tmpOperation->Finished();
diff --git a/frameworks/libs/distributeddb/syncer/src/device/sync_engine.cpp b/frameworks/libs/distributeddb/syncer/src/device/sync_engine.cpp
index 5d4a60e55..9b6063c15 100644
--- a/frameworks/libs/distributeddb/syncer/src/device/sync_engine.cpp
+++ b/frameworks/libs/distributeddb/syncer/src/device/sync_engine.cpp
@@ -115,9 +115,11 @@ int SyncEngine::Close()
// Clear SyncContexts
{
std::unique_lock<std::mutex> lock(contextMapLock_);
- for (auto &iter : syncTaskContextMap_) {
- decContext.push_back(iter.second);
- iter.second = nullptr;
+ for (auto &deviceIter : syncTaskContextMap_) {
+ for (auto &userIdIter : deviceIter.second) {
+ decContext.push_back(userIdIter.second);
+ userIdIter.second = nullptr;
+ }
}
syncTaskContextMap_.clear();
}
@@ -163,6 +165,14 @@ int SyncEngine::AddSyncOperation(SyncOperation *operation)
return -E_INVALID_ARGS;
}
+ if (operation->GetSyncTargets().empty()) {
+ return AddSyncOperationForDevices(operation);
+ }
+ return AddSyncOperationForSyncTargets(operation);
+}
+
+int SyncEngine::AddSyncOperationForDevices(SyncOperation *operation)
+{
std::vector<std::string> devices = operation->GetDevices();
std::string localDeviceId;
int errCode = GetLocalDeviceId(localDeviceId);
@@ -177,20 +187,45 @@ int SyncEngine::AddSyncOperation(SyncOperation *operation)
continue;
}
operation->SetStatus(deviceId, SyncOperation::OP_WAITING);
- if (AddSyncOperForContext(deviceId, operation) != E_OK) {
+ if (AddSyncOperForContext(deviceId, operation, "") != E_OK) {
operation->SetStatus(deviceId, SyncOperation::OP_FAILED);
}
}
return E_OK;
}
+int SyncEngine::AddSyncOperationForSyncTargets(SyncOperation *operation)
+{
+ std::vector<DeviceSyncTarget> syncTargets = operation->GetSyncTargets();
+ std::string localDeviceId;
+ int errCode = GetLocalDeviceId(localDeviceId);
+ for (const auto &syncTarget : syncTargets) {
+ if (errCode != E_OK) {
+ operation->SetStatus(syncTarget, errCode == -E_BUSY ?
+ SyncOperation::OP_BUSY_FAILURE : SyncOperation::OP_FAILED);
+ continue;
+ }
+ if (!CheckDeviceIdValid(syncTarget.device, localDeviceId)) {
+ operation->SetStatus(syncTarget, SyncOperation::OP_INVALID_ARGS);
+ continue;
+ }
+ operation->SetStatus(syncTarget, SyncOperation::OP_WAITING);
+ if (AddSyncOperForContext(syncTarget.device, operation, syncTarget.userId) != E_OK) {
+ operation->SetStatus(syncTarget, SyncOperation::OP_FAILED);
+ }
+ }
+ return E_OK;
+}
+
void SyncEngine::RemoveSyncOperation(int syncId)
{
std::lock_guard<std::mutex> lock(contextMapLock_);
- for (auto &iter : syncTaskContextMap_) {
- ISyncTaskContext *context = iter.second;
- if (context != nullptr) {
- context->RemoveSyncOperation(syncId);
+ for (auto &deviceIter : syncTaskContextMap_) {
+ for (auto &userIdIter : deviceIter.second) {
+ ISyncTaskContext *context = userIdIter.second;
+ if (context != nullptr) {
+ context->RemoveSyncOperation(syncId);
+ }
}
}
}
@@ -297,7 +332,8 @@ int SyncEngine::InitComunicator(const ISyncInterface *syncInterface)
}
errCode = communicator_->RegOnMessageCallback(
- [this](const std::string &targetDev, Message *inMsg) { MessageReciveCallback(targetDev, inMsg); }, []() {});
+ [this](const std::string &targetDev, const std::string &userId, Message *inMsg)
+ { MessageReciveCallback(targetDev, userId, inMsg); }, []() {});
if (errCode != E_OK) {
LOGE("[SyncEngine] SyncRequestCallback register failed! err = %d", errCode);
communicatorAggregator->ReleaseCommunicator(communicator_, GetUserId(syncInterface));
@@ -320,16 +356,16 @@ int SyncEngine::InitComunicator(const ISyncInterface *syncInterface)
return errCode;
}
-int SyncEngine::AddSyncOperForContext(const std::string &deviceId, SyncOperation *operation)
+int SyncEngine::AddSyncOperForContext(const std::string &deviceId, SyncOperation *operation, const std::string &userId)
{
int errCode = E_OK;
ISyncTaskContext *context = nullptr;
{
std::lock_guard<std::mutex> lock(contextMapLock_);
- context = FindSyncTaskContext(deviceId);
+ context = FindSyncTaskContext(deviceId, userId);
if (context == nullptr) {
if (!IsKilled()) {
- context = GetSyncTaskContext(deviceId, errCode);
+ context = GetSyncTaskContext(deviceId, userId, errCode);
}
if (context == nullptr) {
return errCode;
@@ -350,8 +386,8 @@ int SyncEngine::AddSyncOperForContext(const std::string &deviceId, SyncOperation
return errCode;
}
-void SyncEngine::MessageReciveCallbackTask(ISyncTaskContext *context, const ICommunicator *communicator,
- Message *inMsg)
+void SyncEngine::MessageReciveCallbackTask(ISyncTaskContext *context, const std::string &userId,
+ const ICommunicator *communicator, Message *inMsg)
{
std::string deviceId = context->GetDeviceId();
@@ -372,10 +408,11 @@ void SyncEngine::MessageReciveCallbackTask(ISyncTaskContext *context, const ICom
delete inMsg;
inMsg = nullptr;
MSG_CALLBACK_OUT_NOT_DEL:
- ScheduleTaskOut(context, communicator);
+ ScheduleTaskOut(context, userId, communicator);
}
-void SyncEngine::RemoteDataChangedTask(ISyncTaskContext *context, const ICommunicator *communicator, Message *inMsg)
+void SyncEngine::RemoteDataChangedTask(ISyncTaskContext *context, const std::string &userId,
+ const ICommunicator *communicator, Message *inMsg)
{
std::string deviceId = context->GetDeviceId();
if (onRemoteDataChanged_ && deviceManager_->IsDeviceOnline(deviceId)) {
@@ -385,18 +422,18 @@ void SyncEngine::RemoteDataChangedTask(ISyncTaskContext *context, const ICommuni
}
delete inMsg;
inMsg = nullptr;
- ScheduleTaskOut(context, communicator);
+ ScheduleTaskOut(context, userId, communicator);
}
-void SyncEngine::ScheduleTaskOut(ISyncTaskContext *context, const ICommunicator *communicator)
+void SyncEngine::ScheduleTaskOut(ISyncTaskContext *context, const std::string &userId, const ICommunicator *communicator)
{
- (void)DealMsgUtilQueueEmpty();
+ (void)DealMsgUtilQueueEmpty(userId);
DecExecTaskCount();
RefObject::DecObjRef(communicator);
RefObject::DecObjRef(context);
}
-int SyncEngine::DealMsgUtilQueueEmpty()
+int SyncEngine::DealMsgUtilQueueEmpty(const std::string &userId)
{
if (!isActive_) {
return -E_BUSY; // db is closing just return
@@ -417,11 +454,11 @@ int SyncEngine::DealMsgUtilQueueEmpty()
// it will deal with the first message in queue, we should increase object reference counts and sure that resources
// could be prevented from destroying by other threads.
do {
- ISyncTaskContext *nextContext = GetContextForMsg(inMsg->GetTarget(), errCode);
+ ISyncTaskContext *nextContext = GetContextForMsg(inMsg->GetTarget(), userId, errCode);
if (errCode != E_OK) {
break;
}
- errCode = ScheduleDealMsg(nextContext, inMsg);
+ errCode = ScheduleDealMsg(nextContext, userId, inMsg);
if (errCode != E_OK) {
RefObject::DecObjRef(nextContext);
}
@@ -434,12 +471,12 @@ int SyncEngine::DealMsgUtilQueueEmpty()
return errCode;
}
-ISyncTaskContext *SyncEngine::GetContextForMsg(const std::string &targetDev, int &errCode)
+ISyncTaskContext *SyncEngine::GetContextForMsg(const std::string &targetDev, const std::string &userId, int &errCode)
{
ISyncTaskContext *context = nullptr;
{
std::lock_guard<std::mutex> lock(contextMapLock_);
- context = FindSyncTaskContext(targetDev);
+ context = FindSyncTaskContext(targetDev, userId);
if (context != nullptr) { // LCOV_EXCL_BR_LINE
if (context->IsKilled()) {
errCode = -E_OBJ_IS_KILLED;
@@ -450,7 +487,7 @@ ISyncTaskContext *SyncEngine::GetContextForMsg(const std::string &targetDev, int
errCode = -E_OBJ_IS_KILLED;
return nullptr;
}
- context = GetSyncTaskContext(targetDev, errCode);
+ context = GetSyncTaskContext(targetDev, userId, errCode);
if (context == nullptr) {
return nullptr;
}
@@ -461,7 +498,7 @@ ISyncTaskContext *SyncEngine::GetContextForMsg(const std::string &targetDev, int
return context;
}
-int SyncEngine::ScheduleDealMsg(ISyncTaskContext *context, Message *inMsg)
+int SyncEngine::ScheduleDealMsg(ISyncTaskContext *context, const std::string &userId, Message *inMsg)
{
if (inMsg == nullptr) {
LOGE("[SyncEngine] MessageReciveCallback inMsg is null!");
@@ -477,10 +514,11 @@ int SyncEngine::ScheduleDealMsg(ISyncTaskContext *context, Message *inMsg)
int errCode = E_OK;
// deal remote local data changed message
if (inMsg->GetMessageId() == LOCAL_DATA_CHANGED) {
- RemoteDataChangedTask(context, comProxy, inMsg);
+ RemoteDataChangedTask(context, userId, comProxy, inMsg);
} else {
errCode = RuntimeContext::GetInstance()->ScheduleTask(
- [this, context, comProxy, inMsg] { MessageReciveCallbackTask(context, comProxy, inMsg); });
+ [this, context, userId, comProxy, inMsg]
+ { MessageReciveCallbackTask(context, userId, comProxy, inMsg); });
}
if (errCode != E_OK) {
@@ -490,10 +528,10 @@ int SyncEngine::ScheduleDealMsg(ISyncTaskContext *context, Message *inMsg)
return errCode;
}
-void SyncEngine::MessageReciveCallback(const std::string &targetDev, Message *inMsg)
+void SyncEngine::MessageReciveCallback(const std::string &targetDev, const std::string &userId, Message *inMsg)
{
IncExecTaskCount();
- int errCode = MessageReciveCallbackInner(targetDev, inMsg);
+ int errCode = MessageReciveCallbackInner(targetDev, userId, inMsg);
if (errCode != E_OK) {
if (inMsg != nullptr) {
delete inMsg;
@@ -504,7 +542,7 @@ void SyncEngine::MessageReciveCallback(const std::string &targetDev, Message *in
}
}
-int SyncEngine::MessageReciveCallbackInner(const std::string &targetDev, Message *inMsg)
+int SyncEngine::MessageReciveCallbackInner(const std::string &targetDev, const std::string &userId, Message *inMsg)
{
if (targetDev.empty() || inMsg == nullptr) {
LOGE("[SyncEngine][MessageReciveCallback] from a invalid device or inMsg is null ");
@@ -544,12 +582,12 @@ int SyncEngine::MessageReciveCallbackInner(const std::string &targetDev, Message
}
int errCode = E_OK;
- ISyncTaskContext *nextContext = GetContextForMsg(targetDev, errCode);
+ ISyncTaskContext *nextContext = GetContextForMsg(targetDev, userId, errCode);
if (errCode != E_OK) {
return errCode;
}
LOGD("[SyncEngine] MessageReciveCallback MSG ID = %d", inMsg->GetMessageId());
- return ScheduleDealMsg(nextContext, inMsg);
+ return ScheduleDealMsg(nextContext, userId, inMsg);
}
void SyncEngine::PutMsgIntoQueue(const std::string &targetDev, Message *inMsg, int msgSize)
@@ -598,34 +636,41 @@ int SyncEngine::GetMsgSize(const Message *inMsg) const
}
}
-ISyncTaskContext *SyncEngine::FindSyncTaskContext(const std::string &deviceId)
+ISyncTaskContext *SyncEngine::FindSyncTaskContext(const std::string &deviceId, const std::string &userId)
{
- auto iter = syncTaskContextMap_.find(deviceId);
- if (iter != syncTaskContextMap_.end()) {
- ISyncTaskContext *context = iter->second;
- return context;
+ auto deviceIter = syncTaskContextMap_.find(deviceId);
+ if (deviceIter != syncTaskContextMap_.end()) {
+ auto userIdIter = deviceIter->second.find(userId);
+ if (userIdIter != deviceIter->second.end()) {
+ ISyncTaskContext *context = userIdIter->second;
+ return context;
+ }
}
return nullptr;
}
-ISyncTaskContext *SyncEngine::GetSyncTaskContextAndInc(const std::string &deviceId)
+std::map<std::string, ISyncTaskContext *> SyncEngine::GetSyncTaskContextAndInc(const std::string &deviceId)
{
- ISyncTaskContext *context = nullptr;
+ std::map<std::string, ISyncTaskContext *> contexts;
std::lock_guard<std::mutex> lock(contextMapLock_);
- context = FindSyncTaskContext(deviceId);
- if (context == nullptr) {
+ if (syncTaskContextMap_.find(deviceId) == syncTaskContextMap_.end()) {
LOGI("[SyncEngine] dev=%s, context is null, no need to clear sync operation", STR_MASK(deviceId));
- return nullptr;
+ return contexts;
}
- if (context->IsKilled()) { // LCOV_EXCL_BR_LINE
- LOGI("[SyncEngine] context is killing");
- return nullptr;
+ for (const auto &context : syncTaskContextMap_[deviceId]) {
+ if (context.second == nullptr) {
+ continue;
+ }
+ if (context.second->IsKilled()) {
+ LOGI("[SyncEngine] context[%s] is killing", context.first.c_str());
+ }
+ RefObject::IncObjRef(context.second);
+ contexts[context.first] = context.second;
}
- RefObject::IncObjRef(context);
- return context;
+ return contexts;
}
-ISyncTaskContext *SyncEngine::GetSyncTaskContext(const std::string &deviceId, int &errCode)
+ISyncTaskContext *SyncEngine::GetSyncTaskContext(const std::string &deviceId, const std::string &userId, int &errCode)
{
auto storage = GetAndIncSyncInterface();
if (storage == nullptr) {
@@ -647,7 +692,7 @@ ISyncTaskContext *SyncEngine::GetSyncTaskContext(const std::string &deviceId, in
context = nullptr;
return nullptr;
}
- syncTaskContextMap_.insert(std::pair<std::string, ISyncTaskContext *>(deviceId, context));
+ syncTaskContextMap_[deviceId].insert(std::pair<std::string, ISyncTaskContext *>(userId, context));
// IncRef for SyncEngine to make sure SyncEngine is valid when context access
RefObject::IncObjRef(this);
context->OnLastRef([this, deviceId, storage]() {
@@ -759,10 +804,12 @@ void SyncEngine::SetSyncRetry(bool isRetry)
isSyncRetry_ = isRetry;
LOGI("[SyncEngine] SetSyncRetry:%d ok", isRetry);
std::lock_guard<std::mutex> lock(contextMapLock_);
- for (auto &iter : syncTaskContextMap_) {
- ISyncTaskContext *context = iter.second;
- if (context != nullptr) { // LCOV_EXCL_BR_LINE
- context->SetSyncRetry(isRetry);
+ for (auto &deviceIter : syncTaskContextMap_) {
+ for (auto &userIdEntry : deviceIter.second) {
+ ISyncTaskContext *context = userIdEntry.second;
+ if (context != nullptr) { // LCOV_EXCL_BR_LINE
+ context->SetSyncRetry(isRetry);
+ }
}
}
}
@@ -860,7 +907,7 @@ void SyncEngine::OfflineHandleByDevice(const std::string &deviceId, ISyncInterfa
static_cast<SyncGenericInterface *>(storage)->GetDBInfo(dbInfo);
RuntimeContext::GetInstance()->RemoveRemoteSubscribe(dbInfo, deviceId);
// get context and Inc context if context is not nullptr
- ISyncTaskContext *context = GetSyncTaskContextAndInc(deviceId);
+ std::map<std::string, ISyncTaskContext *> contexts = GetSyncTaskContextAndInc(deviceId);
{
std::lock_guard<std::mutex> lock(communicatorProxyLock_);
if (communicatorProxy_ == nullptr) {
@@ -868,25 +915,31 @@ void SyncEngine::OfflineHandleByDevice(const std::string &deviceId, ISyncInterfa
}
if (communicatorProxy_->IsDeviceOnline(deviceId)) { // LCOV_EXCL_BR_LINE
LOGI("[SyncEngine] target dev=%s is online, no need to clear task.", STR_MASK(deviceId));
- RefObject::DecObjRef(context);
+ for (const auto &context : contexts) {
+ RefObject::DecObjRef(context.second);
+ }
return;
}
}
// means device is offline, clear local subscribe
subManager_->ClearLocalSubscribeQuery(deviceId);
// clear sync task
- if (context != nullptr) {
- context->ClearAllSyncTask();
- RefObject::DecObjRef(context);
+ for (const auto &context : contexts) {
+ if (context.second != nullptr) {
+ context.second->ClearAllSyncTask();
+ RefObject::DecObjRef(context.second);
+ }
}
}
void SyncEngine::ClearAllSyncTaskByDevice(const std::string &deviceId)
{
- ISyncTaskContext *context = GetSyncTaskContextAndInc(deviceId);
- if (context != nullptr) {
- context->ClearAllSyncTask();
- RefObject::DecObjRef(context);
+ std::map<std::string, ISyncTaskContext *> contexts = GetSyncTaskContextAndInc(deviceId);
+ for (const auto &context : contexts) {
+ if (context.second != nullptr) {
+ context.second->ClearAllSyncTask();
+ RefObject::DecObjRef(context.second);
+ }
}
}
@@ -932,7 +985,8 @@ ICommunicator *SyncEngine::AllocCommunicator(const std::string &identifier, int
}
errCode = communicator->RegOnMessageCallback(
- [this](const std::string &targetDev, Message *inMsg) { MessageReciveCallback(targetDev, inMsg); }, []() {});
+ [this](const std::string &targetDev, const std::string &userId, Message *inMsg)
+ { MessageReciveCallback(targetDev, userId, inMsg); }, []() {});
if (errCode != E_OK) {
LOGE("[SyncEngine] SyncRequestCallback register failed in SetEqualIdentifier! err = %d", errCode);
communicatorAggregator->ReleaseCommunicator(communicator, userId);
@@ -1065,13 +1119,15 @@ void SyncEngine::SchemaChange()
std::vector<ISyncTaskContext *> tmpContextVec;
{
std::lock_guard<std::mutex> lock(contextMapLock_);
- for (const auto &entry : syncTaskContextMap_) { // LCOV_EXCL_BR_LINE
- auto context = entry.second;
- if (context == nullptr || context->IsKilled()) {
- continue;
+ for (const auto &deviceEntry : syncTaskContextMap_) { // LCOV_EXCL_BR_LINE
+ for (const auto &userIdEntry : deviceEntry.second) {
+ auto context = userIdEntry.second;
+ if (context == nullptr || context->IsKilled()) {
+ continue;
+ }
+ RefObject::IncObjRef(context);
+ tmpContextVec.push_back(context);
}
- RefObject::IncObjRef(context);
- tmpContextVec.push_back(context);
}
}
for (const auto &entryContext : tmpContextVec) {
@@ -1112,9 +1168,11 @@ void SyncEngine::Dump(int fd)
DBDumpHelper::Dump(fd, "\t]\n\tcontext info [\n");
// dump context info
std::lock_guard<std::mutex> autoLock(contextMapLock_);
- for (const auto &entry : syncTaskContextMap_) {
- if (entry.second != nullptr) {
- entry.second->Dump(fd);
+ for (const auto &deviceEntry : syncTaskContextMap_) {
+ for (const auto &userIdEntry : deviceEntry.second) {
+ if (userIdEntry.second != nullptr) {
+ userIdEntry.second->Dump(fd);
+ }
}
}
DBDumpHelper::Dump(fd, "\t]\n\n");
@@ -1198,17 +1256,19 @@ void SyncEngine::AbortMachineIfNeed(uint32_t syncId)
std::vector<ISyncTaskContext *> abortContexts;
{
std::lock_guard<std::mutex> lock(contextMapLock_);
- for (const auto &entry : syncTaskContextMap_) {
- auto context = entry.second;
- if (context == nullptr || context->IsKilled()) { // LCOV_EXCL_BR_LINE
- continue;
- }
- RefObject::IncObjRef(context);
- if (context->GetSyncId() == syncId) {
+ for (const auto &deviceEntry : syncTaskContextMap_) {
+ for (const auto &userIdEntry : deviceEntry.second) {
+ auto context = userIdEntry.second;
+ if (context == nullptr || context->IsKilled()) { // LCOV_EXCL_BR_LINE
+ continue;
+ }
RefObject::IncObjRef(context);
- abortContexts.push_back(context);
+ if (context->GetSyncId() == syncId) {
+ RefObject::IncObjRef(context);
+ abortContexts.push_back(context);
+ }
+ RefObject::DecObjRef(context);
}
- RefObject::DecObjRef(context);
}
}
for (const auto &abortContext : abortContexts) {
@@ -1276,9 +1336,11 @@ void SyncEngine::TimeChange()
{
// copy context
std::lock_guard<std::mutex> lock(contextMapLock_);
- for (const auto &iter : syncTaskContextMap_) {
- RefObject::IncObjRef(iter.second);
- decContext.push_back(iter.second);
+ for (const auto &deviceIter : syncTaskContextMap_) {
+ for (const auto &userIdIter : deviceIter.second) {
+ RefObject::IncObjRef(userIdIter.second);
+ decContext.push_back(userIdIter.second);
+ }
}
}
for (auto &iter : decContext) {
@@ -1293,9 +1355,11 @@ int32_t SyncEngine::GetResponseTaskCount()
{
// copy context
std::lock_guard<std::mutex> lock(contextMapLock_);
- for (const auto &iter : syncTaskContextMap_) {
- RefObject::IncObjRef(iter.second);
- decContext.push_back(iter.second);
+ for (const auto &deviceIter : syncTaskContextMap_) {
+ for (const auto &userIdIter : deviceIter.second) {
+ RefObject::IncObjRef(userIdIter.second);
+ decContext.push_back(userIdIter.second);
+ }
}
}
int32_t taskCount = 0;
diff --git a/frameworks/libs/distributeddb/syncer/src/device/sync_engine.h b/frameworks/libs/distributeddb/syncer/src/device/sync_engine.h
index 78c931922..f85f7ba91 100644
--- a/frameworks/libs/distributeddb/syncer/src/device/sync_engine.h
+++ b/frameworks/libs/distributeddb/syncer/src/device/sync_engine.h
@@ -142,8 +142,8 @@ protected:
virtual ISyncTaskContext *CreateSyncTaskContext(const ISyncInterface &syncInterface) = 0;
// Find SyncTaskContext from the map
- ISyncTaskContext *FindSyncTaskContext(const std::string &deviceId);
- ISyncTaskContext *GetSyncTaskContextAndInc(const std::string &deviceId);
+ ISyncTaskContext *FindSyncTaskContext(const std::string &deviceId, const std::string &userId);
+ std::map<std::string, ISyncTaskContext *> GetSyncTaskContextAndInc(const std::string &deviceId);
void GetQueryAutoSyncParam(const std::string &device, const QuerySyncObject &query, InternalSyncParma &outParam);
void GetSubscribeSyncParam(const std::string &device, const QuerySyncObject &query, InternalSyncParma &outParam);
@@ -151,12 +151,15 @@ protected:
ISyncInterface *GetAndIncSyncInterface();
void SetSyncInterface(ISyncInterface *syncInterface);
- ISyncTaskContext *GetSyncTaskContext(const std::string &deviceId, int &errCode);
+ ISyncTaskContext *GetSyncTaskContext(const std::string &deviceId, const std::string &userId, int &errCode);
+
+ int AddSyncOperationForDevices(SyncOperation *operation);
+ int AddSyncOperationForSyncTargets(SyncOperation *operation);
std::mutex storageMutex_;
ISyncInterface *syncInterface_;
// Used to store all send sync task infos (such as pull sync response, and push sync request)
- std::map<std::string, ISyncTaskContext *> syncTaskContextMap_;
+ std::map<std::string, std::map<std::string, ISyncTaskContext *>> syncTaskContextMap_;
std::mutex contextMapLock_;
std::shared_ptr<SubscribeManager> subManager_;
std::function<void(const InternalSyncParma &param)> queryAutoSyncCallback_;
@@ -171,20 +174,22 @@ private:
int InitComunicator(const ISyncInterface *syncInterface);
// Add the sync task info to the map.
- int AddSyncOperForContext(const std::string &deviceId, SyncOperation *operation);
+ int AddSyncOperForContext(const std::string &deviceId, SyncOperation *operation, const std::string &userId);
// Sync Request CallbackTask run at a sub thread.
- void MessageReciveCallbackTask(ISyncTaskContext *context, const ICommunicator *communicator, Message *inMsg);
+ void MessageReciveCallbackTask(ISyncTaskContext *context, const std::string &userId,
+ const ICommunicator *communicator, Message *inMsg);
- void RemoteDataChangedTask(ISyncTaskContext *context, const ICommunicator *communicator, Message *inMsg);
+ void RemoteDataChangedTask(ISyncTaskContext *context, const std::string &userId, const ICommunicator *communicator,
+ Message *inMsg);
- void ScheduleTaskOut(ISyncTaskContext *context, const ICommunicator *communicator);
+ void ScheduleTaskOut(ISyncTaskContext *context, const std::string &userId, const ICommunicator *communicator);
// wrapper of MessageReciveCallbackTask
- void MessageReciveCallback(const std::string &targetDev, Message *inMsg);
+ void MessageReciveCallback(const std::string &targetDev, const std::string &userId, Message *inMsg);
// Sync Request Callback
- int MessageReciveCallbackInner(const std::string &targetDev, Message *inMsg);
+ int MessageReciveCallbackInner(const std::string &targetDev, const std::string &userId, Message *inMsg);
// Exec the given SyncTarget. and callback onComplete.
int ExecSyncTask(ISyncTaskContext *context);
@@ -196,12 +201,12 @@ private:
int GetMsgSize(const Message *inMsg) const;
// Do not run MessageReceiveCallbackTask until msgQueue is empty
- int DealMsgUtilQueueEmpty();
+ int DealMsgUtilQueueEmpty(const std::string &userId);
// Handle message in order.
- int ScheduleDealMsg(ISyncTaskContext *context, Message *inMsg);
+ int ScheduleDealMsg(ISyncTaskContext *context, const std::string &userId, Message *inMsg);
- ISyncTaskContext *GetContextForMsg(const std::string &targetDev, int &errCode);
+ ISyncTaskContext *GetContextForMsg(const std::string &targetDev, const std::string &userId, int &errCode);
ICommunicator *AllocCommunicator(const std::string &identifier, int &errCode, std::string userId = "");
diff --git a/frameworks/libs/distributeddb/syncer/src/device/sync_task_context.cpp b/frameworks/libs/distributeddb/syncer/src/device/sync_task_context.cpp
index 4fd93de50..05774ad5d 100644
--- a/frameworks/libs/distributeddb/syncer/src/device/sync_task_context.cpp
+++ b/frameworks/libs/distributeddb/syncer/src/device/sync_task_context.cpp
@@ -121,7 +121,8 @@ void SyncTaskContext::SetOperationStatus(int status)
}
int finalStatus = status;
- int operationStatus = syncOperation_->GetStatus(deviceId_);
+ int operationStatus = userId_.empty() ? syncOperation_->GetStatus(deviceId_) :
+ syncOperation_->GetStatus({deviceId_, userId_});
if (status == SyncOperation::OP_SEND_FINISHED && operationStatus == SyncOperation::OP_RECV_FINISHED) {
if (GetTaskErrCode() == -E_EKEYREVOKED) { // LCOV_EXCL_BR_LINE
finalStatus = SyncOperation::OP_EKEYREVOKED_FAILURE;
@@ -135,7 +136,11 @@ void SyncTaskContext::SetOperationStatus(int status)
finalStatus = SyncOperation::OP_FINISHED_ALL;
}
}
- syncOperation_->SetStatus(deviceId_, finalStatus);
+ if (userId_.empty()) {
+ syncOperation_->SetStatus(deviceId_, finalStatus);
+ } else {
+ syncOperation_->SetStatus({deviceId_, userId_}, finalStatus);
+ }
if (finalStatus >= SyncOperation::OP_FINISHED_ALL) {
SaveLastPushTaskExecStatus(finalStatus);
}
@@ -215,7 +220,10 @@ int SyncTaskContext::GetOperationStatus() const
if (syncOperation_ == nullptr) {
return SyncOperation::OP_FINISHED_ALL;
}
- return syncOperation_->GetStatus(deviceId_);
+ if (userId_.empty()) {
+ return syncOperation_->GetStatus(deviceId_);
+ }
+ return syncOperation_->GetStatus({deviceId_, userId_});
}
void SyncTaskContext::SetMode(int mode)
diff --git a/frameworks/libs/distributeddb/syncer/src/device/sync_task_context.h b/frameworks/libs/distributeddb/syncer/src/device/sync_task_context.h
index 42d53e7dc..0a8d0d6a0 100644
--- a/frameworks/libs/distributeddb/syncer/src/device/sync_task_context.h
+++ b/frameworks/libs/distributeddb/syncer/src/device/sync_task_context.h
@@ -260,6 +260,7 @@ protected:
volatile int status_;
volatile int taskExecStatus_;
std::string deviceId_;
+ std::string userId_;
std::string syncActionName_;
ISyncInterface *syncInterface_;
ICommunicator *communicator_;
diff --git a/frameworks/libs/distributeddb/syncer/src/sync_operation.cpp b/frameworks/libs/distributeddb/syncer/src/sync_operation.cpp
index 0e68d5fe6..0c635c4f6 100644
--- a/frameworks/libs/distributeddb/syncer/src/sync_operation.cpp
+++ b/frameworks/libs/distributeddb/syncer/src/sync_operation.cpp
@@ -20,8 +20,8 @@
#include "performance_analysis.h"
namespace DistributedDB {
-SyncOperation::SyncOperation(uint32_t syncId, const std::vector<std::string> &devices,
- int mode, const UserCallback &userCallback, bool isBlockSync)
+SyncOperation::SyncOperation(uint32_t syncId, const std::vector<std::string> &devices, int mode,
+ const UserCallback &userCallback, bool isBlockSync)
: devices_(devices),
syncId_(syncId),
mode_(mode),
@@ -36,6 +36,22 @@ SyncOperation::SyncOperation(uint32_t syncId, const std::vector<std::string> &de
{
}
+SyncOperation::SyncOperation(uint32_t syncId, const std::vector<DeviceSyncTarget> &syncTargets, int mode,
+ const UserCallbackV2 &userCallback, bool isBlockSync)
+ : syncTargets_(syncTargets),
+ syncId_(syncId),
+ mode_(mode),
+ userCallbackV2_(userCallback),
+ isBlockSync_(isBlockSync),
+ isAutoSync_(false),
+ isFinished_(false),
+ semaphore_(nullptr),
+ query_(QuerySyncObject()),
+ isQuerySync_(false),
+ isAutoSubscribe_(false)
+{
+}
+
SyncOperation::~SyncOperation()
{
RefObject::DecObjRef(context_);
@@ -56,6 +72,13 @@ int SyncOperation::Initialize()
processInfo.syncId = syncId_;
syncProcessMap_.insert(std::pair<std::string, DeviceSyncProcess>(deviceId, processInfo));
}
+ for (const DeviceSyncTarget &syncTarget : syncTargets_) {
+ statusesV2_.insert(std::pair<DeviceSyncTarget, int>(syncTarget, OP_WAITING));
+ DeviceSyncProcess processInfo;
+ processInfo.errCode = static_cast<DBStatus>(OP_WAITING);
+ processInfo.syncId = syncId_;
+ syncProcessMapV2_.insert(std::pair<DeviceSyncTarget, DeviceSyncProcess>(syncTarget, processInfo));
+ }
if (mode_ == AUTO_PUSH) {
mode_ = PUSH;
@@ -103,7 +126,9 @@ void SyncOperation::SetStatus(const std::string &deviceId, int status, int commE
}
if (userSyncProcessCallback_) {
- if (syncProcessMap_[deviceId].errCode < static_cast<DBStatus>(OP_FINISHED_ALL)) {
+ if (syncProcessMap_.find(deviceId) == syncProcessMap_.end()) {
+ LOGW("[SyncOperation] Not found dev %s{private} in sync process!", deviceId.c_str());
+ } else if (syncProcessMap_[deviceId].errCode < static_cast<DBStatus>(OP_FINISHED_ALL)) {
syncProcessMap_[deviceId].errCode = static_cast<DBStatus>(status);
}
}
@@ -121,6 +146,42 @@ void SyncOperation::SetStatus(const std::string &deviceId, int status, int commE
}
}
+void SyncOperation::SetStatus(const DeviceSyncTarget &syncTarget, int status, int commErrCode)
+{
+ LOGD("[SyncOperation] SetStatus dev %s{private} user %s status %d commErrCode %d", syncTarget.device.c_str(),
+ syncTarget.userId.c_str(), status, commErrCode);
+ AutoLock lockGuard(this);
+ if (IsKilled()) {
+ LOGE("[SyncOperation] SetStatus failed, the SyncOperation has been killed!");
+ return;
+ }
+ if (isFinished_) {
+ LOGI("[SyncOperation] SetStatus already finished");
+ return;
+ }
+
+ if (userSyncProcessCallback_) {
+ if (syncProcessMapV2_.find(syncTarget) == syncProcessMapV2_.end()) {
+ LOGW("[SyncOperation] Not found dev %s{private} user %s in sync process!", syncTarget.device.c_str(),
+ syncTarget.userId.c_str());
+ } else if (syncProcessMapV2_[syncTarget].errCode < static_cast<DBStatus>(OP_FINISHED_ALL)) {
+ syncProcessMapV2_[syncTarget].errCode = static_cast<DBStatus>(status);
+ }
+ }
+
+ auto iter = statusesV2_.find(syncTarget);
+ if (iter != statusesV2_.end()) {
+ if (iter->second >= OP_FINISHED_ALL) {
+ return;
+ }
+ iter->second = status;
+ if (((status != OP_COMM_ABNORMAL) && (status != OP_TIMEOUT)) || (commErrCode == E_OK)) {
+ return;
+ }
+ commErrCodeMapV2_.insert(std::pair<DeviceSyncTarget, int>(syncTarget, commErrCode));
+ }
+}
+
void SyncOperation::SetUnfinishedDevStatus(int status)
{
LOGD("[SyncOperation] SetUnfinishedDevStatus status %d", status);
@@ -139,6 +200,12 @@ void SyncOperation::SetUnfinishedDevStatus(int status)
}
item.second = status;
}
+ for (auto &item : statusesV2_) {
+ if (item.second >= OP_FINISHED_ALL) {
+ continue;
+ }
+ item.second = status;
+ }
}
int SyncOperation::GetStatus(const std::string &deviceId) const
@@ -151,6 +218,16 @@ int SyncOperation::GetStatus(const std::string &deviceId) const
return -E_INVALID_ARGS;
}
+int SyncOperation::GetStatus(const DeviceSyncTarget &syncTarget) const
+{
+ AutoLock lockGuard(this);
+ auto iter = statusesV2_.find(syncTarget);
+ if (iter != statusesV2_.end()) {
+ return iter->second;
+ }
+ return -E_INVALID_ARGS;
+}
+
uint32_t SyncOperation::GetSyncId() const
{
return syncId_;
@@ -226,6 +303,11 @@ const std::vector<std::string> &SyncOperation::GetDevices() const
return devices_;
}
+const std::vector<DeviceSyncTarget> &SyncOperation::GetSyncTargets() const
+{
+ return syncTargets_;
+}
+
void SyncOperation::WaitIfNeed()
{
if (isBlockSync_ && (semaphore_ != nullptr)) {
@@ -333,6 +415,11 @@ bool SyncOperation::CheckIsAllFinished() const
return false;
}
}
+ for (const auto &iter : statusesV2_) {
+ if (iter.second < OP_FINISHED_ALL) {
+ return false;
+ }
+ }
return true;
}
diff --git a/frameworks/libs/distributeddb/syncer/src/sync_operation.h b/frameworks/libs/distributeddb/syncer/src/sync_operation.h
index f3f00a5de..387ecb9be 100644
--- a/frameworks/libs/distributeddb/syncer/src/sync_operation.h
+++ b/frameworks/libs/distributeddb/syncer/src/sync_operation.h
@@ -60,11 +60,14 @@ public:
};
using UserCallback = std::function<void(std::map<std::string, int>)>;
+ using UserCallbackV2 = std::function<void(std::map<DeviceSyncTarget, int>)>;
using OnSyncFinished = std::function<void(int)>;
using OnSyncFinalize = std::function<void(void)>;
- SyncOperation(uint32_t syncId, const std::vector<std::string> &devices, int mode,
- const UserCallback &userCallback, bool isBlockSync);
+ SyncOperation(uint32_t syncId, const std::vector<std::string> &devices, int mode, const UserCallback &userCallback,
+ bool isBlockSync);
+ SyncOperation(uint32_t syncId, const std::vector<DeviceSyncTarget> &syncTargets, int mode,
+ const UserCallbackV2 &userCallback, bool isBlockSync);
DISABLE_COPY_ASSIGN_MOVE(SyncOperation);
@@ -79,6 +82,7 @@ public:
// Set the sync status, running or finished
void SetStatus(const std::string &deviceId, int status, int commErrCode = E_OK);
+ void SetStatus(const DeviceSyncTarget &syncTarget, int status, int commErrCode = E_OK);
// Set the unfinished devices sync status, running or finished
void SetUnfinishedDevStatus(int status);
@@ -88,6 +92,7 @@ public:
// Get the sync status, running or finished
int GetStatus(const std::string &deviceId) const;
+ int GetStatus(const DeviceSyncTarget &syncTarget) const;
// Get the sync id.
uint32_t GetSyncId() const;
@@ -101,6 +106,8 @@ public:
// Get the deviceId of this sync status
const std::vector<std::string> &GetDevices() const;
+ const std::vector<DeviceSyncTarget> &GetSyncTargets() const;
+
// Wait if it's a block sync
void WaitIfNeed();
@@ -157,6 +164,7 @@ private:
// The device list
const std::vector<std::string> devices_;
+ const std::vector<DeviceSyncTarget> syncTargets_;
// The Syncid
uint32_t syncId_;
@@ -166,6 +174,7 @@ private:
// The callback caller registered
UserCallback userCallback_;
+ UserCallbackV2 userCallbackV2_;
// The callback caller registered, when sync timeout, call
OnSyncFinished onFinished_;
@@ -178,9 +187,11 @@ private:
// The device id we sync with
std::map<std::string, int> statuses_;
+ std::map<DeviceSyncTarget, int> statusesV2_;
// passthrough errCode
std::map<std::string, int> commErrCodeMap_;
+ std::map<DeviceSyncTarget, int> commErrCodeMapV2_;
// Is this operation is a block sync
volatile bool isBlockSync_;
@@ -205,6 +216,7 @@ private:
// The device id we syncProcess with
std::map<std::string, DeviceSyncProcess> syncProcessMap_;
+ std::map<DeviceSyncTarget, DeviceSyncProcess> syncProcessMapV2_;
// Can be cancelled
bool canCancel_ = false;
diff --git a/frameworks/libs/distributeddb/test/unittest/common/communicator/distributeddb_communicator_deep_test.cpp b/frameworks/libs/distributeddb/test/unittest/common/communicator/distributeddb_communicator_deep_test.cpp
index 9b0b725ac..79e18610e 100644
--- a/frameworks/libs/distributeddb/test/unittest/common/communicator/distributeddb_communicator_deep_test.cpp
+++ b/frameworks/libs/distributeddb/test/unittest/common/communicator/distributeddb_communicator_deep_test.cpp
@@ -143,11 +143,13 @@ HWTEST_F(DistributedDBCommunicatorDeepTest, WaitAndRetrySend001, TestSize.Level2
{
// Preset
Message *msgForBB = nullptr;
- g_commBB->RegOnMessageCallback([&msgForBB](const std::string &srcTarget, Message *inMsg) {
+ g_commBB->RegOnMessageCallback([&msgForBB](const std::string &srcTarget, const std::string &userId,
+ Message *inMsg) {
msgForBB = inMsg;
}, nullptr);
Message *msgForCA = nullptr;
- g_commCA->RegOnMessageCallback([&msgForCA](const std::string &srcTarget, Message *inMsg) {
+ g_commCA->RegOnMessageCallback([&msgForCA](const std::string &srcTarget, const std::string &userId,
+ Message *inMsg) {
msgForCA = inMsg;
}, nullptr);
@@ -329,7 +331,8 @@ HWTEST_F(DistributedDBCommunicatorDeepTest, Fragment001, TestSize.Level2)
{
// Preset
Message *recvMsgForBB = nullptr;
- g_commBB->RegOnMessageCallback([&recvMsgForBB](const std::string &srcTarget, Message *inMsg) {
+ g_commBB->RegOnMessageCallback([&recvMsgForBB](const std::string &srcTarget, const std::string &userId,
+ Message *inMsg) {
recvMsgForBB = inMsg;
}, nullptr);
@@ -384,7 +387,8 @@ HWTEST_F(DistributedDBCommunicatorDeepTest, Fragment002, TestSize.Level2)
{
// Preset
Message *recvMsgForCC = nullptr;
- g_commCC->RegOnMessageCallback([&recvMsgForCC](const std::string &srcTarget, Message *inMsg) {
+ g_commCC->RegOnMessageCallback([&recvMsgForCC](const std::string &srcTarget, const std::string &userId,
+ Message *inMsg) {
recvMsgForCC = inMsg;
}, nullptr);
@@ -450,7 +454,8 @@ HWTEST_F(DistributedDBCommunicatorDeepTest, Fragment003, TestSize.Level3)
{
// Preset
std::atomic<int> count {0};
- OnMessageCallback callback = [&count](const std::string &srcTarget, Message *inMsg) {
+ OnMessageCallback callback = [&count](const std::string &srcTarget, const std::string &userId,
+ Message *inMsg) {
delete inMsg;
inMsg = nullptr;
count.fetch_add(1, std::memory_order_seq_cst);
@@ -516,7 +521,8 @@ HWTEST_F(DistributedDBCommunicatorDeepTest, Fragment004, TestSize.Level2)
* @tc.steps: step1. connect device A with device B
*/
Message *recvMsgForBB = nullptr;
- g_commBB->RegOnMessageCallback([&recvMsgForBB](const std::string &srcTarget, Message *inMsg) {
+ g_commBB->RegOnMessageCallback([&recvMsgForBB](const std::string &srcTarget, const std::string &userId,
+ Message *inMsg) {
recvMsgForBB = inMsg;
}, nullptr);
AdapterStub::ConnectAdapterStub(g_envDeviceA.adapterHandle, g_envDeviceB.adapterHandle);
diff --git a/frameworks/libs/distributeddb/test/unittest/common/communicator/distributeddb_communicator_send_receive_test.cpp b/frameworks/libs/distributeddb/test/unittest/common/communicator/distributeddb_communicator_send_receive_test.cpp
index b3f7e8ff0..af5b17868 100644
--- a/frameworks/libs/distributeddb/test/unittest/common/communicator/distributeddb_communicator_send_receive_test.cpp
+++ b/frameworks/libs/distributeddb/test/unittest/common/communicator/distributeddb_communicator_send_receive_test.cpp
@@ -135,7 +135,8 @@ static void CheckRecvMessage(Message *recvMsg, bool isEmpty, uint32_t msgId, uin
string srcTargetFor##src##label; \
Message *recvMsgFor##src##label = nullptr; \
g_comm##src##label->RegOnMessageCallback( \
- [&srcTargetFor##src##label, &recvMsgFor##src##label](const std::string &srcTarget, Message *inMsg) { \
+ [&srcTargetFor##src##label, &recvMsgFor##src##label](const std::string &srcTarget, const std::string &userId, \
+ Message *inMsg) { \
srcTargetFor##src##label = srcTarget; \
recvMsgFor##src##label = inMsg; \
}, nullptr);
@@ -527,7 +528,8 @@ HWTEST_F(DistributedDBCommunicatorSendReceiveTest, ReceiveCheck001, TestSize.Lev
{
// Preset
int recvCount = 0;
- g_commAA->RegOnMessageCallback([&recvCount](const std::string &srcTarget, Message *inMsg) {
+ g_commAA->RegOnMessageCallback([&recvCount](const std::string &srcTarget, const std::string &userId,
+ Message *inMsg) {
recvCount++;
if (inMsg != nullptr) {
delete inMsg;
@@ -588,7 +590,8 @@ HWTEST_F(DistributedDBCommunicatorSendReceiveTest, ReceiveCheck002, TestSize.Lev
{
// Preset
int recvCount = 0;
- g_commAA->RegOnMessageCallback([&recvCount](const std::string &srcTarget, Message *inMsg) {
+ g_commAA->RegOnMessageCallback([&recvCount](const std::string &srcTarget, const std::string &userId,
+ Message *inMsg) {
recvCount++;
if (inMsg != nullptr) {
delete inMsg;
diff --git a/frameworks/libs/distributeddb/test/unittest/common/communicator/distributeddb_communicator_test.cpp b/frameworks/libs/distributeddb/test/unittest/common/communicator/distributeddb_communicator_test.cpp
index 8cda98e64..028d4e7bb 100644
--- a/frameworks/libs/distributeddb/test/unittest/common/communicator/distributeddb_communicator_test.cpp
+++ b/frameworks/libs/distributeddb/test/unittest/common/communicator/distributeddb_communicator_test.cpp
@@ -628,7 +628,8 @@ HWTEST_F(DistributedDBCommunicatorTest, ReportCommunicatorNotFound001, TestSize.
ICommunicator *commBA = g_envDeviceB.commAggrHandle->AllocCommunicator(LABEL_A, errCode);
ASSERT_NE(commBA, nullptr);
Message *recvMsgForBA = nullptr;
- commBA->RegOnMessageCallback([&recvMsgForBA](const std::string &srcTarget, Message *inMsg) {
+ commBA->RegOnMessageCallback([&recvMsgForBA](const std::string &srcTarget, const std::string &userId,
+ Message *inMsg) {
recvMsgForBA = inMsg;
}, nullptr);
commBA->Activate(USER_ID_1);
@@ -670,7 +671,8 @@ HWTEST_F(DistributedDBCommunicatorTest, ReportCommunicatorNotFound001, TestSize.
string srcTargetFor##src##label; \
Message *recvMsgFor##src##label = nullptr; \
comm##src##label->RegOnMessageCallback( \
- [&srcTargetFor##src##label, &recvMsgFor##src##label](const std::string &srcTarget, Message *inMsg) { \
+ [&srcTargetFor##src##label, &recvMsgFor##src##label](const std::string &srcTarget, const std::string &userId, \
+ Message *inMsg) { \
srcTargetFor##src##label = srcTarget; \
recvMsgFor##src##label = inMsg; \
}, nullptr);
@@ -812,7 +814,8 @@ HWTEST_F(DistributedDBCommunicatorTest, ReDeliverMessage002, TestSize.Level1)
ICommunicator *commBA = g_envDeviceB.commAggrHandle->AllocCommunicator(LABEL_A, errCode);
ASSERT_NE(commBA, nullptr);
std::vector<std::pair<std::string, Message *>> msgCallbackForBA;
- commBA->RegOnMessageCallback([&msgCallbackForBA](const std::string &srcTarget, Message *inMsg) {
+ commBA->RegOnMessageCallback([&msgCallbackForBA](const std::string &srcTarget, const std::string &userId,\
+ Message *inMsg) {
msgCallbackForBA.push_back({srcTarget, inMsg});
}, nullptr);
commBA->Activate();
diff --git a/frameworks/libs/distributeddb/test/unittest/common/syncer/distributeddb_mock_sync_module_test.cpp b/frameworks/libs/distributeddb/test/unittest/common/syncer/distributeddb_mock_sync_module_test.cpp
index 827a7b8e5..ea6ad502a 100644
--- a/frameworks/libs/distributeddb/test/unittest/common/syncer/distributeddb_mock_sync_module_test.cpp
+++ b/frameworks/libs/distributeddb/test/unittest/common/syncer/distributeddb_mock_sync_module_test.cpp
@@ -1383,7 +1383,7 @@ HWTEST_F(DistributedDBMockSyncModuleTest, SyncEngineTest004, TestSize.Level0)
auto *enginePtr = new (std::nothrow) MockSyncEngine();
ASSERT_NE(enginePtr, nullptr);
int errCode = E_OK;
- auto *context = enginePtr->CallGetSyncTaskContext("dev", errCode);
+ auto *context = enginePtr->CallGetSyncTaskContext("dev", "", errCode);
EXPECT_EQ(context, nullptr);
EXPECT_EQ(errCode, -E_INVALID_DB);
RefObject::KillAndDecObjRef(enginePtr);
@@ -1720,7 +1720,8 @@ HWTEST_F(DistributedDBMockSyncModuleTest, SyncTaskContextCheck002, TestSize.Leve
*/
auto syncTaskContext = new(std::nothrow) MockSyncTaskContext();
ASSERT_NE(syncTaskContext, nullptr);
- auto operation = new SyncOperation(1u, {}, static_cast<int>(SyncModeType::QUERY_PUSH), nullptr, false);
+ std::vector<std::string> devices = {};
+ auto operation = new SyncOperation(1u, devices, static_cast<int>(SyncModeType::QUERY_PUSH), nullptr, false);
ASSERT_NE(operation, nullptr);
QuerySyncObject querySyncObject;
operation->SetQuery(querySyncObject);
diff --git a/frameworks/libs/distributeddb/test/unittest/common/syncer/distributeddb_relational_ver_p2p_sync_test.cpp b/frameworks/libs/distributeddb/test/unittest/common/syncer/distributeddb_relational_ver_p2p_sync_test.cpp
index 4c79a098c..47f9c6962 100644
--- a/frameworks/libs/distributeddb/test/unittest/common/syncer/distributeddb_relational_ver_p2p_sync_test.cpp
+++ b/frameworks/libs/distributeddb/test/unittest/common/syncer/distributeddb_relational_ver_p2p_sync_test.cpp
@@ -2933,7 +2933,8 @@ HWTEST_F(DistributedDBRelationalVerP2PSyncTest, AutoLaunchSyncAfterRekey_002, Te
*/
HWTEST_F(DistributedDBRelationalVerP2PSyncTest, SyncTargetTest001, TestSize.Level1) {
MockSyncTaskContext syncTaskContext;
- SyncOperation *operation = new SyncOperation(1, {}, 0, nullptr, false);
+ std::vector<std::string> devices = {};
+ SyncOperation *operation = new SyncOperation(1, devices, 0, nullptr, false);
EXPECT_NE(operation, nullptr);
std::thread addTarget([&syncTaskContext, &operation]() {
auto *newTarget = new (std::nothrow) SingleVerSyncTarget;
diff --git a/frameworks/libs/distributeddb/test/unittest/common/syncer/generic_virtual_device.cpp b/frameworks/libs/distributeddb/test/unittest/common/syncer/generic_virtual_device.cpp
index b68cae6e7..0f031a566 100644
--- a/frameworks/libs/distributeddb/test/unittest/common/syncer/generic_virtual_device.cpp
+++ b/frameworks/libs/distributeddb/test/unittest/common/syncer/generic_virtual_device.cpp
@@ -109,7 +109,8 @@ int GenericVirtualDevice::Initialize(VirtualCommunicatorAggregator *comAggregato
return -E_OUT_OF_MEMORY;
}
communicateHandle_->RegOnMessageCallback(
- std::bind(&GenericVirtualDevice::MessageCallback, this, std::placeholders::_1, std::placeholders::_2), []() {});
+ std::bind(&GenericVirtualDevice::MessageCallback, this, std::placeholders::_1, std::placeholders::_2,
+ std::placeholders::_3), []() {});
context_->Initialize(remoteDeviceId_, storage_, metadata_, communicateHandle_);
context_->SetRetryStatus(SyncTaskContext::NO_NEED_RETRY);
context_->RegOnSyncTask(std::bind(&GenericVirtualDevice::StartResponseTask, this));
@@ -132,7 +133,7 @@ std::string GenericVirtualDevice::GetDeviceId() const
return deviceId_;
}
-int GenericVirtualDevice::MessageCallback(const std::string &deviceId, Message *inMsg)
+int GenericVirtualDevice::MessageCallback(const std::string &deviceId, const std::string &userId, Message *inMsg)
{
if (inMsg->GetMessageId() == LOCAL_DATA_CHANGED) {
if (onRemoteDataChanged_) {
diff --git a/frameworks/libs/distributeddb/test/unittest/common/syncer/generic_virtual_device.h b/frameworks/libs/distributeddb/test/unittest/common/syncer/generic_virtual_device.h
index 462dd6948..3c6976d63 100644
--- a/frameworks/libs/distributeddb/test/unittest/common/syncer/generic_virtual_device.h
+++ b/frameworks/libs/distributeddb/test/unittest/common/syncer/generic_virtual_device.h
@@ -34,7 +34,7 @@ public:
int Initialize(VirtualCommunicatorAggregator *comAggregator, ISyncInterface *syncInterface);
void SetDeviceId(const std::string &deviceId);
std::string GetDeviceId() const;
- int MessageCallback(const std::string &deviceId, Message *inMsg);
+ int MessageCallback(const std::string &deviceId, const std::string &userId, Message *inMsg);
void OnRemoteDataChanged(const std::function<void(const std::string &)> &callback);
void Online();
void Offline();
diff --git a/frameworks/libs/distributeddb/test/unittest/common/syncer/mock_sync_engine.h b/frameworks/libs/distributeddb/test/unittest/common/syncer/mock_sync_engine.h
index 1545ea636..fa6f8f0df 100644
--- a/frameworks/libs/distributeddb/test/unittest/common/syncer/mock_sync_engine.h
+++ b/frameworks/libs/distributeddb/test/unittest/common/syncer/mock_sync_engine.h
@@ -29,9 +29,9 @@ public:
subManager_ = std::make_shared<SubscribeManager>();
}
- ISyncTaskContext *CallGetSyncTaskContext(const std::string &deviceId, int &errCode)
+ ISyncTaskContext *CallGetSyncTaskContext(const std::string &deviceId, const std::string &userId, int &errCode)
{
- return SyncEngine::GetSyncTaskContext(deviceId, errCode);
+ return SyncEngine::GetSyncTaskContext(deviceId, userId, errCode);
}
};
} // namespace DistributedDB
diff --git a/frameworks/libs/distributeddb/test/unittest/common/syncer/virtual_communicator.cpp b/frameworks/libs/distributeddb/test/unittest/common/syncer/virtual_communicator.cpp
index 9d6df07de..f2b85cd0e 100644
--- a/frameworks/libs/distributeddb/test/unittest/common/syncer/virtual_communicator.cpp
+++ b/frameworks/libs/distributeddb/test/unittest/common/syncer/virtual_communicator.cpp
@@ -97,7 +97,7 @@ void VirtualCommunicator::CallbackOnMessage(const std::string &srcTarget, Messag
std::lock_guard<std::mutex> lock(onMessageLock_);
if (isEnable_ && onMessage_ && (srcTarget != deviceId_) && ((inMsg->GetMessageId() != dropMsgId_) ||
(dropMsgTimes_ == 0))) {
- onMessage_(srcTarget, inMsg);
+ onMessage_(srcTarget, "", inMsg);
} else {
LOGD("drop msg from dev=%s, localDev=%s", srcTarget.c_str(), deviceId_.c_str());
if (dropMsgTimes_ > 0) {
Loading...
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
1
https://gitee.com/liao-yonghuang/lyh.git
git@gitee.com:liao-yonghuang/lyh.git
liao-yonghuang
lyh
lyh
master

搜索帮助