1 Star 0 Fork 0

廖永煌/lyh

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
文件
该仓库未声明开源许可证文件(LICENSE),使用请关注具体项目描述及其代码上游依赖。
克隆/下载
br_122.diff 15.41 KB
一键复制 编辑 原始数据 按行查看 历史
廖永煌 提交于 2025-05-17 11:18 +08:00 . 1
diff --git a/frameworks/libs/distributeddb/communicator/include/communicator_aggregator.h b/frameworks/libs/distributeddb/communicator/include/communicator_aggregator.h
index 6034cb15e..a5dfd3f8a 100644
--- a/frameworks/libs/distributeddb/communicator/include/communicator_aggregator.h
+++ b/frameworks/libs/distributeddb/communicator/include/communicator_aggregator.h
@@ -120,7 +120,7 @@ private:
// Function with suffix NoMutex should be called with mutex in the caller
int TryDeliverAppLayerFrameToCommunicatorNoMutex(const std::string &srcTarget, SerialBuffer *&inFrameBuffer,
- const LabelType &toLabel, const std::string &userId = "");
+ const LabelType &toLabel, const UserInfo &userInfo);
// Auxiliary function for cutting short primary function
int RegCallbackToAdapter();
@@ -164,7 +164,7 @@ private:
uint64_t IncreaseSendSequenceId(const std::string &target);
int GetDataUserId(const ParseResult &inResult, const LabelType &toLabel, const DataUserInfoProc &userInfoProc,
- std::string &userId);
+ UserInfo &userInfo);
DECLARE_OBJECT_TAG(CommunicatorAggregator);
diff --git a/frameworks/libs/distributeddb/communicator/include/frame_retainer.h b/frameworks/libs/distributeddb/communicator/include/frame_retainer.h
index 4a7c80e9f..854bc0923 100644
--- a/frameworks/libs/distributeddb/communicator/include/frame_retainer.h
+++ b/frameworks/libs/distributeddb/communicator/include/frame_retainer.h
@@ -29,12 +29,14 @@ class SerialBuffer; // Forward Declarations
struct FrameInfo {
SerialBuffer *buffer = nullptr;
std::string srcTarget;
+ std::string senderUser;
LabelType commLabel;
uint32_t frameId = 0u;
};
struct RetainWork {
SerialBuffer *buffer = nullptr;
+ std::string senderUser;
uint32_t frameId = 0u;
uint32_t remainTime = 0u; // in second
};
diff --git a/frameworks/libs/distributeddb/communicator/include/message.h b/frameworks/libs/distributeddb/communicator/include/message.h
index 416e08316..ec52a076b 100644
--- a/frameworks/libs/distributeddb/communicator/include/message.h
+++ b/frameworks/libs/distributeddb/communicator/include/message.h
@@ -142,6 +142,11 @@ public:
target_ = inTarget;
}
+ void SetSenderUserId(const std::string &userId)
+ {
+ senderUserId_ = userId;
+ }
+
void SetPriority(Priority inPriority)
{
prio_ = inPriority;
@@ -185,6 +190,11 @@ public:
return target_;
}
+ std::string GetSenderUserId() const
+ {
+ return senderUserId_;
+ }
+
Priority GetPriority() const
{
return prio_;
@@ -212,6 +222,7 @@ private:
// Field carry supplemental info
std::string target_;
+ std::string senderUserId_;
Priority prio_ = Priority::LOW;
};
} // namespace DistributedDB
diff --git a/frameworks/libs/distributeddb/communicator/src/communicator.cpp b/frameworks/libs/distributeddb/communicator/src/communicator.cpp
index c26d35e34..1bec2634f 100644
--- a/frameworks/libs/distributeddb/communicator/src/communicator.cpp
+++ b/frameworks/libs/distributeddb/communicator/src/communicator.cpp
@@ -144,13 +144,15 @@ int Communicator::SendMessage(const std::string &dstTarget, const Message *inMsg
return errCode;
}
-void Communicator::OnBufferReceive(const std::string &srcTarget, const SerialBuffer *inBuf)
+void Communicator::OnBufferReceive(const std::string &srcTarget, const SerialBuffer *inBuf,
+ const std::string &senderUser)
{
std::lock_guard<std::mutex> messageHandleLockGuard(messageHandleMutex_);
- if (srcTarget.size() != 0 && inBuf != nullptr && onMessageHandle_) {
+ if (!srcTarget.empty() && inBuf != nullptr && onMessageHandle_) {
int error = E_OK;
// if error is not E_OK, null pointer will be returned
Message *message = ProtocolProto::ToMessage(inBuf, error);
+ message->SetSenderUserId(senderUser);
delete inBuf;
inBuf = nullptr;
// message is not nullptr if error is E_OK or error is E_NOT_REGISTER.
diff --git a/frameworks/libs/distributeddb/communicator/src/communicator.h b/frameworks/libs/distributeddb/communicator/src/communicator.h
index 41ede2362..d58ac48a5 100644
--- a/frameworks/libs/distributeddb/communicator/src/communicator.h
+++ b/frameworks/libs/distributeddb/communicator/src/communicator.h
@@ -56,7 +56,7 @@ public:
const OnSendEnd &onEnd) override;
// Call by CommunicatorAggregator directly
- void OnBufferReceive(const std::string &srcTarget, const SerialBuffer *inBuf);
+ void OnBufferReceive(const std::string &srcTarget, const SerialBuffer *inBuf, const std::string &senderUser);
// Call by CommunicatorAggregator directly
void OnConnectChange(const std::string &target, bool isConnect);
diff --git a/frameworks/libs/distributeddb/communicator/src/communicator_aggregator.cpp b/frameworks/libs/distributeddb/communicator/src/communicator_aggregator.cpp
index ea724de99..561947ab8 100644
--- a/frameworks/libs/distributeddb/communicator/src/communicator_aggregator.cpp
+++ b/frameworks/libs/distributeddb/communicator/src/communicator_aggregator.cpp
@@ -276,7 +276,7 @@ void CommunicatorAggregator::ActivateCommunicator(const LabelType &commLabel, co
// Do Redeliver, the communicator is responsible to deal with the frame
std::list<FrameInfo> framesToRedeliver = retainer_.FetchFramesForSpecificCommunicator(commLabel);
for (auto &entry : framesToRedeliver) {
- commMap_[userId].at(commLabel).first->OnBufferReceive(entry.srcTarget, entry.buffer);
+ commMap_[userId].at(commLabel).first->OnBufferReceive(entry.srcTarget, entry.buffer, entry.senderUser);
}
}
@@ -674,8 +674,8 @@ int CommunicatorAggregator::OnAppLayerFrameReceive(const std::string &srcTarget,
const ParseResult &inResult, const DataUserInfoProc &userInfoProc)
{
LabelType toLabel = inResult.GetCommLabel();
- std::string userId;
- int ret = GetDataUserId(inResult, toLabel, userInfoProc, userId);
+ UserInfo userInfo;
+ int ret = GetDataUserId(inResult, toLabel, userInfoProc, userInfo);
if (ret != E_OK) {
LOGE("[CommAggr][AppReceive] get data user id err, ret=%d", ret);
delete inFrameBuffer;
@@ -684,7 +684,8 @@ int CommunicatorAggregator::OnAppLayerFrameReceive(const std::string &srcTarget,
}
{
std::lock_guard<std::mutex> commMapLockGuard(commMapMutex_);
- int errCode = TryDeliverAppLayerFrameToCommunicatorNoMutex(srcTarget, inFrameBuffer, toLabel, userId);
+ int errCode = TryDeliverAppLayerFrameToCommunicatorNoMutex(srcTarget, inFrameBuffer, toLabel,
+ userInfo);
if (errCode == E_OK) { // Attention: Here is equal to E_OK
return E_OK;
}
@@ -694,7 +695,7 @@ int CommunicatorAggregator::OnAppLayerFrameReceive(const std::string &srcTarget,
{
std::lock_guard<std::mutex> onCommLackLockGuard(onCommLackMutex_);
if (onCommLackHandle_) {
- errCode = onCommLackHandle_(toLabel, userId);
+ errCode = onCommLackHandle_(toLabel, userInfo.receiveUser);
LOGI("[CommAggr][AppReceive] On CommLack End."); // Log in case callback block this thread
} else {
LOGI("[CommAggr][AppReceive] CommLackHandle invalid currently.");
@@ -702,7 +703,7 @@ int CommunicatorAggregator::OnAppLayerFrameReceive(const std::string &srcTarget,
}
// Here we have to lock commMapMutex_ and search communicator again.
std::lock_guard<std::mutex> commMapLockGuard(commMapMutex_);
- int errCodeAgain = TryDeliverAppLayerFrameToCommunicatorNoMutex(srcTarget, inFrameBuffer, toLabel, userId);
+ int errCodeAgain = TryDeliverAppLayerFrameToCommunicatorNoMutex(srcTarget, inFrameBuffer, toLabel, userInfo);
if (errCodeAgain == E_OK) { // Attention: Here is equal to E_OK.
LOGI("[CommAggr][AppReceive] Communicator of %.3s found after try again(rare case).", VEC_TO_STR(toLabel));
return E_OK;
@@ -715,13 +716,13 @@ int CommunicatorAggregator::OnAppLayerFrameReceive(const std::string &srcTarget,
return errCode; // The caller will display errCode in log
}
// Do Retention, the retainer is responsible to deal with the frame
- retainer_.RetainFrame(FrameInfo{inFrameBuffer, srcTarget, toLabel, inResult.GetFrameId()});
+ retainer_.RetainFrame(FrameInfo{inFrameBuffer, srcTarget, userInfo.senderUser, toLabel, inResult.GetFrameId()});
inFrameBuffer = nullptr;
return E_OK;
}
int CommunicatorAggregator::GetDataUserId(const ParseResult &inResult, const LabelType &toLabel,
- const DataUserInfoProc &userInfoProc, std::string &userId)
+ const DataUserInfoProc &userInfoProc, UserInfo &userInfo)
{
if (userInfoProc.processCommunicator == nullptr) {
LOGE("[CommAggr][GetDataUserId] processCommunicator is nullptr");
@@ -736,8 +737,8 @@ int CommunicatorAggregator::GetDataUserId(const ParseResult &inResult, const Lab
LOGE("[CommAggr][GetDataUserId] userId dismatched, drop packet");
return ret;
}
- if (userInfos.size() >= 1) {
- userId = userInfos[0].receiveUser;
+ if (!userInfos.empty()) {
+ userInfo = userInfos[0];
} else {
LOGW("[CommAggr][GetDataUserId] userInfos is empty");
}
@@ -745,11 +746,13 @@ int CommunicatorAggregator::GetDataUserId(const ParseResult &inResult, const Lab
}
int CommunicatorAggregator::TryDeliverAppLayerFrameToCommunicatorNoMutex(const std::string &srcTarget,
- SerialBuffer *&inFrameBuffer, const LabelType &toLabel, const std::string &userId)
+ SerialBuffer *&inFrameBuffer, const LabelType &toLabel, const UserInfo &userInfo)
{
// Ignore nonactivated communicator, which is regarded as inexistent
- if (commMap_[userId].count(toLabel) != 0 && commMap_[userId].at(toLabel).second) {
- commMap_[userId].at(toLabel).first->OnBufferReceive(srcTarget, inFrameBuffer);
+ const std::string &senderUser = userInfo.senderUser;
+ const std::string &receiveUser = userInfo.receiveUser;
+ if (commMap_[receiveUser].count(toLabel) != 0 && commMap_[receiveUser].at(toLabel).second) {
+ commMap_[receiveUser].at(toLabel).first->OnBufferReceive(srcTarget, inFrameBuffer, senderUser);
// Frame handed over to communicator who is responsible to delete it. The frame is deleted here after return.
inFrameBuffer = nullptr;
return E_OK;
@@ -762,7 +765,7 @@ int CommunicatorAggregator::TryDeliverAppLayerFrameToCommunicatorNoMutex(const s
communicator = entry.second.first;
isEmpty = userCommMap.first.empty();
LOGW("[CommAggr][TryDeliver] Found communicator of %s, but required user is %s",
- userCommMap.first.c_str(), userId.c_str());
+ userCommMap.first.c_str(), receiveUser.c_str());
break;
}
}
@@ -770,8 +773,8 @@ int CommunicatorAggregator::TryDeliverAppLayerFrameToCommunicatorNoMutex(const s
break;
}
}
- if (communicator != nullptr && (userId.empty() || isEmpty)) {
- communicator->OnBufferReceive(srcTarget, inFrameBuffer);
+ if (communicator != nullptr && (receiveUser.empty() || isEmpty)) {
+ communicator->OnBufferReceive(srcTarget, inFrameBuffer, senderUser);
inFrameBuffer = nullptr;
return E_OK;
}
diff --git a/frameworks/libs/distributeddb/communicator/src/frame_retainer.cpp b/frameworks/libs/distributeddb/communicator/src/frame_retainer.cpp
index fc0eb0bba..c51fec9ed 100644
--- a/frameworks/libs/distributeddb/communicator/src/frame_retainer.cpp
+++ b/frameworks/libs/distributeddb/communicator/src/frame_retainer.cpp
@@ -85,7 +85,7 @@ void FrameRetainer::RetainFrame(const FrameInfo &inFrame)
if (inFrame.buffer == nullptr) {
return; // Never gonna happen
}
- RetainWork work{inFrame.buffer, inFrame.frameId, MAX_RETAIN_TIME};
+ RetainWork work{inFrame.buffer, inFrame.senderUser, inFrame.frameId, MAX_RETAIN_TIME};
if (work.buffer->GetSize() > MAX_RETAIN_FRAME_SIZE) {
LOGE("[Retainer][Retain] Frame size=%u over limit=%u.", work.buffer->GetSize(), MAX_RETAIN_FRAME_SIZE);
delete work.buffer;
@@ -140,7 +140,7 @@ std::list<FrameInfo> FrameRetainer::FetchFramesForSpecificCommunicator(const Lab
for (auto &entry : fetchOrder) {
RetainWork &work = perLabel[entry.second][entry.first];
LogRetainInfo("[Retainer][Fetch] FETCH-OUT", inCommLabel, entry.second, entry.first, work);
- outFrameList.emplace_back(FrameInfo{work.buffer, entry.second, inCommLabel, work.frameId});
+ outFrameList.emplace_back(FrameInfo{work.buffer, entry.second, work.senderUser, inCommLabel, work.frameId});
// Update statistics
totalSizeByByte_ -= work.buffer->GetSize();
totalRetainFrames_--;
diff --git a/frameworks/libs/distributeddb/interfaces/include/iprocess_communicator.h b/frameworks/libs/distributeddb/interfaces/include/iprocess_communicator.h
index 30b447ea9..3ac611525 100644
--- a/frameworks/libs/distributeddb/interfaces/include/iprocess_communicator.h
+++ b/frameworks/libs/distributeddb/interfaces/include/iprocess_communicator.h
@@ -39,6 +39,7 @@ struct ExtendInfo {
struct UserInfo {
std::string receiveUser;
+ std::string senderUser;
};
class ExtendHeaderHandle {
diff --git a/frameworks/libs/distributeddb/syncer/src/device/sync_engine.cpp b/frameworks/libs/distributeddb/syncer/src/device/sync_engine.cpp
index 00e508510..08a28ff5d 100644
--- a/frameworks/libs/distributeddb/syncer/src/device/sync_engine.cpp
+++ b/frameworks/libs/distributeddb/syncer/src/device/sync_engine.cpp
@@ -342,6 +342,8 @@ int SyncEngine::AddSyncOperForContext(const std::string &deviceId, SyncOperation
RefObject::IncObjRef(context);
}
+ std::string targetUserId = GetTargetUserId(context->GetDeviceId());
+ context->SetTargetUserId(targetUserId);
errCode = context->AddSyncOperation(operation);
if (operation != nullptr) {
operation->SetSyncContext(context); // make the life cycle of context and operation are same
@@ -421,6 +423,7 @@ int SyncEngine::DealMsgUtilQueueEmpty()
if (errCode != E_OK) {
break;
}
+ nextContext->SetTargetUserId(inMsg->GetSenderUserId());
errCode = ScheduleDealMsg(nextContext, inMsg);
if (errCode != E_OK) {
RefObject::DecObjRef(nextContext);
@@ -548,6 +551,7 @@ int SyncEngine::MessageReciveCallbackInner(const std::string &targetDev, Message
if (errCode != E_OK) {
return errCode;
}
+ nextContext->SetTargetUserId(inMsg->GetSenderUserId());
LOGD("[SyncEngine] MessageReciveCallback MSG ID = %d", inMsg->GetMessageId());
return ScheduleDealMsg(nextContext, inMsg);
}
@@ -665,8 +669,6 @@ int SyncEngine::ExecSyncTask(ISyncTaskContext *context)
return -E_OBJ_IS_KILLED;
}
auto timeout = GetTimeout(context->GetDeviceId());
- std::string targetUserId = GetTargetUserId(context->GetDeviceId());
- context->SetTargetUserId(targetUserId);
AutoLock lockGuard(context);
int status = context->GetTaskExecStatus();
if ((status == SyncTaskContext::RUNNING) || context->IsKilled()) {
@@ -680,7 +682,7 @@ int SyncEngine::ExecSyncTask(ISyncTaskContext *context)
context->ClearSyncOperation();
continue;
}
- if (targetUserId.empty()) {
+ if (context->GetTargetUserId().empty()) {
LOGE("[SyncEngine] No target user found.");
context->SetTaskErrCode(-E_NO_TRUSTED_USER);
context->SetOperationStatus(SyncOperation::OP_FAILED);
Loading...
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
1
https://gitee.com/liao-yonghuang/lyh.git
git@gitee.com:liao-yonghuang/lyh.git
liao-yonghuang
lyh
lyh
master

搜索帮助