diff --git a/services/channel/include/dcamera_softbus_adapter.h b/services/channel/include/dcamera_softbus_adapter.h index 9aa91b7e1119561bea9beb4aefcf611ee2ba89fa..beeaeb8872df4af3f7aeb7a03424a439cd384895 100644 --- a/services/channel/include/dcamera_softbus_adapter.h +++ b/services/channel/include/dcamera_softbus_adapter.h @@ -77,7 +77,7 @@ private: PeerSocketInfo info); int32_t DCameraSoftbusSourceGetSession(int32_t socket, std::shared_ptr& session); int32_t DCameraSoftbusSinkGetSession(int32_t socket, std::shared_ptr& session); - std::string FindSessNameByPeerSessName(const std::string peerSessionName); + void ReplaceSuffix(std::string &mySessNmRep, const std::string &suffix, const std::string &replacement); private: std::mutex optLock_; @@ -92,6 +92,7 @@ private: std::map sessionModeAndDataTypeMap_; std::mutex mySessionNamePeerDevIdLock_; std::map peerDevIdMySessionNameMap_; + std::map mySessionNameMapV2_; std::mutex mySocketSetLock_; std::set mySocketSet_; diff --git a/services/channel/src/dcamera_softbus_adapter.cpp b/services/channel/src/dcamera_softbus_adapter.cpp index d608e25bc3ae9650d8f1850595c9c5a3cc3992e4..632efb569f61c7e7ace6af511ea1b0dda2b401d1 100644 --- a/services/channel/src/dcamera_softbus_adapter.cpp +++ b/services/channel/src/dcamera_softbus_adapter.cpp @@ -36,20 +36,6 @@ static QosTV g_qosInfo[] = { { .qos = QOS_TYPE_MIN_LATENCY, .value = DCAMERA_QOS_TYPE_MIN_LATENCY} }; static uint32_t g_QosTV_Param_Index = static_cast(sizeof(g_qosInfo) / sizeof(QosTV)); -const static std::pair LOCAL_TO_PEER_SESS_NAME_MAP[] = { - {SESSION_HEAD + CAMERA_ID_PREFIX + DEVICE_ID_0 + RECEIVER_SESSION_NAME_CONTROL, - SESSION_HEAD + CAMERA_ID_PREFIX + DEVICE_ID_0 + SENDER_SESSION_NAME_CONTROL}, - {SESSION_HEAD + CAMERA_ID_PREFIX + DEVICE_ID_0 + RECEIVER_SESSION_NAME_DATA_SNAPSHOT, - SESSION_HEAD + CAMERA_ID_PREFIX + DEVICE_ID_0 + SENDER_SESSION_NAME_DATA_SNAPSHOT}, - {SESSION_HEAD + CAMERA_ID_PREFIX + DEVICE_ID_0 + RECEIVER_SESSION_NAME_DATA_CONTINUE, - SESSION_HEAD + CAMERA_ID_PREFIX + DEVICE_ID_0 + SENDER_SESSION_NAME_DATA_CONTINUE}, - {SESSION_HEAD + CAMERA_ID_PREFIX + DEVICE_ID_1 + RECEIVER_SESSION_NAME_CONTROL, - SESSION_HEAD + CAMERA_ID_PREFIX + DEVICE_ID_1 + SENDER_SESSION_NAME_CONTROL}, - {SESSION_HEAD + CAMERA_ID_PREFIX + DEVICE_ID_1 + RECEIVER_SESSION_NAME_DATA_SNAPSHOT, - SESSION_HEAD + CAMERA_ID_PREFIX + DEVICE_ID_1 + SENDER_SESSION_NAME_DATA_SNAPSHOT}, - {SESSION_HEAD + CAMERA_ID_PREFIX + DEVICE_ID_1 + RECEIVER_SESSION_NAME_DATA_CONTINUE, - SESSION_HEAD + CAMERA_ID_PREFIX + DEVICE_ID_1 + SENDER_SESSION_NAME_DATA_CONTINUE}, -}; } IMPLEMENT_SINGLE_INSTANCE(DCameraSoftbusAdapter); @@ -142,6 +128,19 @@ DCameraSoftbusAdapter::~DCameraSoftbusAdapter() { } +void DCameraSoftbusAdapter::ReplaceSuffix(std::string &mySessNmRep, const std::string &suffix, + const std::string &replacement) +{ + DHLOGI("replacing suffix in mySessionName: %{public}s", mySessNmRep.c_str()); + bool isModified = false; + if (mySessNmRep.length() >= suffix.length() && + mySessNmRep.compare(mySessNmRep.length() - suffix.length(), suffix.length(), suffix) == 0) { + mySessNmRep.replace(mySessNmRep.length() - suffix.length(), suffix.length(), replacement); + isModified = true; + } + DHLOGI("suffix replaced? %{public}s - Modified: %{public}s", isModified ? "Y" : "N", mySessNmRep.c_str()); +} + int32_t DCameraSoftbusAdapter::CreatSoftBusSinkSocketServer(std::string mySessionName, DCAMERA_CHANNEL_ROLE role, DCameraSessionMode sessionMode, std::string peerDevId, std::string peerSessionName) { @@ -186,9 +185,12 @@ int32_t DCameraSoftbusAdapter::CreatSoftBusSinkSocketServer(std::string mySessio std::lock_guard autoLock(mySessionNamePeerDevIdLock_); std::string peerDevIdMySessionName = peerDevId + std::string("_") + mySessionName; peerDevIdMySessionNameMap_[peerDevIdMySessionName] = mySessionName; + } else { + std::lock_guard autoLock(mySessionNamePeerDevIdLock_); + mySessionNameMapV2_[mySessionName] = mySessionName; } DHLOGI("create socket server end, mySessionName: %{public}s, peerSessionName: %{public}s", - GetAnonyString(mySessionName).c_str(), GetAnonyString(peerSessionName).c_str()); + GetAnonyString(mySessionName).c_str(), GetAnonyString(peerSessionName).c_str()); return DCAMERA_OK; } @@ -505,16 +507,6 @@ int32_t DCameraSoftbusAdapter::DCameraSoftbusSinkGetSession(int32_t socket, return DCAMERA_OK; } -std::string DCameraSoftbusAdapter::FindSessNameByPeerSessName(const std::string peerSessionName) -{ - auto foundItem = std::find_if(std::begin(LOCAL_TO_PEER_SESS_NAME_MAP), std::end(LOCAL_TO_PEER_SESS_NAME_MAP), - [&](const auto& item) { return item.first == peerSessionName; }); - if (foundItem != std::end(LOCAL_TO_PEER_SESS_NAME_MAP)) { - return foundItem->second; - } - return ""; -} - int32_t DCameraSoftbusAdapter::DCameraSoftBusGetSessionByPeerSocket(int32_t socket, std::shared_ptr &session, PeerSocketInfo info) { @@ -529,16 +521,23 @@ int32_t DCameraSoftbusAdapter::DCameraSoftBusGetSessionByPeerSocket(int32_t sock } mySessionName = sessionNameIter->second; } else { - mySessionName = FindSessNameByPeerSessName(info.name); - } - if (mySessionName.empty()) { - DHLOGE("find mySessionName is empty"); - return DCAMERA_BAD_VALUE; + std::lock_guard autoLock(mySessionNamePeerDevIdLock_); + mySessionName = info.name; + ReplaceSuffix(mySessionName, "_receiver", "_sender"); + auto sessionNameIter = mySessionNameMapV2_.find(mySessionName); + if (sessionNameIter == mySessionNameMapV2_.end()) { + DHLOGE("find session by peer socket error, socket %{public}d", socket); + return DCAMERA_NOT_FOUND; + } + mySessionName = sessionNameIter->second; + if (mySessionName.empty()) { + DHLOGE("find mySessionName is empty"); + return DCAMERA_BAD_VALUE; + } } - auto iter = sinkSessions_.find(std::string(mySessionName)); + auto iter = sinkSessions_.find(mySessionName); if (iter == sinkSessions_.end()) { - DHLOGE("find session by peer socket error, mySessionName %{public}s", - GetAnonyString(mySessionName).c_str()); + DHLOGE("find session by peer socket error, mySessionName %{public}s", GetAnonyString(mySessionName).c_str()); return DCAMERA_NOT_FOUND; } session = iter->second; diff --git a/services/data_process/include/pipeline/dcamera_pipeline_sink.h b/services/data_process/include/pipeline/dcamera_pipeline_sink.h index fdf0945888767c9b447dac54f5a5a0641b4e154f..1f4cac6a8b1ceaac5cf9590dd13be1b227540dbc 100644 --- a/services/data_process/include/pipeline/dcamera_pipeline_sink.h +++ b/services/data_process/include/pipeline/dcamera_pipeline_sink.h @@ -58,6 +58,9 @@ private: constexpr static int32_t MAX_VIDEO_HEIGHT = 1080; std::shared_ptr processListener_ = nullptr; + std::mutex processListenerMtx_; + std::condition_variable processListenerCond_; + std::shared_ptr pipelineHead_ = nullptr; bool isProcess_ = false; diff --git a/services/data_process/include/pipeline_node/multimedia_codec/encoder/encode_data_process.h b/services/data_process/include/pipeline_node/multimedia_codec/encoder/encode_data_process.h index 87b1dfff60d87452586ac37877f0a786e13f3c3e..ea3b151a9e076089db0f335cad97962e042413b3 100644 --- a/services/data_process/include/pipeline_node/multimedia_codec/encoder/encode_data_process.h +++ b/services/data_process/include/pipeline_node/multimedia_codec/encoder/encode_data_process.h @@ -132,6 +132,9 @@ private: sptr encodeProducerSurface_ = nullptr; std::atomic isEncoderProcess_ = false; + std::mutex isEncoderProcessMtx_; + std::condition_variable isEncoderProcessCond_; + int32_t waitEncoderOutputCount_ = 0; int64_t lastFeedEncoderInputBufferTimeUs_ = 0; int64_t inputTimeStampUs_ = 0; diff --git a/services/data_process/src/pipeline/dcamera_pipeline_sink.cpp b/services/data_process/src/pipeline/dcamera_pipeline_sink.cpp index c31033052bc1b2dbe4c96f268ffd15956fa53ef8..a4943e0b300fda48b30482639bda7553c2d01c06 100644 --- a/services/data_process/src/pipeline/dcamera_pipeline_sink.cpp +++ b/services/data_process/src/pipeline/dcamera_pipeline_sink.cpp @@ -65,6 +65,7 @@ int32_t DCameraPipelineSink::CreateDataProcessPipeline(PipelineType piplineType, } piplineType_ = piplineType; processListener_ = listener; + processListenerCond_.notify_all(); isProcess_ = true; return DCAMERA_OK; } @@ -177,6 +178,8 @@ void DCameraPipelineSink::OnError(DataProcessErrorType errorType) void DCameraPipelineSink::OnProcessedVideoBuffer(const std::shared_ptr& videoResult) { DHLOGD("Sink pipeline output the processed video buffer."); + std::unique_lock lock(processListenerMtx_); + processListenerCond_.wait(lock, [this] { return processListener_ != nullptr; }); if (processListener_ == nullptr) { DHLOGE("The process listener of sink pipeline is empty."); return; diff --git a/services/data_process/src/pipeline_node/multimedia_codec/encoder/encode_data_process.cpp b/services/data_process/src/pipeline_node/multimedia_codec/encoder/encode_data_process.cpp index 09bbc9344875d560ba72f12ee03672ffcc49d915..4728bc0e415dfd57a716cbece13acd843e83af26 100644 --- a/services/data_process/src/pipeline_node/multimedia_codec/encoder/encode_data_process.cpp +++ b/services/data_process/src/pipeline_node/multimedia_codec/encoder/encode_data_process.cpp @@ -74,6 +74,7 @@ int32_t EncodeDataProcess::InitNode(const VideoConfigParams& sourceConfig, const processedConfig_ = sourceConfig; processedConfig = processedConfig_; isEncoderProcess_.store(true); + isEncoderProcessCond_.notify_all(); return DCAMERA_OK; } @@ -85,6 +86,7 @@ int32_t EncodeDataProcess::InitNode(const VideoConfigParams& sourceConfig, const } processedConfig = processedConfig_; isEncoderProcess_.store(true); + isEncoderProcessCond_.notify_all(); return DCAMERA_OK; } @@ -460,7 +462,7 @@ int32_t EncodeDataProcess::GetEncoderOutputBuffer(uint32_t index, MediaAVCodec:: DHLOGE("Failed to get the output shared memory, index : %{public}u", index); return DCAMERA_BAD_OPERATE; } - + CHECK_AND_RETURN_RET_LOG(info.size <= 0 || info.size > DATABUFF_MAX_SIZE, DCAMERA_BAD_VALUE, "AVCodecBufferInfo error, buffer size : %{public}d", info.size); size_t outputMemoDataSize = static_cast(info.size); @@ -541,6 +543,8 @@ void EncodeDataProcess::OnOutputFormatChanged(const Media::Format &format) void EncodeDataProcess::OnOutputBufferAvailable(uint32_t index, MediaAVCodec::AVCodecBufferInfo info, MediaAVCodec::AVCodecBufferFlag flag, std::shared_ptr buffer) { + std::unique_lock lock(isEncoderProcessMtx_); + isEncoderProcessCond_.wait(lock, [this] { return isEncoderProcess_.load(); }); if (!isEncoderProcess_.load()) { DHLOGE("EncodeNode occurred error or start release."); return;