代码拉取完成,页面将自动刷新
diff --git a/frameworks/libs/distributeddb/interfaces/include/store_types.h b/frameworks/libs/distributeddb/interfaces/include/store_types.h
index a5dd8fd81..279897a61 100644
--- a/frameworks/libs/distributeddb/interfaces/include/store_types.h
+++ b/frameworks/libs/distributeddb/interfaces/include/store_types.h
@@ -319,5 +319,17 @@ enum class DataOperator : uint32_t {
UPDATE_TIME = 0x01,
RESET_UPLOAD_CLOUD = 0x02
};
+
+struct DeviceSyncTarget {
+ std::string device;
+ std::string userId;
+ bool operator<(const DeviceSyncTarget &other) const
+ {
+ if (device == other.device) {
+ return userId < other.userId;
+ }
+ return device < other.device;
+ }
+};
} // namespace DistributedDB
#endif // KV_STORE_TYPE_H
diff --git a/frameworks/libs/distributeddb/syncer/src/device/sync_engine.cpp b/frameworks/libs/distributeddb/syncer/src/device/sync_engine.cpp
index 08a28ff5d..db7ca0e44 100644
--- a/frameworks/libs/distributeddb/syncer/src/device/sync_engine.cpp
+++ b/frameworks/libs/distributeddb/syncer/src/device/sync_engine.cpp
@@ -323,13 +323,14 @@ int SyncEngine::InitComunicator(const ISyncInterface *syncInterface)
int SyncEngine::AddSyncOperForContext(const std::string &deviceId, SyncOperation *operation)
{
int errCode = E_OK;
+ std::string targetUserId = GetTargetUserId(deviceId);
ISyncTaskContext *context = nullptr;
{
std::lock_guard<std::mutex> lock(contextMapLock_);
- context = FindSyncTaskContext(deviceId);
+ context = FindSyncTaskContext({deviceId, targetUserId});
if (context == nullptr) {
if (!IsKilled()) {
- context = GetSyncTaskContext(deviceId, errCode);
+ context = GetSyncTaskContext({deviceId, targetUserId}, errCode);
}
if (context == nullptr) {
return errCode;
@@ -342,8 +343,6 @@ int SyncEngine::AddSyncOperForContext(const std::string &deviceId, SyncOperation
RefObject::IncObjRef(context);
}
- std::string targetUserId = GetTargetUserId(context->GetDeviceId());
- context->SetTargetUserId(targetUserId);
errCode = context->AddSyncOperation(operation);
if (operation != nullptr) {
operation->SetSyncContext(context); // make the life cycle of context and operation are same
@@ -419,11 +418,10 @@ int SyncEngine::DealMsgUtilQueueEmpty()
// it will deal with the first message in queue, we should increase object reference counts and sure that resources
// could be prevented from destroying by other threads.
do {
- ISyncTaskContext *nextContext = GetContextForMsg(inMsg->GetTarget(), errCode);
+ ISyncTaskContext *nextContext = GetContextForMsg({inMsg->GetTarget(), inMsg->GetSenderUserId()}, errCode);
if (errCode != E_OK) {
break;
}
- nextContext->SetTargetUserId(inMsg->GetSenderUserId());
errCode = ScheduleDealMsg(nextContext, inMsg);
if (errCode != E_OK) {
RefObject::DecObjRef(nextContext);
@@ -437,12 +435,12 @@ int SyncEngine::DealMsgUtilQueueEmpty()
return errCode;
}
-ISyncTaskContext *SyncEngine::GetContextForMsg(const std::string &targetDev, int &errCode)
+ISyncTaskContext *SyncEngine::GetContextForMsg(const DeviceSyncTarget &target, int &errCode)
{
ISyncTaskContext *context = nullptr;
{
std::lock_guard<std::mutex> lock(contextMapLock_);
- context = FindSyncTaskContext(targetDev);
+ context = FindSyncTaskContext(target);
if (context != nullptr) { // LCOV_EXCL_BR_LINE
if (context->IsKilled()) {
errCode = -E_OBJ_IS_KILLED;
@@ -453,7 +451,7 @@ ISyncTaskContext *SyncEngine::GetContextForMsg(const std::string &targetDev, int
errCode = -E_OBJ_IS_KILLED;
return nullptr;
}
- context = GetSyncTaskContext(targetDev, errCode);
+ context = GetSyncTaskContext(target, errCode);
if (context == nullptr) {
return nullptr;
}
@@ -547,11 +545,10 @@ int SyncEngine::MessageReciveCallbackInner(const std::string &targetDev, Message
}
int errCode = E_OK;
- ISyncTaskContext *nextContext = GetContextForMsg(targetDev, errCode);
+ ISyncTaskContext *nextContext = GetContextForMsg({targetDev, inMsg->GetSenderUserId()}, errCode);
if (errCode != E_OK) {
return errCode;
}
- nextContext->SetTargetUserId(inMsg->GetSenderUserId());
LOGD("[SyncEngine] MessageReciveCallback MSG ID = %d", inMsg->GetMessageId());
return ScheduleDealMsg(nextContext, inMsg);
}
@@ -602,9 +599,9 @@ int SyncEngine::GetMsgSize(const Message *inMsg) const
}
}
-ISyncTaskContext *SyncEngine::FindSyncTaskContext(const std::string &deviceId)
+ISyncTaskContext *SyncEngine::FindSyncTaskContext(const DeviceSyncTarget &target)
{
- auto iter = syncTaskContextMap_.find(deviceId);
+ auto iter = syncTaskContextMap_.find(target);
if (iter != syncTaskContextMap_.end()) {
ISyncTaskContext *context = iter->second;
return context;
@@ -612,24 +609,29 @@ ISyncTaskContext *SyncEngine::FindSyncTaskContext(const std::string &deviceId)
return nullptr;
}
-ISyncTaskContext *SyncEngine::GetSyncTaskContextAndInc(const std::string &deviceId)
+std::vector<ISyncTaskContext *> SyncEngine::GetSyncTaskContextAndInc(const std::string &deviceId)
{
- ISyncTaskContext *context = nullptr;
+ std::vector<ISyncTaskContext *> contexts;
std::lock_guard<std::mutex> lock(contextMapLock_);
- context = FindSyncTaskContext(deviceId);
- if (context == nullptr) {
- LOGI("[SyncEngine] dev=%s, context is null, no need to clear sync operation", STR_MASK(deviceId));
- return nullptr;
- }
- if (context->IsKilled()) { // LCOV_EXCL_BR_LINE
- LOGI("[SyncEngine] context is killing");
- return nullptr;
+ for (const auto &iter : syncTaskContextMap_) {
+ if (iter.first.device != deviceId) {
+ continue;
+ }
+ if (iter.second == nullptr) {
+ LOGI("[SyncEngine] dev=%s, context is null, no need to clear sync operation", STR_MASK(deviceId));
+ return {};
+ }
+ if (iter.second->IsKilled()) { // LCOV_EXCL_BR_LINE
+ LOGI("[SyncEngine] context is killing");
+ return {};
+ }
+ RefObject::IncObjRef(iter.second);
+ contexts.push_back(iter.second);
}
- RefObject::IncObjRef(context);
- return context;
+ return contexts;
}
-ISyncTaskContext *SyncEngine::GetSyncTaskContext(const std::string &deviceId, int &errCode)
+ISyncTaskContext *SyncEngine::GetSyncTaskContext(const DeviceSyncTarget &target, int &errCode)
{
auto storage = GetAndIncSyncInterface();
if (storage == nullptr) {
@@ -643,19 +645,20 @@ ISyncTaskContext *SyncEngine::GetSyncTaskContext(const std::string &deviceId, in
LOGE("[SyncEngine] SyncTaskContext alloc failed, may be no memory available!");
return nullptr;
}
- errCode = context->Initialize(deviceId, storage, metadata_, communicatorProxy_);
+ errCode = context->Initialize(target.device, storage, metadata_, communicatorProxy_);
if (errCode != E_OK) {
- LOGE("[SyncEngine] context init failed err %d, dev %s", errCode, STR_MASK(deviceId));
+ LOGE("[SyncEngine] context init failed err %d, dev %s", errCode, STR_MASK(target.device));
RefObject::DecObjRef(context);
storage->DecRefCount();
context = nullptr;
return nullptr;
}
- syncTaskContextMap_.insert(std::pair<std::string, ISyncTaskContext *>(deviceId, context));
+ context->SetTargetUserId(target.userId);
+ syncTaskContextMap_.insert(std::pair<DeviceSyncTarget, ISyncTaskContext *>(target, context));
// IncRef for SyncEngine to make sure SyncEngine is valid when context access
RefObject::IncObjRef(this);
- context->OnLastRef([this, deviceId, storage]() {
- LOGD("[SyncEngine] SyncTaskContext for id %s finalized", STR_MASK(deviceId));
+ context->OnLastRef([this, target, storage]() {
+ LOGD("[SyncEngine] SyncTaskContext for id %s finalized", STR_MASK(target.device));
RefObject::DecObjRef(this);
storage->DecRefCount();
});
@@ -871,33 +874,37 @@ void SyncEngine::OfflineHandleByDevice(const std::string &deviceId, ISyncInterfa
static_cast<SyncGenericInterface *>(storage)->GetDBInfo(dbInfo);
RuntimeContext::GetInstance()->RemoveRemoteSubscribe(dbInfo, deviceId);
// get context and Inc context if context is not nullptr
- ISyncTaskContext *context = GetSyncTaskContextAndInc(deviceId);
- {
- std::lock_guard<std::mutex> lock(communicatorProxyLock_);
- if (communicatorProxy_ == nullptr) {
- return;
+ std::vector<ISyncTaskContext *> contexts = GetSyncTaskContextAndInc(deviceId);
+ for (const auto &context : contexts) {
+ {
+ std::lock_guard<std::mutex> lock(communicatorProxyLock_);
+ if (communicatorProxy_ == nullptr) {
+ return;
+ }
+ if (communicatorProxy_->IsDeviceOnline(deviceId)) { // LCOV_EXCL_BR_LINE
+ LOGI("[SyncEngine] target dev=%s is online, no need to clear task.", STR_MASK(deviceId));
+ RefObject::DecObjRef(context);
+ return;
+ }
}
- if (communicatorProxy_->IsDeviceOnline(deviceId)) { // LCOV_EXCL_BR_LINE
- LOGI("[SyncEngine] target dev=%s is online, no need to clear task.", STR_MASK(deviceId));
+ // means device is offline, clear local subscribe
+ subManager_->ClearLocalSubscribeQuery(deviceId);
+ // clear sync task
+ if (context != nullptr) {
+ context->ClearAllSyncTask();
RefObject::DecObjRef(context);
- return;
}
}
- // means device is offline, clear local subscribe
- subManager_->ClearLocalSubscribeQuery(deviceId);
- // clear sync task
- if (context != nullptr) {
- context->ClearAllSyncTask();
- RefObject::DecObjRef(context);
- }
}
void SyncEngine::ClearAllSyncTaskByDevice(const std::string &deviceId)
{
- ISyncTaskContext *context = GetSyncTaskContextAndInc(deviceId);
- if (context != nullptr) {
- context->ClearAllSyncTask();
- RefObject::DecObjRef(context);
+ std::vector<ISyncTaskContext *> contexts = GetSyncTaskContextAndInc(deviceId);
+ for (const auto &context : contexts) {
+ if (context != nullptr) {
+ context->ClearAllSyncTask();
+ RefObject::DecObjRef(context);
+ }
}
}
diff --git a/frameworks/libs/distributeddb/syncer/src/device/sync_engine.h b/frameworks/libs/distributeddb/syncer/src/device/sync_engine.h
index f81b7a474..cbed36ed9 100644
--- a/frameworks/libs/distributeddb/syncer/src/device/sync_engine.h
+++ b/frameworks/libs/distributeddb/syncer/src/device/sync_engine.h
@@ -142,8 +142,8 @@ protected:
virtual ISyncTaskContext *CreateSyncTaskContext(const ISyncInterface &syncInterface) = 0;
// Find SyncTaskContext from the map
- ISyncTaskContext *FindSyncTaskContext(const std::string &deviceId);
- ISyncTaskContext *GetSyncTaskContextAndInc(const std::string &deviceId);
+ ISyncTaskContext *FindSyncTaskContext(const DeviceSyncTarget &target);
+ std::vector<ISyncTaskContext *> GetSyncTaskContextAndInc(const std::string &deviceId);
void GetQueryAutoSyncParam(const std::string &device, const QuerySyncObject &query, InternalSyncParma &outParam);
void GetSubscribeSyncParam(const std::string &device, const QuerySyncObject &query, InternalSyncParma &outParam);
@@ -151,12 +151,12 @@ protected:
ISyncInterface *GetAndIncSyncInterface();
void SetSyncInterface(ISyncInterface *syncInterface);
- ISyncTaskContext *GetSyncTaskContext(const std::string &deviceId, int &errCode);
+ ISyncTaskContext *GetSyncTaskContext(const DeviceSyncTarget &target, int &errCode);
std::mutex storageMutex_;
ISyncInterface *syncInterface_;
// Used to store all send sync task infos (such as pull sync response, and push sync request)
- std::map<std::string, ISyncTaskContext *> syncTaskContextMap_;
+ std::map<DeviceSyncTarget, ISyncTaskContext *> syncTaskContextMap_;
std::mutex contextMapLock_;
std::shared_ptr<SubscribeManager> subManager_;
std::function<void(const InternalSyncParma ¶m)> queryAutoSyncCallback_;
@@ -201,7 +201,7 @@ private:
// Handle message in order.
int ScheduleDealMsg(ISyncTaskContext *context, Message *inMsg);
- ISyncTaskContext *GetContextForMsg(const std::string &targetDev, int &errCode);
+ ISyncTaskContext *GetContextForMsg(const DeviceSyncTarget &target, int &errCode);
ICommunicator *AllocCommunicator(const std::string &identifier, int &errCode, std::string userId = "");
diff --git a/frameworks/libs/distributeddb/test/unittest/common/syncer/distributeddb_mock_sync_module_test.cpp b/frameworks/libs/distributeddb/test/unittest/common/syncer/distributeddb_mock_sync_module_test.cpp
index 827a7b8e5..39bbf08c5 100644
--- a/frameworks/libs/distributeddb/test/unittest/common/syncer/distributeddb_mock_sync_module_test.cpp
+++ b/frameworks/libs/distributeddb/test/unittest/common/syncer/distributeddb_mock_sync_module_test.cpp
@@ -1383,7 +1383,7 @@ HWTEST_F(DistributedDBMockSyncModuleTest, SyncEngineTest004, TestSize.Level0)
auto *enginePtr = new (std::nothrow) MockSyncEngine();
ASSERT_NE(enginePtr, nullptr);
int errCode = E_OK;
- auto *context = enginePtr->CallGetSyncTaskContext("dev", errCode);
+ auto *context = enginePtr->CallGetSyncTaskContext({"dev", "user"}, errCode);
EXPECT_EQ(context, nullptr);
EXPECT_EQ(errCode, -E_INVALID_DB);
RefObject::KillAndDecObjRef(enginePtr);
diff --git a/frameworks/libs/distributeddb/test/unittest/common/syncer/mock_sync_engine.h b/frameworks/libs/distributeddb/test/unittest/common/syncer/mock_sync_engine.h
index 1545ea636..72a537b95 100644
--- a/frameworks/libs/distributeddb/test/unittest/common/syncer/mock_sync_engine.h
+++ b/frameworks/libs/distributeddb/test/unittest/common/syncer/mock_sync_engine.h
@@ -29,9 +29,9 @@ public:
subManager_ = std::make_shared<SubscribeManager>();
}
- ISyncTaskContext *CallGetSyncTaskContext(const std::string &deviceId, int &errCode)
+ ISyncTaskContext * CallGetSyncTaskContext(const DeviceSyncTarget &target, int &errCode)
{
- return SyncEngine::GetSyncTaskContext(deviceId, errCode);
+ return SyncEngine::GetSyncTaskContext(target, errCode);
}
};
} // namespace DistributedDB
此处可能存在不合适展示的内容,页面不予展示。您可通过相关编辑功能自查并修改。
如您确认内容无涉及 不当用语 / 纯广告导流 / 暴力 / 低俗色情 / 侵权 / 盗版 / 虚假 / 无价值内容或违法国家有关法律法规的内容,可点击提交进行申诉,我们将尽快为您处理。