From b51b24fbfdf68b38296c1b09d1b36af4e8091e31 Mon Sep 17 00:00:00 2001 From: lobty Date: Fri, 23 May 2025 18:22:28 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BF=AE=E5=A4=8D=E9=94=AE=E9=BC=A0=E7=A9=BF?= =?UTF-8?q?=E8=B6=8A=E5=BC=82=E5=B8=B8=E7=9A=84=E9=97=AE=E9=A2=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: lobty --- .../distributeddb/common/include/db_errno.h | 1 + .../include/communicator_aggregator.h | 13 ++- .../communicator/include/icommunicator.h | 2 + .../communicator/src/communicator.cpp | 15 +++ .../communicator/src/communicator.h | 7 ++ .../src/communicator_aggregator.cpp | 99 ++++++++++++------- .../communicator/src/protocol_proto.cpp | 3 +- .../include/iprocess_communicator.h | 5 + .../interfaces/include/store_types.h | 1 + .../syncer/src/device/ability_sync.cpp | 5 + .../syncer/src/device/communicator_proxy.cpp | 14 +++ .../syncer/src/device/communicator_proxy.h | 1 + .../single_ver_sync_state_machine.cpp | 16 ++- .../singlever/single_ver_sync_state_machine.h | 1 + .../single_ver_sync_task_context.cpp | 15 +++ .../singlever/single_ver_sync_task_context.h | 4 + .../syncer/src/device/sync_engine.cpp | 11 +++ .../syncer/src/device/sync_engine.h | 2 + .../syncer/src/device/time_sync.cpp | 14 ++- .../syncer/src/sync_operation.cpp | 1 + .../distributeddb/syncer/src/sync_operation.h | 1 + .../common/syncer/mock_communicator.h | 1 + .../common/syncer/virtual_communicator.cpp | 5 + .../common/syncer/virtual_communicator.h | 1 + .../syncer/virtual_time_sync_communicator.cpp | 6 ++ .../syncer/virtual_time_sync_communicator.h | 1 + 26 files changed, 198 insertions(+), 47 deletions(-) diff --git a/frameworks/libs/distributeddb/common/include/db_errno.h b/frameworks/libs/distributeddb/common/include/db_errno.h index a0174fae408..151dd7c2a7e 100644 --- a/frameworks/libs/distributeddb/common/include/db_errno.h +++ b/frameworks/libs/distributeddb/common/include/db_errno.h @@ -190,6 +190,7 @@ constexpr const int E_TABLE_REFERENCE_CHANGED = (E_BASE + 204); // table referen constexpr const int E_CLOUD_DISABLED = (E_BASE + 205); // The cloud switch has been turned off constexpr const int E_DISTRIBUTED_FIELD_DECREASE = (E_BASE + 206); // Sync fewer specified columns than last time constexpr const int E_NO_TRUSTED_USER = (E_BASE + 207); // No trusted found before device sync +constexpr const int E_FEEDBACK_DB_CLOSING = (E_BASE + 208); // Db was closing feedback from remote device } // namespace DistributedDB #endif // DISTRIBUTEDDB_ERRNO_H diff --git a/frameworks/libs/distributeddb/communicator/include/communicator_aggregator.h b/frameworks/libs/distributeddb/communicator/include/communicator_aggregator.h index 7005f476ae0..8ecc02c01fe 100644 --- a/frameworks/libs/distributeddb/communicator/include/communicator_aggregator.h +++ b/frameworks/libs/distributeddb/communicator/include/communicator_aggregator.h @@ -119,8 +119,8 @@ private: const ParseResult &inResult, const DataUserInfoProc &userInfoProc); // Function with suffix NoMutex should be called with mutex in the caller - int TryDeliverAppLayerFrameToCommunicatorNoMutex(const std::string &srcTarget, SerialBuffer *&inFrameBuffer, - const LabelType &toLabel, const UserInfo &userInfo); + int TryDeliverAppLayerFrameToCommunicatorNoMutex(const DataUserInfoProc &userInfoProc, + const std::string &srcTarget, SerialBuffer *&inFrameBuffer, const LabelType &toLabel, const UserInfo &userInfo); // Auxiliary function for cutting short primary function int RegCallbackToAdapter(); @@ -131,8 +131,9 @@ private: // Feedback related functions void TriggerVersionNegotiation(const std::string &dstTarget); void TryToFeedbackWhenCommunicatorNotFound(const std::string &dstTarget, const LabelType &dstLabel, - const SerialBuffer *inOriFrame); - void TriggerCommunicatorNotFoundFeedback(const std::string &dstTarget, const LabelType &dstLabel, Message* &oriMsg); + const SerialBuffer *inOriFrame, int inErrCode); + void TriggerCommunicatorNotFoundFeedback(const std::string &dstTarget, const LabelType &dstLabel, Message* &oriMsg, + int sendErrNo); // Record the protocol version of remote target. void SetRemoteCommunicatorVersion(const std::string &target, uint16_t version); @@ -166,6 +167,10 @@ private: int GetDataUserId(const ParseResult &inResult, const LabelType &toLabel, const DataUserInfoProc &userInfoProc, const std::string &device, UserInfo &userInfo); + int ReTryDeliverAppLayerFrameOnCommunicatorNotFound(const ReceiveBytesInfo &receiveBytesInfo, + SerialBuffer *&inFrameBuffer, const ParseResult &inResult, const DataUserInfoProc &userInfoProc, + const UserInfo &userInfo); + DECLARE_OBJECT_TAG(CommunicatorAggregator); static std::atomic isCommunicatorNotFoundFeedbackEnable_; diff --git a/frameworks/libs/distributeddb/communicator/include/icommunicator.h b/frameworks/libs/distributeddb/communicator/include/icommunicator.h index fa3eed8c659..5d9c56048ad 100644 --- a/frameworks/libs/distributeddb/communicator/include/icommunicator.h +++ b/frameworks/libs/distributeddb/communicator/include/icommunicator.h @@ -27,6 +27,7 @@ namespace DistributedDB { // inMsg is heap memory, its ownership transfers by calling OnMessageCallback using OnMessageCallback = std::function; +using OnExtendInfoCallback = std::function; constexpr uint32_t SEND_TIME_OUT = 3000; // 3s struct SendConfig { @@ -57,6 +58,7 @@ public: virtual int RegOnMessageCallback(const OnMessageCallback &onMessage, const Finalizer &inOper) = 0; virtual int RegOnConnectCallback(const OnConnectCallback &onConnect, const Finalizer &inOper) = 0; virtual int RegOnSendableCallback(const std::function &onSendable, const Finalizer &inOper) = 0; + virtual int RegOnExtendInfoCallback(const OnExtendInfoCallback &onExtendInfo, const Finalizer &inOper) = 0; virtual void Activate(const std::string &userId = "") = 0; diff --git a/frameworks/libs/distributeddb/communicator/src/communicator.cpp b/frameworks/libs/distributeddb/communicator/src/communicator.cpp index ef60f196232..b1d56568c7a 100644 --- a/frameworks/libs/distributeddb/communicator/src/communicator.cpp +++ b/frameworks/libs/distributeddb/communicator/src/communicator.cpp @@ -60,6 +60,12 @@ int Communicator::RegOnSendableCallback(const std::function &onSenda return RegCallBack(onSendable, onSendableHandle_, inOper, onSendableFinalizer_); } +int Communicator::RegOnExtendInfoCallback(const OnExtendInfoCallback &onExtendInfo, const Finalizer &inOper) +{ + std::lock_guard extendInfoLockGuard(extendInfoHandleMutex_); + return RegCallBack(onExtendInfo, onExtendInfoHandle_, inOper, onExtendInfoFinalizer_); +} + void Communicator::Activate(const std::string &userId) { commAggrHandle_->ActivateCommunicator(commLabel_, userId); @@ -279,5 +285,14 @@ std::string Communicator::GetTargetUserId(const ExtendInfo ¶mInfo) const return extendHandle->GetTargetUserId(); } +void Communicator::OnGetExtendInfo(ExtendInfo &extendInfo) +{ + std::lock_guard lockguard(extendInfoHandleMutex_); + if (onExtendInfoHandle_) { + onExtendInfoHandle_(extendInfo); + } else { + LOGI("[Comm][Connect] Handle invalid currently."); + } +} DEFINE_OBJECT_TAG_FACILITIES(Communicator) } // namespace DistributedDB diff --git a/frameworks/libs/distributeddb/communicator/src/communicator.h b/frameworks/libs/distributeddb/communicator/src/communicator.h index 2a6aaf82d8a..796a37fac75 100644 --- a/frameworks/libs/distributeddb/communicator/src/communicator.h +++ b/frameworks/libs/distributeddb/communicator/src/communicator.h @@ -38,6 +38,7 @@ public: int RegOnMessageCallback(const OnMessageCallback &onMessage, const Finalizer &inOper) override; int RegOnConnectCallback(const OnConnectCallback &onConnect, const Finalizer &inOper) override; int RegOnSendableCallback(const std::function &onSendable, const Finalizer &inOper) override; + int RegOnExtendInfoCallback(const OnExtendInfoCallback &onExtendInfo, const Finalizer &inOper) override; void Activate(const std::string &userId = "") override; @@ -64,6 +65,9 @@ public: // Call by CommunicatorAggregator directly void OnSendAvailable(); + // Call by CommunicatorAggregator directly + void OnGetExtendInfo(ExtendInfo &extendInfo); + // Call by CommunicatorAggregator directly LabelType GetCommunicatorLabel() const; @@ -82,12 +86,15 @@ private: OnMessageCallback onMessageHandle_; OnConnectCallback onConnectHandle_; std::function onSendableHandle_; + OnExtendInfoCallback onExtendInfoHandle_; Finalizer onMessageFinalizer_; Finalizer onConnectFinalizer_; Finalizer onSendableFinalizer_; + Finalizer onExtendInfoFinalizer_; std::mutex messageHandleMutex_; std::mutex connectHandleMutex_; std::mutex sendableHandleMutex_; + std::mutex extendInfoHandleMutex_; }; } // namespace DistributedDB diff --git a/frameworks/libs/distributeddb/communicator/src/communicator_aggregator.cpp b/frameworks/libs/distributeddb/communicator/src/communicator_aggregator.cpp index 89d69a36298..6d39ddf71e6 100644 --- a/frameworks/libs/distributeddb/communicator/src/communicator_aggregator.cpp +++ b/frameworks/libs/distributeddb/communicator/src/communicator_aggregator.cpp @@ -691,43 +691,21 @@ int CommunicatorAggregator::OnAppLayerFrameReceive(const ReceiveBytesInfo &recei } { std::lock_guard commMapLockGuard(commMapMutex_); - int errCode = TryDeliverAppLayerFrameToCommunicatorNoMutex(receiveBytesInfo.srcTarget, inFrameBuffer, toLabel, - userInfo); + int errCode = TryDeliverAppLayerFrameToCommunicatorNoMutex(userInfoProc, receiveBytesInfo.srcTarget, + inFrameBuffer, toLabel, userInfo); if (errCode == E_OK) { // Attention: Here is equal to E_OK return E_OK; + } else if (errCode == -E_FEEDBACK_DB_CLOSING) { + TryToFeedbackWhenCommunicatorNotFound(receiveBytesInfo.srcTarget, toLabel, inFrameBuffer, + E_FEEDBACK_DB_CLOSING); + delete inFrameBuffer; + inFrameBuffer = nullptr; + return errCode; // The caller will display errCode in log } } LOGI("[CommAggr][AppReceive] Communicator of %.3s not found or nonactivated.", VEC_TO_STR(toLabel)); - int errCode = -E_NOT_FOUND; - { - std::lock_guard onCommLackLockGuard(onCommLackMutex_); - if (onCommLackHandle_) { - errCode = onCommLackHandle_(toLabel, userInfo.receiveUser); - LOGI("[CommAggr][AppReceive] On CommLack End."); // Log in case callback block this thread - } else { - LOGI("[CommAggr][AppReceive] CommLackHandle invalid currently."); - } - } - // Here we have to lock commMapMutex_ and search communicator again. - std::lock_guard commMapLockGuard(commMapMutex_); - int errCodeAgain = TryDeliverAppLayerFrameToCommunicatorNoMutex(receiveBytesInfo.srcTarget, inFrameBuffer, toLabel, + return ReTryDeliverAppLayerFrameOnCommunicatorNotFound(receiveBytesInfo, inFrameBuffer, inResult, userInfoProc, userInfo); - if (errCodeAgain == E_OK) { // Attention: Here is equal to E_OK. - LOGI("[CommAggr][AppReceive] Communicator of %.3s found after try again(rare case).", VEC_TO_STR(toLabel)); - return E_OK; - } - // Here, communicator is still not found, retain or discard according to the result of onCommLackHandle_ - if (errCode != E_OK) { - TryToFeedbackWhenCommunicatorNotFound(receiveBytesInfo.srcTarget, toLabel, inFrameBuffer); - delete inFrameBuffer; - inFrameBuffer = nullptr; - return errCode; // The caller will display errCode in log - } - // Do Retention, the retainer is responsible to deal with the frame - retainer_.RetainFrame(FrameInfo{inFrameBuffer, receiveBytesInfo.srcTarget, userInfo.sendUser, toLabel, - inResult.GetFrameId()}); - inFrameBuffer = nullptr; - return E_OK; } int CommunicatorAggregator::GetDataUserId(const ParseResult &inResult, const LabelType &toLabel, @@ -754,8 +732,8 @@ int CommunicatorAggregator::GetDataUserId(const ParseResult &inResult, const Lab return E_OK; } -int CommunicatorAggregator::TryDeliverAppLayerFrameToCommunicatorNoMutex(const std::string &srcTarget, - SerialBuffer *&inFrameBuffer, const LabelType &toLabel, const UserInfo &userInfo) +int CommunicatorAggregator::TryDeliverAppLayerFrameToCommunicatorNoMutex(const DataUserInfoProc &userInfoProc, + const std::string &srcTarget, SerialBuffer *&inFrameBuffer, const LabelType &toLabel, const UserInfo &userInfo) { // Ignore nonactivated communicator, which is regarded as inexistent const std::string &sendUser = userInfo.sendUser; @@ -782,6 +760,15 @@ int CommunicatorAggregator::TryDeliverAppLayerFrameToCommunicatorNoMutex(const s break; } } + if (communicator != nullptr && userInfoProc.processCommunicator != nullptr) { + ExtendInfo extendInfo; + communicator->OnGetExtendInfo(extendInfo); + DBStatus ret = userInfoProc.processCommunicator->ExtendInfoNotify(extendInfo); + if (ret != OK) { + LOGW("[CommAggr][TryDeliver] extend info notify ret:%d", ret); + return -E_FEEDBACK_DB_CLOSING; + } + } if (communicator != nullptr && (receiveUser.empty() || isEmpty)) { communicator->OnBufferReceive(srcTarget, inFrameBuffer, sendUser); inFrameBuffer = nullptr; @@ -884,7 +871,7 @@ void CommunicatorAggregator::TriggerVersionNegotiation(const std::string &dstTar } void CommunicatorAggregator::TryToFeedbackWhenCommunicatorNotFound(const std::string &dstTarget, - const LabelType &dstLabel, const SerialBuffer *inOriFrame) + const LabelType &dstLabel, const SerialBuffer *inOriFrame, int inErrCode) { if (!isCommunicatorNotFoundFeedbackEnable_ || dstTarget.empty() || inOriFrame == nullptr) { return; @@ -898,11 +885,11 @@ void CommunicatorAggregator::TryToFeedbackWhenCommunicatorNotFound(const std::st return; } // Message is release in TriggerCommunicatorNotFoundFeedback - TriggerCommunicatorNotFoundFeedback(dstTarget, dstLabel, message); + TriggerCommunicatorNotFoundFeedback(dstTarget, dstLabel, message, inErrCode); } void CommunicatorAggregator::TriggerCommunicatorNotFoundFeedback(const std::string &dstTarget, - const LabelType &dstLabel, Message* &oriMsg) + const LabelType &dstLabel, Message* &oriMsg, int sendErrNo) { if (oriMsg == nullptr || oriMsg->GetMessageType() != TYPE_REQUEST) { LOGI("[CommAggr][TrigNotFound] Do nothing for message with type not request."); @@ -914,7 +901,7 @@ void CommunicatorAggregator::TriggerCommunicatorNotFoundFeedback(const std::stri LOGI("[CommAggr][TrigNotFound] Do communicator not found feedback with target=%s{private}.", dstTarget.c_str()); oriMsg->SetMessageType(TYPE_RESPONSE); - oriMsg->SetErrorNo(E_FEEDBACK_COMMUNICATOR_NOT_FOUND); + oriMsg->SetErrorNo(sendErrNo); int errCode = E_OK; SerialBuffer *buffer = ProtocolProto::BuildFeedbackMessageFrame(oriMsg, dstLabel, errCode); @@ -1181,5 +1168,43 @@ void CommunicatorAggregator::ClearOnlineLabel() } commLinker_->ClearOnlineLabel(); } + +int CommunicatorAggregator::ReTryDeliverAppLayerFrameOnCommunicatorNotFound(const ReceiveBytesInfo &receiveBytesInfo, + SerialBuffer *&inFrameBuffer, const ParseResult &inResult, const DataUserInfoProc &userInfoProc, + const UserInfo &userInfo) +{ + LabelType toLabel = inResult.GetCommLabel(); + int errCode = -E_NOT_FOUND; + { + std::lock_guard onCommLackLockGuard(onCommLackMutex_); + if (onCommLackHandle_) { + errCode = onCommLackHandle_(toLabel, userInfo.receiveUser); + LOGI("[CommAggr][AppReceive] On CommLack End."); // Log in case callback block this thread + } else { + LOGI("[CommAggr][AppReceive] CommLackHandle invalid currently."); + } + } + // Here we have to lock commMapMutex_ and search communicator again. + std::lock_guard commMapLockGuard(commMapMutex_); + int errCodeAgain = TryDeliverAppLayerFrameToCommunicatorNoMutex(userInfoProc, receiveBytesInfo.srcTarget, + inFrameBuffer, toLabel, userInfo); + if (errCodeAgain == E_OK) { // Attention: Here is equal to E_OK. + LOGI("[CommAggr][AppReceive] Communicator of %.3s found after try again(rare case).", VEC_TO_STR(toLabel)); + return E_OK; + } + // Here, communicator is still not found, retain or discard according to the result of onCommLackHandle_ + if (errCode != E_OK || errCodeAgain == -E_FEEDBACK_DB_CLOSING) { + TryToFeedbackWhenCommunicatorNotFound(receiveBytesInfo.srcTarget, toLabel, inFrameBuffer, + errCodeAgain == -E_FEEDBACK_DB_CLOSING ? E_FEEDBACK_DB_CLOSING : E_FEEDBACK_COMMUNICATOR_NOT_FOUND); + delete inFrameBuffer; + inFrameBuffer = nullptr; + return errCode == E_OK ? errCodeAgain : errCode; // The caller will display errCode in log + } + // Do Retention, the retainer is responsible to deal with the frame + retainer_.RetainFrame(FrameInfo{inFrameBuffer, receiveBytesInfo.srcTarget, userInfo.sendUser, toLabel, + inResult.GetFrameId()}); + inFrameBuffer = nullptr; + return E_OK; +} DEFINE_OBJECT_TAG_FACILITIES(CommunicatorAggregator) } // namespace DistributedDB diff --git a/frameworks/libs/distributeddb/communicator/src/protocol_proto.cpp b/frameworks/libs/distributeddb/communicator/src/protocol_proto.cpp index d5339ebb016..4e8ac3e3a4f 100644 --- a/frameworks/libs/distributeddb/communicator/src/protocol_proto.cpp +++ b/frameworks/libs/distributeddb/communicator/src/protocol_proto.cpp @@ -715,7 +715,8 @@ bool ProtocolProto::IsSupportMessageVersion(uint16_t version) bool ProtocolProto::IsFeedbackErrorMessage(uint32_t errorNo) { - return (errorNo == E_FEEDBACK_UNKNOWN_MESSAGE || errorNo == E_FEEDBACK_COMMUNICATOR_NOT_FOUND); + return (errorNo == E_FEEDBACK_UNKNOWN_MESSAGE || errorNo == E_FEEDBACK_COMMUNICATOR_NOT_FOUND || + errorNo == E_FEEDBACK_DB_CLOSING); } int ProtocolProto::ParseCommPhyHeaderCheckMagicAndVersion(const uint8_t *bytes, uint32_t length) diff --git a/frameworks/libs/distributeddb/interfaces/include/iprocess_communicator.h b/frameworks/libs/distributeddb/interfaces/include/iprocess_communicator.h index e398263d3b4..9ca66ebd96d 100644 --- a/frameworks/libs/distributeddb/interfaces/include/iprocess_communicator.h +++ b/frameworks/libs/distributeddb/interfaces/include/iprocess_communicator.h @@ -210,6 +210,11 @@ public: virtual void RegOnSendAble([[gnu::unused]] const OnSendAble &sendAbleCallback) { } + + virtual DBStatus ExtendInfoNotify(const ExtendInfo &extendInfo) + { + return OK; + } }; } // namespace DistributedDB diff --git a/frameworks/libs/distributeddb/interfaces/include/store_types.h b/frameworks/libs/distributeddb/interfaces/include/store_types.h index 422945e78bc..6c4fe85a75c 100644 --- a/frameworks/libs/distributeddb/interfaces/include/store_types.h +++ b/frameworks/libs/distributeddb/interfaces/include/store_types.h @@ -94,6 +94,7 @@ enum DBStatus { CLOUD_DISABLED, // the cloud switch has been turned off DISTRIBUTED_FIELD_DECREASE, // sync fewer specified columns than last time SKIP_ASSET, // workaround status for contact app assets download failure, need to ignore these failures + DB_CLOSING, // db is closing BUTT_STATUS = 27394048 // end of status }; diff --git a/frameworks/libs/distributeddb/syncer/src/device/ability_sync.cpp b/frameworks/libs/distributeddb/syncer/src/device/ability_sync.cpp index e4239e09e65..ea52e76e0b2 100644 --- a/frameworks/libs/distributeddb/syncer/src/device/ability_sync.cpp +++ b/frameworks/libs/distributeddb/syncer/src/device/ability_sync.cpp @@ -1108,6 +1108,11 @@ int AbilitySync::AckMsgCheck(const Message *message, ISyncTaskContext *context) context->SetTaskErrCode(-E_FEEDBACK_COMMUNICATOR_NOT_FOUND); return -E_FEEDBACK_COMMUNICATOR_NOT_FOUND; } + if (message->GetErrorNo() == E_FEEDBACK_DB_CLOSING) { + LOGE("[AbilitySync][AckMsgCheck] Remote db is closing"); + context->SetTaskErrCode(-E_FEEDBACK_DB_CLOSING); + return -E_FEEDBACK_DB_CLOSING; + } const AbilitySyncAckPacket *packet = message->GetObject(); if (packet == nullptr) { return -E_INVALID_ARGS; diff --git a/frameworks/libs/distributeddb/syncer/src/device/communicator_proxy.cpp b/frameworks/libs/distributeddb/syncer/src/device/communicator_proxy.cpp index 847d17ee2f0..a0c38c7ee3e 100644 --- a/frameworks/libs/distributeddb/syncer/src/device/communicator_proxy.cpp +++ b/frameworks/libs/distributeddb/syncer/src/device/communicator_proxy.cpp @@ -78,6 +78,20 @@ int CommunicatorProxy::RegOnSendableCallback(const std::function &on return E_OK; } +int CommunicatorProxy::RegOnExtendInfoCallback(const OnExtendInfoCallback &onExtendInfo, const Finalizer &inOper) +{ + if (mainComm_ != nullptr) { + (void) mainComm_->RegOnExtendInfoCallback(onExtendInfo, inOper); + } + + std::lock_guard lock(devCommMapLock_); + for (const auto &iter : devCommMap_) { + (void) devCommMap_[iter.first].second->RegOnExtendInfoCallback(onExtendInfo, inOper); + } + + return E_OK; +} + void CommunicatorProxy::Activate(const std::string &userId) { if (mainComm_ != nullptr) { diff --git a/frameworks/libs/distributeddb/syncer/src/device/communicator_proxy.h b/frameworks/libs/distributeddb/syncer/src/device/communicator_proxy.h index 9049a37b0e8..2d8bc5c10a5 100644 --- a/frameworks/libs/distributeddb/syncer/src/device/communicator_proxy.h +++ b/frameworks/libs/distributeddb/syncer/src/device/communicator_proxy.h @@ -34,6 +34,7 @@ public: int RegOnMessageCallback(const OnMessageCallback &onMessage, const Finalizer &inOper) override; int RegOnConnectCallback(const OnConnectCallback &onConnect, const Finalizer &inOper) override; int RegOnSendableCallback(const std::function &onSendable, const Finalizer &inOper) override; + int RegOnExtendInfoCallback(const OnExtendInfoCallback &onExtendInfo, const Finalizer &inOper) override; void Activate(const std::string &userId = "") override; uint32_t GetCommunicatorMtuSize() const override; uint32_t GetCommunicatorMtuSize(const std::string &target) const override; diff --git a/frameworks/libs/distributeddb/syncer/src/device/singlever/single_ver_sync_state_machine.cpp b/frameworks/libs/distributeddb/syncer/src/device/singlever/single_ver_sync_state_machine.cpp index c50514c5340..fa0afed9673 100644 --- a/frameworks/libs/distributeddb/syncer/src/device/singlever/single_ver_sync_state_machine.cpp +++ b/frameworks/libs/distributeddb/syncer/src/device/singlever/single_ver_sync_state_machine.cpp @@ -49,6 +49,7 @@ namespace { {State::TIME_SYNC, Event::TIME_SYNC_FINISHED_EVENT, State::ABILITY_SYNC}, {State::TIME_SYNC, Event::TIME_OUT_EVENT, State::SYNC_TIME_OUT}, {State::TIME_SYNC, Event::INNER_ERR_EVENT, State::INNER_ERR}, + {State::TIME_SYNC, Event::NEED_RESYNC_EVENT, State::TIME_SYNC}, // In ABILITY_SYNC state, compare version num and schema {State::ABILITY_SYNC, Event::VERSION_NOT_SUPPOR_EVENT, State::INNER_ERR}, @@ -56,6 +57,7 @@ namespace { {State::ABILITY_SYNC, Event::TIME_OUT_EVENT, State::SYNC_TIME_OUT}, {State::ABILITY_SYNC, Event::INNER_ERR_EVENT, State::INNER_ERR}, {State::ABILITY_SYNC, Event::CONTROL_CMD_EVENT, State::SYNC_CONTROL_CMD}, + {State::ABILITY_SYNC, Event::NEED_RESYNC_EVENT, State::ABILITY_SYNC}, // In START_INITIACTIVE_DATA_SYNC state, send a sync request, and send first packt of data sync {State::START_INITIACTIVE_DATA_SYNC, Event::NEED_ABILITY_SYNC_EVENT, State::ABILITY_SYNC}, @@ -64,6 +66,7 @@ namespace { {State::START_INITIACTIVE_DATA_SYNC, Event::SEND_FINISHED_EVENT, State::START_PASSIVE_DATA_SYNC}, {State::START_INITIACTIVE_DATA_SYNC, Event::RE_SEND_DATA_EVENT, State::START_INITIACTIVE_DATA_SYNC}, {State::START_INITIACTIVE_DATA_SYNC, Event::NEED_TIME_SYNC_EVENT, State::TIME_SYNC}, + {State::START_INITIACTIVE_DATA_SYNC, Event::NEED_RESYNC_EVENT, State::START_INITIACTIVE_DATA_SYNC}, // In START_PASSIVE_DATA_SYNC state, do response pull request, and send first packt of data sync {State::START_PASSIVE_DATA_SYNC, Event::SEND_FINISHED_EVENT, State::START_PASSIVE_DATA_SYNC}, @@ -73,6 +76,7 @@ namespace { {State::START_PASSIVE_DATA_SYNC, Event::NEED_ABILITY_SYNC_EVENT, State::ABILITY_SYNC}, {State::START_PASSIVE_DATA_SYNC, Event::RE_SEND_DATA_EVENT, State::START_PASSIVE_DATA_SYNC}, {State::START_PASSIVE_DATA_SYNC, Event::NEED_TIME_SYNC_EVENT, State::TIME_SYNC}, + {State::START_PASSIVE_DATA_SYNC, Event::NEED_RESYNC_EVENT, State::START_PASSIVE_DATA_SYNC}, // In WAIT_FOR_RECEIVE_DATA_FINISH, {State::WAIT_FOR_RECEIVE_DATA_FINISH, Event::RECV_FINISHED_EVENT, State::SYNC_TASK_FINISHED}, @@ -80,11 +84,13 @@ namespace { {State::WAIT_FOR_RECEIVE_DATA_FINISH, Event::TIME_OUT_EVENT, State::SYNC_TIME_OUT}, {State::WAIT_FOR_RECEIVE_DATA_FINISH, Event::INNER_ERR_EVENT, State::INNER_ERR}, {State::WAIT_FOR_RECEIVE_DATA_FINISH, Event::NEED_ABILITY_SYNC_EVENT, State::ABILITY_SYNC}, + {State::WAIT_FOR_RECEIVE_DATA_FINISH, Event::NEED_RESYNC_EVENT, State::START_PASSIVE_DATA_SYNC}, {State::SYNC_CONTROL_CMD, Event::SEND_FINISHED_EVENT, State::SYNC_TASK_FINISHED}, {State::SYNC_CONTROL_CMD, Event::TIME_OUT_EVENT, State::SYNC_TIME_OUT}, {State::SYNC_CONTROL_CMD, Event::INNER_ERR_EVENT, State::INNER_ERR}, {State::SYNC_CONTROL_CMD, Event::NEED_ABILITY_SYNC_EVENT, State::ABILITY_SYNC}, + {State::SYNC_CONTROL_CMD, Event::NEED_ABILITY_SYNC_EVENT, State::SYNC_CONTROL_CMD}, // In SYNC_TASK_FINISHED, {State::SYNC_TASK_FINISHED, Event::ALL_TASK_FINISHED_EVENT, State::IDLE}, @@ -185,6 +191,10 @@ int SingleVerSyncStateMachine::ReceiveMessageCallback(Message *inMsg) LOGE("[StateMachine] message pre check failed"); return errCode; } + if (context_->IsNeedRetrySync(inMsg->GetErrorNo())) { + SwitchStateAndStep(NEED_RESYNC_EVENT); + return E_OK; + } switch (inMsg->GetMessageId()) { case TIME_SYNC_MESSAGE: errCode = TimeMarkSyncRecv(inMsg); @@ -538,6 +548,7 @@ Event SingleVerSyncStateMachine::GetEventAfterTimeSync(int mode) const Event SingleVerSyncStateMachine::DoSyncTaskFinished() { StopWatchDog(); + context_->ResetResyncTimes(); if (dataSync_ == nullptr || communicator_ == nullptr || syncContext_ == nullptr) { LOGE("[SingleVerSyncStateMachine] [DoSyncTaskFinished] dataSync_ or communicator_ or syncContext_ is nullptr."); return TransformErrCodeToEvent(-E_OUT_OF_MEMORY); @@ -883,7 +894,8 @@ int SingleVerSyncStateMachine::GetSyncOperationStatus(int errCode) const { -E_DENIED_SQL, SyncOperation::OP_DENIED_SQL }, { -E_REMOTE_OVER_SIZE, SyncOperation::OP_MAX_LIMITS }, { -E_INVALID_PASSWD_OR_CORRUPTED_DB, SyncOperation::OP_NOTADB_OR_CORRUPTED }, - { -E_DISTRIBUTED_SCHEMA_NOT_FOUND, SyncOperation::OP_SCHEMA_INCOMPATIBLE } + { -E_DISTRIBUTED_SCHEMA_NOT_FOUND, SyncOperation::OP_SCHEMA_INCOMPATIBLE }, + { -E_FEEDBACK_DB_CLOSING, SyncOperation::OP_DB_CLOSING }, }; const auto &result = std::find_if(std::begin(stateNodes), std::end(stateNodes), [errCode](const auto &node) { return node.errCode == errCode; @@ -1148,6 +1160,7 @@ void SingleVerSyncStateMachine::DataRecvErrCodeHandle(uint32_t sessionId, int er case -E_NOT_REGISTER: case -E_NOT_SUPPORT: case -E_SECURITY_OPTION_CHECK_ERROR: + case -E_FEEDBACK_DB_CLOSING: context_->SetTaskErrCode(errCode); SwitchStateAndStep(Event::INNER_ERR_EVENT); break; @@ -1203,6 +1216,7 @@ void SingleVerSyncStateMachine::DataAckRecvErrCodeHandle(int errCode, bool handl case -E_NOT_REGISTER: case -E_NOT_SUPPORT: case -E_SECURITY_OPTION_CHECK_ERROR: + case -E_FEEDBACK_DB_CLOSING: if (handleError) { context_->SetTaskErrCode(errCode); } diff --git a/frameworks/libs/distributeddb/syncer/src/device/singlever/single_ver_sync_state_machine.h b/frameworks/libs/distributeddb/syncer/src/device/singlever/single_ver_sync_state_machine.h index 5a417f92b64..23d6c9f1fdd 100644 --- a/frameworks/libs/distributeddb/syncer/src/device/singlever/single_ver_sync_state_machine.h +++ b/frameworks/libs/distributeddb/syncer/src/device/singlever/single_ver_sync_state_machine.h @@ -67,6 +67,7 @@ public: RE_SEND_DATA_EVENT, CONTROL_CMD_EVENT, NEED_TIME_SYNC_EVENT, + NEED_RESYNC_EVENT, ANY_EVENT }; SingleVerSyncStateMachine(); diff --git a/frameworks/libs/distributeddb/syncer/src/device/singlever/single_ver_sync_task_context.cpp b/frameworks/libs/distributeddb/syncer/src/device/singlever/single_ver_sync_task_context.cpp index e72dfa41585..15c5737dae1 100644 --- a/frameworks/libs/distributeddb/syncer/src/device/singlever/single_ver_sync_task_context.cpp +++ b/frameworks/libs/distributeddb/syncer/src/device/singlever/single_ver_sync_task_context.cpp @@ -645,4 +645,19 @@ int32_t SingleVerSyncTaskContext::GetResponseTaskCount() } return taskCount; } + +bool SingleVerSyncTaskContext::IsNeedRetrySync(int errnoNo) +{ + if (errnoNo != E_FEEDBACK_DB_CLOSING) { + return false; + } + resyncTimes_++; + LOGI("resyncTimes_=%u", resyncTimes_); + return resyncTimes_ <= MANUAL_RETRY_TIMES; +} + +void SingleVerSyncTaskContext::ResetResyncTimes() +{ + resyncTimes_ = 0; +} } // namespace DistributedDB diff --git a/frameworks/libs/distributeddb/syncer/src/device/singlever/single_ver_sync_task_context.h b/frameworks/libs/distributeddb/syncer/src/device/singlever/single_ver_sync_task_context.h index 66efa6acfbc..f67d495621d 100644 --- a/frameworks/libs/distributeddb/syncer/src/device/singlever/single_ver_sync_task_context.h +++ b/frameworks/libs/distributeddb/syncer/src/device/singlever/single_ver_sync_task_context.h @@ -147,6 +147,8 @@ public: int32_t GetResponseTaskCount() override; + bool IsNeedRetrySync(int errnoNo); + void ResetResyncTimes(); protected: ~SingleVerSyncTaskContext() override; void CopyTargetData(const ISyncTarget *target, const TaskParam &taskParam) override; @@ -191,6 +193,8 @@ private: // Initial Water Mark when the sync task launched. WaterMark initWaterMark_ = 0; WaterMark initDeletedMark_ = 0; + + uint32_t resyncTimes_ = 0; }; } // namespace DistributedDB diff --git a/frameworks/libs/distributeddb/syncer/src/device/sync_engine.cpp b/frameworks/libs/distributeddb/syncer/src/device/sync_engine.cpp index 42a09b82831..5fdd835b5e7 100644 --- a/frameworks/libs/distributeddb/syncer/src/device/sync_engine.cpp +++ b/frameworks/libs/distributeddb/syncer/src/device/sync_engine.cpp @@ -971,6 +971,8 @@ ICommunicator *SyncEngine::AllocCommunicator(const std::string &identifier, int return nullptr; } + errCode = communicator->RegOnExtendInfoCallback( + [this](ExtendInfo &extendInfo) { this->GetExtendInfo(extendInfo); }, []() {}); return communicator; } @@ -1422,4 +1424,13 @@ std::string SyncEngine::GetTargetUserId(const std::string &dev) RefObject::DecObjRef(communicator); return targetUserId; } + +void SyncEngine::GetExtendInfo(ExtendInfo &extendInfo) +{ + DBProperties properties = syncInterface_->GetDbProperties(); + extendInfo.appId = properties.GetStringProp(DBProperties::APP_ID, ""); + extendInfo.userId = properties.GetStringProp(DBProperties::USER_ID, ""); + extendInfo.storeId = properties.GetStringProp(DBProperties::STORE_ID, ""); + extendInfo.subUserId = properties.GetStringProp(DBProperties::SUB_USER, ""); +} } // namespace DistributedDB diff --git a/frameworks/libs/distributeddb/syncer/src/device/sync_engine.h b/frameworks/libs/distributeddb/syncer/src/device/sync_engine.h index cbed36ed923..920746ff82f 100644 --- a/frameworks/libs/distributeddb/syncer/src/device/sync_engine.h +++ b/frameworks/libs/distributeddb/syncer/src/device/sync_engine.h @@ -137,6 +137,8 @@ public: void TimeChange() override; int32_t GetResponseTaskCount() override; + + void GetExtendInfo(ExtendInfo &extendInfo); protected: // Create a context virtual ISyncTaskContext *CreateSyncTaskContext(const ISyncInterface &syncInterface) = 0; diff --git a/frameworks/libs/distributeddb/syncer/src/device/time_sync.cpp b/frameworks/libs/distributeddb/syncer/src/device/time_sync.cpp index d8bcdcc790f..868b01c6d94 100644 --- a/frameworks/libs/distributeddb/syncer/src/device/time_sync.cpp +++ b/frameworks/libs/distributeddb/syncer/src/device/time_sync.cpp @@ -370,10 +370,16 @@ int TimeSync::DeSerialization(const uint8_t *buffer, uint32_t length, Message *i int TimeSync::AckRecv(const Message *message, uint32_t targetSessionId) { // only check when sessionId is not 0, because old version timesync sessionId is 0. - if (message != nullptr && message->GetSessionId() != 0 && - message->GetErrorNo() == E_FEEDBACK_COMMUNICATOR_NOT_FOUND && message->GetSessionId() == targetSessionId) { - LOGE("[AbilitySync][AckMsgCheck] Remote db is closed"); - return -E_FEEDBACK_COMMUNICATOR_NOT_FOUND; + if (message != nullptr && message->GetSessionId() != 0 && message->GetSessionId() == targetSessionId) { + if (message->GetErrorNo() == E_FEEDBACK_COMMUNICATOR_NOT_FOUND) { + LOGE("[AbilitySync][AckMsgCheck] Remote db is closed"); + return -E_FEEDBACK_COMMUNICATOR_NOT_FOUND; + } + + if (message->GetErrorNo() == E_FEEDBACK_DB_CLOSING) { + LOGE("[AbilitySync][AckMsgCheck] Remote db is closing"); + return -E_FEEDBACK_DB_CLOSING; + } } if (!IsPacketValid(message, TYPE_RESPONSE)) { return -E_INVALID_ARGS; diff --git a/frameworks/libs/distributeddb/syncer/src/sync_operation.cpp b/frameworks/libs/distributeddb/syncer/src/sync_operation.cpp index 0e68d5fe649..bd44c542457 100644 --- a/frameworks/libs/distributeddb/syncer/src/sync_operation.cpp +++ b/frameworks/libs/distributeddb/syncer/src/sync_operation.cpp @@ -445,6 +445,7 @@ DBStatus SyncOperation::DBStatusTrans(int operationStatus) { static_cast(OP_USER_CHANGED), USER_CHANGED }, { static_cast(OP_DENIED_SQL), NO_PERMISSION }, { static_cast(OP_NOTADB_OR_CORRUPTED), INVALID_PASSWD_OR_CORRUPTED_DB }, + { static_cast(OP_DB_CLOSING), DB_CLOSING }, { static_cast(OP_FAILED), DB_ERROR }, }; const auto &result = std::find_if(std::begin(syncOperationStatusNodes), std::end(syncOperationStatusNodes), diff --git a/frameworks/libs/distributeddb/syncer/src/sync_operation.h b/frameworks/libs/distributeddb/syncer/src/sync_operation.h index f3f00a5de05..161a82155e9 100644 --- a/frameworks/libs/distributeddb/syncer/src/sync_operation.h +++ b/frameworks/libs/distributeddb/syncer/src/sync_operation.h @@ -57,6 +57,7 @@ public: OP_USER_CHANGED, OP_DENIED_SQL, OP_NOTADB_OR_CORRUPTED, + OP_DB_CLOSING, }; using UserCallback = std::function)>; diff --git a/frameworks/libs/distributeddb/test/unittest/common/syncer/mock_communicator.h b/frameworks/libs/distributeddb/test/unittest/common/syncer/mock_communicator.h index 8befecd235d..a043115db9a 100644 --- a/frameworks/libs/distributeddb/test/unittest/common/syncer/mock_communicator.h +++ b/frameworks/libs/distributeddb/test/unittest/common/syncer/mock_communicator.h @@ -33,6 +33,7 @@ public: MOCK_METHOD2(RegOnConnectCallback, int(const OnConnectCallback &, const Finalizer &)); MOCK_METHOD2(RegOnSendableCallback, int(const std::function &, const Finalizer &)); MOCK_METHOD2(RegOnMessageCallback, int(const OnMessageCallback &, const Finalizer &)); + MOCK_METHOD2(RegOnExtendInfoCallback, int(const OnExtendInfoCallback &, const Finalizer &)); MOCK_METHOD1(Activate, void(const std::string &)); MOCK_CONST_METHOD1(IsDeviceOnline, bool(const std::string &)); MOCK_CONST_METHOD1(GetTargetUserId, std::string(const ExtendInfo &)); diff --git a/frameworks/libs/distributeddb/test/unittest/common/syncer/virtual_communicator.cpp b/frameworks/libs/distributeddb/test/unittest/common/syncer/virtual_communicator.cpp index a0384fc2b76..d1697b3406a 100644 --- a/frameworks/libs/distributeddb/test/unittest/common/syncer/virtual_communicator.cpp +++ b/frameworks/libs/distributeddb/test/unittest/common/syncer/virtual_communicator.cpp @@ -41,6 +41,11 @@ int VirtualCommunicator::RegOnSendableCallback(const std::function & return E_OK; } +int VirtualCommunicator::RegOnExtendInfoCallback(const OnExtendInfoCallback &OnExtendInfo, const Finalizer &inOper) +{ + return E_OK; +} + void VirtualCommunicator::Activate(const std::string &userId) { } diff --git a/frameworks/libs/distributeddb/test/unittest/common/syncer/virtual_communicator.h b/frameworks/libs/distributeddb/test/unittest/common/syncer/virtual_communicator.h index 9c7778ca199..87c63c118dd 100644 --- a/frameworks/libs/distributeddb/test/unittest/common/syncer/virtual_communicator.h +++ b/frameworks/libs/distributeddb/test/unittest/common/syncer/virtual_communicator.h @@ -42,6 +42,7 @@ public: int RegOnMessageCallback(const OnMessageCallback &onMessage, const Finalizer &inOper) override; int RegOnConnectCallback(const OnConnectCallback &onConnect, const Finalizer &inOper) override; int RegOnSendableCallback(const std::function &onSendable, const Finalizer &inOper) override; + int RegOnExtendInfoCallback(const OnExtendInfoCallback &OnExtendInfo, const Finalizer &inOper) override; void Activate(const std::string &userId = "") override; diff --git a/frameworks/libs/distributeddb/test/unittest/common/syncer/virtual_time_sync_communicator.cpp b/frameworks/libs/distributeddb/test/unittest/common/syncer/virtual_time_sync_communicator.cpp index a3a302aed23..89ca3f4f696 100644 --- a/frameworks/libs/distributeddb/test/unittest/common/syncer/virtual_time_sync_communicator.cpp +++ b/frameworks/libs/distributeddb/test/unittest/common/syncer/virtual_time_sync_communicator.cpp @@ -46,6 +46,12 @@ int VirtualTimeSyncCommunicator::RegOnSendableCallback(const std::function &onSendable, const Finalizer &inOper) override; + int RegOnExtendInfoCallback(const OnExtendInfoCallback &OnExtendInfo, const Finalizer &inOper) override; void Activate(const std::string &userId = "") override; -- Gitee