diff --git a/common/include/constants/distributed_camera_constants.h b/common/include/constants/distributed_camera_constants.h index e105f0e7a7487de840bdbc3b5fbabf7c57cf8b1b..6be0dc736e7a53c97f1968ec2526864a985924cd 100644 --- a/common/include/constants/distributed_camera_constants.h +++ b/common/include/constants/distributed_camera_constants.h @@ -1,5 +1,5 @@ /* - * Copyright (c) 2021-2023 Huawei Device Co., Ltd. + * Copyright (c) 2021-2024 Huawei Device Co., Ltd. * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at @@ -149,6 +149,8 @@ const std::string PRODUCER = "producer"; const std::string REGISTER_SERVICE_NOTIFY = "regSvcNotify"; const std::string SINK_START_EVENT = "sinkStartEvent"; const std::string SOURCE_START_EVENT = "srcStartEvent"; +const std::string DECODE_DATA_EVENT = "srcDecEvent"; +const std::string PIPELINE_SRC_EVENT = "srcPipeEvent"; const std::string UNREGISTER_SERVICE_NOTIFY = "unregSvcNotify"; const std::string LOOPER_SMOOTH = "looperSmooth"; const std::string DUMP_PATH = "/data/data/dcamera/"; diff --git a/common/src/utils/dcamera_utils_tools.cpp b/common/src/utils/dcamera_utils_tools.cpp index 5739561444406ed0b4f38509c8c51a7c3f3a1b74..ce361daf38b25f930325c231803b0a5154438ba3 100644 --- a/common/src/utils/dcamera_utils_tools.cpp +++ b/common/src/utils/dcamera_utils_tools.cpp @@ -298,7 +298,7 @@ bool GetSysPara(const char *key, T &value) auto res = GetParameter(key, "-1", paraValue, sizeof(paraValue)); CHECK_AND_RETURN_RET_LOG(res <= 0, false, "GetParameter fail, key:%{public}s res:%{public}d", key, res); - DHLOGI("GetSysPara key:%{public}s value:%{public}s", key, paraValue); + DHLOGD("GetSysPara key:%{public}s value:%{public}s", key, paraValue); std::stringstream valueStr; valueStr << paraValue; valueStr >> value; @@ -328,7 +328,7 @@ FILE *DumpFileUtil::OpenDumpFileInner(std::string para, std::string fileName) g_lastPara[para] = dumpPara; return dumpFile; } - DHLOGI("%{public}s = %{public}s, filePath: %{public}s", para.c_str(), dumpPara.c_str(), path); + DHLOGD("%{public}s = %{public}s, filePath: %{public}s", para.c_str(), dumpPara.c_str(), path); if (dumpPara == "w") { dumpFile = fopen(path, "wb+"); CHECK_AND_RETURN_RET_LOG(dumpFile == nullptr, dumpFile, "Error opening dump file!"); diff --git a/interfaces/inner_kits/native_cpp/test/sourcefuzztest/decodeoninputbufferavailable_fuzzer/decodeoninputbufferavailable_fuzzer.cpp b/interfaces/inner_kits/native_cpp/test/sourcefuzztest/decodeoninputbufferavailable_fuzzer/decodeoninputbufferavailable_fuzzer.cpp index ebeb9c00a196548e55da8af608a33f3952b235c5..cd74fc4cbf7798049cd941c379e3e891611dd5c9 100644 --- a/interfaces/inner_kits/native_cpp/test/sourcefuzztest/decodeoninputbufferavailable_fuzzer/decodeoninputbufferavailable_fuzzer.cpp +++ b/interfaces/inner_kits/native_cpp/test/sourcefuzztest/decodeoninputbufferavailable_fuzzer/decodeoninputbufferavailable_fuzzer.cpp @@ -28,8 +28,7 @@ void DecodeOnInputBufferAvailableFuzzTest(const uint8_t* data, size_t size) uint32_t index = *(reinterpret_cast(data)); std::shared_ptr sourcePipeline = std::make_shared(); std::shared_ptr runner = AppExecFwk::EventRunner::Create(true); - std::shared_ptr pipeEventHandler = - std::make_shared(runner, sourcePipeline); + std::shared_ptr pipeEventHandler = std::make_shared(runner); std::shared_ptr decodeDataProcess = std::make_shared(pipeEventHandler, sourcePipeline); std::shared_ptr decodeVideoCallback = std::make_shared(decodeDataProcess); diff --git a/interfaces/inner_kits/native_cpp/test/sourcefuzztest/decodeonoutputbufferavailable_fuzzer/decodeonoutputbufferavailable_fuzzer.cpp b/interfaces/inner_kits/native_cpp/test/sourcefuzztest/decodeonoutputbufferavailable_fuzzer/decodeonoutputbufferavailable_fuzzer.cpp index 2bfa77aab76fb6702dc92cec3c9c9df9dedfd166..aed0eff52ccee29fae9d2a771107a42f57416747 100644 --- a/interfaces/inner_kits/native_cpp/test/sourcefuzztest/decodeonoutputbufferavailable_fuzzer/decodeonoutputbufferavailable_fuzzer.cpp +++ b/interfaces/inner_kits/native_cpp/test/sourcefuzztest/decodeonoutputbufferavailable_fuzzer/decodeonoutputbufferavailable_fuzzer.cpp @@ -34,8 +34,7 @@ void DecodeOnOutputBufferAvailableFuzzTest(const uint8_t* data, size_t size) MediaAVCodec::AVCodecBufferFlag flag = static_cast(bufferFlag); std::shared_ptr sourcePipeline = std::make_shared(); std::shared_ptr runner = AppExecFwk::EventRunner::Create(true); - std::shared_ptr pipeEventHandler = - std::make_shared(runner, sourcePipeline); + std::shared_ptr pipeEventHandler = std::make_shared(runner); std::shared_ptr decodeDataProcess = std::make_shared(pipeEventHandler, sourcePipeline); std::shared_ptr decodeVideoCallback = std::make_shared(decodeDataProcess); diff --git a/services/data_process/include/pipeline/dcamera_pipeline_source.h b/services/data_process/include/pipeline/dcamera_pipeline_source.h index d3304064672489a69836b2bd1188e794123618b1..f70f6adff5539a12dda9e4244e583ea35c405b57 100644 --- a/services/data_process/include/pipeline/dcamera_pipeline_source.h +++ b/services/data_process/include/pipeline/dcamera_pipeline_source.h @@ -31,7 +31,6 @@ namespace OHOS { namespace DistributedHardware { class DecodeDataProcess; -const uint32_t EVENT_PIPELINE_PROCESS_DATA = 0; class DCameraPipelineSource : public IDataProcessPipeline, public std::enable_shared_from_this { public: @@ -47,21 +46,11 @@ public: int32_t GetProperty(const std::string& propertyName, PropertyCarrier& propertyCarrier) override; - class DCameraPipelineSrcEventHandler : public AppExecFwk::EventHandler { - public: - DCameraPipelineSrcEventHandler(const std::shared_ptr &runner, - std::shared_ptr pipeSourcePtr); - ~DCameraPipelineSrcEventHandler() override = default; - void ProcessEvent(const AppExecFwk::InnerEvent::Pointer &event) override; - private: - std::weak_ptr pipeSourceWPtr_; - }; - private: bool IsInRange(const VideoConfigParams& curConfig); void InitDCameraPipEvent(); int32_t InitDCameraPipNodes(const VideoConfigParams& sourceConfig, const VideoConfigParams& targetConfig); - void DoProcessData(const AppExecFwk::InnerEvent::Pointer &event); + void StartEventHandler(); private: const static std::string PIPELINE_OWNER; @@ -74,11 +63,15 @@ private: std::shared_ptr processListener_ = nullptr; std::shared_ptr pipelineHead_ = nullptr; - std::shared_ptr pipeEventHandler_ = nullptr; bool isProcess_ = false; PipelineType piplineType_ = PipelineType::VIDEO; std::vector> pipNodeRanks_; + + std::mutex eventMutex_; + std::thread eventThread_; + std::condition_variable eventCon_; + std::shared_ptr pipeEventHandler_ = nullptr; }; } // namespace DistributedHardware } // namespace OHOS diff --git a/services/data_process/include/pipeline_node/multimedia_codec/decoder/decode_data_process.h b/services/data_process/include/pipeline_node/multimedia_codec/decoder/decode_data_process.h index 830de4afee5e862b000ee1e087ac1a5e76eccf5a..842c37d45bfa82919d5cc5dd351011fe0450883b 100644 --- a/services/data_process/include/pipeline_node/multimedia_codec/decoder/decode_data_process.h +++ b/services/data_process/include/pipeline_node/multimedia_codec/decoder/decode_data_process.h @@ -46,13 +46,10 @@ namespace OHOS { namespace DistributedHardware { class DCameraPipelineSource; class DecodeVideoCallback; -const uint32_t EVENT_NO_ACTION = 0; -const uint32_t EVENT_ACTION_ONCE_AGAIN = 1; -const uint32_t EVENT_ACTION_GET_DECODER_OUTPUT_BUFFER = 2; class DecodeDataProcess : public AbstractDataProcess, public std::enable_shared_from_this { public: - DecodeDataProcess(const std::shared_ptr& pipeEventHandler, + DecodeDataProcess(const std::shared_ptr& pipeEventHandler, const std::weak_ptr& callbackPipSource) : pipeSrcEventHandler_(pipeEventHandler), callbackPipelineSource_(callbackPipSource) {} ~DecodeDataProcess() override; @@ -75,16 +72,6 @@ public: int32_t GetProperty(const std::string& propertyName, PropertyCarrier& propertyCarrier) override; - class DecodeDataProcessEventHandler : public AppExecFwk::EventHandler { - public: - DecodeDataProcessEventHandler(const std::shared_ptr &runner, - std::shared_ptr decPtr); - ~DecodeDataProcessEventHandler() override = default; - void ProcessEvent(const AppExecFwk::InnerEvent::Pointer &event) override; - private: - std::weak_ptr decPtrWPtr_; - }; - private: bool IsInDecoderRange(const VideoConfigParams& curConfig); bool IsConvertible(const VideoConfigParams& sourceConfig, const VideoConfigParams& targetConfig); @@ -107,9 +94,7 @@ private: bool IsCorrectSurfaceBuffer(const sptr& surBuf, int32_t alignedWidth, int32_t alignedHeight); void PostOutputDataBuffers(std::shared_ptr& outputBuffer); int32_t DecodeDone(std::vector>& outputBuffers); - void ProcessFeedDecoderInputBuffer(); - void ProcessGetDecoderOutputBuffer(const AppExecFwk::InnerEvent::Pointer &event); - void ProcessDecodeDone(const AppExecFwk::InnerEvent::Pointer &event); + void StartEventHandler(); private: constexpr static int32_t VIDEO_DECODER_QUEUE_MAX = 1000; @@ -130,7 +115,7 @@ private: constexpr static int32_t ALIGNED_WIDTH_MAX_SIZE = 10000; constexpr static uint32_t MEMORY_RATIO_UV = 1; - std::shared_ptr pipeSrcEventHandler_; + std::shared_ptr pipeSrcEventHandler_; std::weak_ptr callbackPipelineSource_; std::mutex mtxDecoderLock_; std::mutex mtxDecoderState_; @@ -139,7 +124,6 @@ private: VideoConfigParams sourceConfig_; VideoConfigParams targetConfig_; VideoConfigParams processedConfig_; - std::shared_ptr decEventHandler_; std::shared_ptr videoDecoder_ = nullptr; std::shared_ptr decodeVideoCallback_ = nullptr; sptr decodeConsumerSurface_ = nullptr; @@ -161,6 +145,11 @@ private: std::deque frameInfoDeque_; FILE *dumpDecBeforeFile_ = nullptr; FILE *dumpDecAfterFile_ = nullptr; + + std::mutex eventMutex_; + std::thread eventThread_; + std::condition_variable eventCon_; + std::shared_ptr decEventHandler_ = nullptr; }; } // namespace DistributedHardware } // namespace OHOS diff --git a/services/data_process/src/pipeline/dcamera_pipeline_source.cpp b/services/data_process/src/pipeline/dcamera_pipeline_source.cpp index a4f5ac6ac522da2b18230daaebf2468d4e4fdcdc..f9a3366d6a783da369b8dd41cd4440871ee7dfbd 100644 --- a/services/data_process/src/pipeline/dcamera_pipeline_source.cpp +++ b/services/data_process/src/pipeline/dcamera_pipeline_source.cpp @@ -17,10 +17,11 @@ #include "dcamera_hitrace_adapter.h" #include "distributed_hardware_log.h" - +#include "distributed_camera_constants.h" #include "decode_data_process.h" #include "fps_controller_process.h" #include "scale_convert_process.h" +#include namespace OHOS { namespace DistributedHardware { @@ -83,9 +84,23 @@ bool DCameraPipelineSource::IsInRange(const VideoConfigParams& curConfig) void DCameraPipelineSource::InitDCameraPipEvent() { DHLOGD("Init source DCamera pipeline event to asynchronously process data."); - std::shared_ptr runner = AppExecFwk::EventRunner::Create(true); - pipeEventHandler_ = std::make_shared( - runner, shared_from_this()); + eventThread_ = std::thread(&DCameraPipelineSource::StartEventHandler, this); + std::unique_lock lock(eventMutex_); + eventCon_.wait(lock, [this] { + return pipeEventHandler_ != nullptr; + }); +} + +void DCameraPipelineSource::StartEventHandler() +{ + prctl(PR_SET_NAME, PIPELINE_SRC_EVENT.c_str()); + auto runner = AppExecFwk::EventRunner::Create(false); + { + std::lock_guard lock(eventMutex_); + pipeEventHandler_ = std::make_shared(runner); + } + eventCon_.notify_one(); + runner->Run(); } int32_t DCameraPipelineSource::InitDCameraPipNodes(const VideoConfigParams& sourceConfig, @@ -166,10 +181,18 @@ int32_t DCameraPipelineSource::ProcessData(std::vector pipConfigSource = std::make_shared(piplineType_, PIPELINE_OWNER, dataBuffers); + std::vector> inputBuffers = pipConfigSource->GetDataBuffers(); + if (inputBuffers.empty()) { + DHLOGE("Receiving process data buffers is empty in source pipeline."); + OnError(ERROR_PIPELINE_EVENTBUS); + return DCAMERA_BAD_VALUE; + } + auto sendFunc = [this, inputBuffers]() mutable { + int32_t ret = pipelineHead_->ProcessData(inputBuffers); + DHLOGD("excute ProcessData ret %{public}d.", ret); + }; CHECK_AND_RETURN_RET_LOG(pipeEventHandler_ == nullptr, DCAMERA_BAD_VALUE, "pipeEventHandler_ is nullptr."); - AppExecFwk::InnerEvent::Pointer msgEvent = - AppExecFwk::InnerEvent::Get(EVENT_PIPELINE_PROCESS_DATA, pipConfigSource, 0); - pipeEventHandler_->SendEvent(msgEvent, 0, AppExecFwk::EventQueue::Priority::IMMEDIATE); + pipeEventHandler_->PostTask(sendFunc); return DCAMERA_OK; } @@ -182,6 +205,10 @@ void DCameraPipelineSource::DestroyDataProcessPipeline() pipelineHead_->ReleaseProcessNode(); pipelineHead_ = nullptr; } + if ((pipeEventHandler_ != nullptr) && (pipeEventHandler_->GetEventRunner() != nullptr)) { + pipeEventHandler_->GetEventRunner()->Stop(); + eventThread_.join(); + } pipeEventHandler_ = nullptr; processListener_ = nullptr; pipNodeRanks_.clear(); @@ -189,24 +216,6 @@ void DCameraPipelineSource::DestroyDataProcessPipeline() DHLOGD("Destroy source data process pipeline end."); } -void DCameraPipelineSource::DoProcessData(const AppExecFwk::InnerEvent::Pointer &event) -{ - DHLOGD("Receive asynchronous event then start process data in source pipeline."); - std::shared_ptr pipelineConfig = event->GetSharedObject(); - if (pipelineConfig == nullptr) { - DHLOGE("pipeline config is nullptr."); - OnError(ERROR_PIPELINE_EVENTBUS); - return; - } - std::vector> inputBuffers = pipelineConfig->GetDataBuffers(); - if (inputBuffers.empty()) { - DHLOGE("Receiving process data buffers is empty in source pipeline."); - OnError(ERROR_PIPELINE_EVENTBUS); - return; - } - pipelineHead_->ProcessData(inputBuffers); -} - void DCameraPipelineSource::OnError(DataProcessErrorType errorType) { DHLOGE("A runtime error occurred in the source pipeline."); @@ -232,31 +241,5 @@ int32_t DCameraPipelineSource::GetProperty(const std::string& propertyName, Prop { return DCAMERA_OK; } - -DCameraPipelineSource::DCameraPipelineSrcEventHandler::DCameraPipelineSrcEventHandler( - const std::shared_ptr &runner, std::shared_ptr pipeSourcePtr) - : AppExecFwk::EventHandler(runner), pipeSourceWPtr_(pipeSourcePtr) -{ - DHLOGI("Ctor DCameraPipelineSrcEventHandler."); -} - -void DCameraPipelineSource::DCameraPipelineSrcEventHandler::ProcessEvent(const AppExecFwk::InnerEvent::Pointer &event) -{ - CHECK_AND_RETURN_LOG(event == nullptr, "event is nullptr."); - uint32_t eventId = event->GetInnerEventId(); - auto pipeSrc = pipeSourceWPtr_.lock(); - if (pipeSrc == nullptr) { - DHLOGE("Can not get strong self ptr"); - return; - } - switch (eventId) { - case EVENT_PIPELINE_PROCESS_DATA: - pipeSrc->DoProcessData(event); - break; - default: - DHLOGE("event is undefined, id is %{public}d", eventId); - break; - } -} } // namespace DistributedHardware } // namespace OHOS diff --git a/services/data_process/src/pipeline_node/multimedia_codec/decoder/decode_data_process.cpp b/services/data_process/src/pipeline_node/multimedia_codec/decoder/decode_data_process.cpp index 06388c1a78d74752e470f56f5e1e37f6cbdb225e..7129d857625a2db24e1cf31f1b328563711a2155 100644 --- a/services/data_process/src/pipeline_node/multimedia_codec/decoder/decode_data_process.cpp +++ b/services/data_process/src/pipeline_node/multimedia_codec/decoder/decode_data_process.cpp @@ -22,6 +22,7 @@ #include "decode_surface_listener.h" #include "decode_video_callback.h" #include "graphic_common_c.h" +#include namespace OHOS { namespace DistributedHardware { @@ -94,9 +95,23 @@ bool DecodeDataProcess::IsConvertible(const VideoConfigParams& sourceConfig, con void DecodeDataProcess::InitCodecEvent() { DHLOGD("Init DecodeNode eventBus, and add handler for it."); - std::shared_ptr decRunner = AppExecFwk::EventRunner::Create(true); - decEventHandler_ = std::make_shared( - decRunner, shared_from_this()); + eventThread_ = std::thread(&DecodeDataProcess::StartEventHandler, this); + std::unique_lock lock(eventMutex_); + eventCon_.wait(lock, [this] { + return decEventHandler_ != nullptr; + }); +} + +void DecodeDataProcess::StartEventHandler() +{ + prctl(PR_SET_NAME, DECODE_DATA_EVENT.c_str()); + auto runner = AppExecFwk::EventRunner::Create(false); + { + std::lock_guard lock(eventMutex_); + decEventHandler_ = std::make_shared(runner); + } + eventCon_.notify_one(); + runner->Run(); } int32_t DecodeDataProcess::InitDecoder() @@ -287,6 +302,10 @@ void DecodeDataProcess::ReleaseDecoderSurface() void DecodeDataProcess::ReleaseCodecEvent() { + if ((decEventHandler_ != nullptr) && (decEventHandler_->GetEventRunner() != nullptr)) { + decEventHandler_->GetEventRunner()->Stop(); + eventThread_.join(); + } decEventHandler_ = nullptr; pipeSrcEventHandler_ = nullptr; DHLOGD("Release DecodeNode eventBusDecode and eventBusPipeline end."); @@ -356,13 +375,13 @@ int32_t DecodeDataProcess::ProcessData(std::vector>& int32_t sleepTimeUs = 5000; std::this_thread::sleep_for(std::chrono::microseconds(sleepTimeUs)); DHLOGD("Feed decoder input buffer failed. Try FeedDecoderInputBuffer again."); - std::shared_ptr reFeedInputPacket = std::make_shared(); - reFeedInputPacket->SetVideoCodecType(sourceConfig_.GetVideoCodecType()); + auto sendFunc = [this]() mutable { + int32_t ret = FeedDecoderInputBuffer(); + DHLOGD("excute FeedDecoderInputBuffer ret %{public}d.", ret); + }; CHECK_AND_RETURN_RET_LOG(pipeSrcEventHandler_ == nullptr, DCAMERA_BAD_VALUE, "%{public}s", "pipeSrcEventHandler_ is nullptr."); - AppExecFwk::InnerEvent::Pointer msgEvent = - AppExecFwk::InnerEvent::Get(EVENT_ACTION_ONCE_AGAIN, reFeedInputPacket, 0); - pipeSrcEventHandler_->SendEvent(msgEvent, 0, AppExecFwk::EventQueue::Priority::IMMEDIATE); + pipeSrcEventHandler_->PostTask(sendFunc); } return DCAMERA_OK; } @@ -469,11 +488,13 @@ void DecodeDataProcess::ReduceWaitDecodeCnt() void DecodeDataProcess::OnSurfaceOutputBufferAvailable(const sptr& surface) { - std::shared_ptr bufferPkt = std::make_shared(surface); - CHECK_AND_RETURN_LOG(decEventHandler_ == nullptr, "decEventHandler is nullptr."); - AppExecFwk::InnerEvent::Pointer msgEvent = - AppExecFwk::InnerEvent::Get(EVENT_ACTION_GET_DECODER_OUTPUT_BUFFER, bufferPkt, 0); - decEventHandler_->SendEvent(msgEvent, 0, AppExecFwk::EventQueue::Priority::IMMEDIATE); + auto sendFunc = [this, surface]() mutable { + GetDecoderOutputBuffer(surface); + DHLOGD("excute GetDecoderOutputBuffer."); + }; + if (decEventHandler_ != nullptr) { + decEventHandler_->PostTask(sendFunc); + } } void DecodeDataProcess::GetDecoderOutputBuffer(const sptr& surface) @@ -588,13 +609,15 @@ void DecodeDataProcess::PostOutputDataBuffers(std::shared_ptr& outpu DHLOGE("decEventHandler_ or outputBuffer is null."); return; } - std::vector> multiDataBuffers; - multiDataBuffers.push_back(outputBuffer); - std::shared_ptr transNextNodePacket = std::make_shared(VideoCodecType::NO_CODEC, - multiDataBuffers); - AppExecFwk::InnerEvent::Pointer msgEvent = - AppExecFwk::InnerEvent::Get(EVENT_NO_ACTION, transNextNodePacket, 0); - decEventHandler_->SendEvent(msgEvent, 0, AppExecFwk::EventQueue::Priority::IMMEDIATE); + auto sendFunc = [this, outputBuffer]() mutable { + std::vector> multiDataBuffers; + multiDataBuffers.push_back(outputBuffer); + int32_t ret = DecodeDone(multiDataBuffers); + DHLOGD("excute DecodeDone ret %{public}d.", ret); + }; + if (decEventHandler_ != nullptr) { + decEventHandler_->PostTask(sendFunc); + } DHLOGD("Send video decoder output asynchronous DCameraCodecEvents success."); } @@ -624,67 +647,6 @@ int32_t DecodeDataProcess::DecodeDone(std::vector>& return DCAMERA_OK; } -DecodeDataProcess::DecodeDataProcessEventHandler::DecodeDataProcessEventHandler( - const std::shared_ptr &runner, std::shared_ptr decPtr) - : AppExecFwk::EventHandler(runner), decPtrWPtr_(decPtr) -{ - DHLOGI("Ctor DecodeDataProcessEventHandler."); -} - -void DecodeDataProcess::DecodeDataProcessEventHandler::ProcessEvent(const AppExecFwk::InnerEvent::Pointer &event) -{ - CHECK_AND_RETURN_LOG(event == nullptr, "event is nullptr."); - uint32_t eventId = event->GetInnerEventId(); - auto decPtr = decPtrWPtr_.lock(); - if (decPtr == nullptr) { - DHLOGE("Can not get strong self ptr"); - return; - } - switch (eventId) { - case EVENT_NO_ACTION: - decPtr->ProcessDecodeDone(event); - break; - case EVENT_ACTION_ONCE_AGAIN: - DHLOGD("Try FeedDecoderInputBuffer again."); - decPtr->ProcessFeedDecoderInputBuffer(); - return; - case EVENT_ACTION_GET_DECODER_OUTPUT_BUFFER: - decPtr->ProcessGetDecoderOutputBuffer(event); - break; - default: - DHLOGD("The action : %{public}d is not supported.", eventId); - return; - } -} - -void DecodeDataProcess::ProcessFeedDecoderInputBuffer() -{ - FeedDecoderInputBuffer(); -} - -void DecodeDataProcess::ProcessGetDecoderOutputBuffer(const AppExecFwk::InnerEvent::Pointer &event) -{ - std::shared_ptr receivedCodecPacket = event->GetSharedObject(); - if (receivedCodecPacket == nullptr) { - DHLOGE("the received codecPacket of eventId [%{public}d] is null.", EVENT_ACTION_GET_DECODER_OUTPUT_BUFFER); - OnError(); - return; - } - GetDecoderOutputBuffer(receivedCodecPacket->GetSurface()); -} - -void DecodeDataProcess::ProcessDecodeDone(const AppExecFwk::InnerEvent::Pointer &event) -{ - std::shared_ptr receivedCodecPacket = event->GetSharedObject(); - if (receivedCodecPacket == nullptr) { - DHLOGE("the received codecPacket of eventId [%{public}d] is null.", EVENT_NO_ACTION); - OnError(); - return; - } - std::vector> dataBuffers = receivedCodecPacket->GetDataBuffers(); - DecodeDone(dataBuffers); -} - void DecodeDataProcess::OnError() { DHLOGD("DecodeDataProcess : OnError."); diff --git a/services/data_process/src/pipeline_node/multimedia_codec/decoder/decode_data_process_common.cpp b/services/data_process/src/pipeline_node/multimedia_codec/decoder/decode_data_process_common.cpp index 26adfe13fdc1a615b03f943966f0e77635eeda96..8e4abf45befe5f6ab16faf6571167e862f58ddbe 100644 --- a/services/data_process/src/pipeline_node/multimedia_codec/decoder/decode_data_process_common.cpp +++ b/services/data_process/src/pipeline_node/multimedia_codec/decoder/decode_data_process_common.cpp @@ -14,14 +14,15 @@ */ #include "decode_data_process.h" + #include "distributed_camera_constants.h" #include "distributed_hardware_log.h" -#include "graphic_common_c.h" - #include "dcamera_hisysevent_adapter.h" #include "dcamera_hidumper.h" #include "decode_surface_listener.h" #include "decode_video_callback.h" +#include "graphic_common_c.h" +#include namespace OHOS { namespace DistributedHardware { @@ -47,6 +48,7 @@ int32_t DecodeDataProcess::InitNode(const VideoConfigParams& sourceConfig, const DHLOGE("Source config or target config are invalid."); return DCAMERA_BAD_VALUE; } + if (!IsConvertible(sourceConfig, targetConfig)) { DHLOGE("The DecodeNode can't convert %{public}d to %{public}d.", sourceConfig.GetVideoCodecType(), targetConfig_.GetVideoCodecType()); @@ -93,9 +95,23 @@ bool DecodeDataProcess::IsConvertible(const VideoConfigParams& sourceConfig, con void DecodeDataProcess::InitCodecEvent() { DHLOGD("Init DecodeNode eventBus, and add handler for it."); - std::shared_ptr decRunner = AppExecFwk::EventRunner::Create(true); - decEventHandler_ = std::make_shared( - decRunner, shared_from_this()); + eventThread_ = std::thread(&DecodeDataProcess::StartEventHandler, this); + std::unique_lock lock(eventMutex_); + eventCon_.wait(lock, [this] { + return decEventHandler_ != nullptr; + }); +} + +void DecodeDataProcess::StartEventHandler() +{ + prctl(PR_SET_NAME, DECODE_DATA_EVENT.c_str()); + auto runner = AppExecFwk::EventRunner::Create(false); + { + std::lock_guard lock(eventMutex_); + decEventHandler_ = std::make_shared(runner); + } + eventCon_.notify_one(); + runner->Run(); } int32_t DecodeDataProcess::InitDecoder() @@ -333,6 +349,10 @@ void DecodeDataProcess::ReleaseDecoderSurface() void DecodeDataProcess::ReleaseCodecEvent() { + if ((decEventHandler_ != nullptr) && (decEventHandler_->GetEventRunner() != nullptr)) { + decEventHandler_->GetEventRunner()->Stop(); + eventThread_.join(); + } decEventHandler_ = nullptr; pipeSrcEventHandler_ = nullptr; DHLOGD("Release DecodeNode eventBusDecode and eventBusPipeline end."); @@ -402,13 +422,13 @@ int32_t DecodeDataProcess::ProcessData(std::vector>& int32_t sleepTimeUs = 5000; std::this_thread::sleep_for(std::chrono::microseconds(sleepTimeUs)); DHLOGD("Feed decoder input buffer failed. Try FeedDecoderInputBuffer again."); - std::shared_ptr reFeedInputPacket = std::make_shared(); - reFeedInputPacket->SetVideoCodecType(sourceConfig_.GetVideoCodecType()); + auto sendFunc = [this]() mutable { + int32_t ret = FeedDecoderInputBuffer(); + DHLOGD("excute FeedDecoderInputBuffer ret %{public}d.", ret); + }; CHECK_AND_RETURN_RET_LOG(pipeSrcEventHandler_ == nullptr, DCAMERA_BAD_VALUE, "%{public}s", "pipeSrcEventHandler_ is nullptr."); - AppExecFwk::InnerEvent::Pointer msgEvent = - AppExecFwk::InnerEvent::Get(EVENT_ACTION_ONCE_AGAIN, reFeedInputPacket, 0); - pipeSrcEventHandler_->SendEvent(msgEvent, 0, AppExecFwk::EventQueue::Priority::IMMEDIATE); + pipeSrcEventHandler_->PostTask(sendFunc); } return DCAMERA_OK; } @@ -515,11 +535,13 @@ void DecodeDataProcess::ReduceWaitDecodeCnt() void DecodeDataProcess::OnSurfaceOutputBufferAvailable(const sptr& surface) { - std::shared_ptr bufferPkt = std::make_shared(surface); - CHECK_AND_RETURN_LOG(decEventHandler_ == nullptr, "decEventHandler is nullptr."); - AppExecFwk::InnerEvent::Pointer msgEvent = - AppExecFwk::InnerEvent::Get(EVENT_ACTION_GET_DECODER_OUTPUT_BUFFER, bufferPkt, 0); - decEventHandler_->SendEvent(msgEvent, 0, AppExecFwk::EventQueue::Priority::IMMEDIATE); + auto sendFunc = [this, surface]() mutable { + GetDecoderOutputBuffer(surface); + DHLOGD("excute GetDecoderOutputBuffer."); + }; + if (decEventHandler_ != nullptr) { + decEventHandler_->PostTask(sendFunc); + } } void DecodeDataProcess::GetDecoderOutputBuffer(const sptr& surface) @@ -634,13 +656,15 @@ void DecodeDataProcess::PostOutputDataBuffers(std::shared_ptr& outpu DHLOGE("decEventHandler_ or outputBuffer is null."); return; } - std::vector> multiDataBuffers; - multiDataBuffers.push_back(outputBuffer); - std::shared_ptr transNextNodePacket = std::make_shared(VideoCodecType::NO_CODEC, - multiDataBuffers); - AppExecFwk::InnerEvent::Pointer msgEvent = - AppExecFwk::InnerEvent::Get(EVENT_NO_ACTION, transNextNodePacket, 0); - decEventHandler_->SendEvent(msgEvent, 0, AppExecFwk::EventQueue::Priority::IMMEDIATE); + auto sendFunc = [this, outputBuffer]() mutable { + std::vector> multiDataBuffers; + multiDataBuffers.push_back(outputBuffer); + int32_t ret = DecodeDone(multiDataBuffers); + DHLOGD("excute DecodeDone ret %{public}d.", ret); + }; + if (decEventHandler_ != nullptr) { + decEventHandler_->PostTask(sendFunc); + } DHLOGD("Send video decoder output asynchronous DCameraCodecEvents success."); } @@ -670,67 +694,6 @@ int32_t DecodeDataProcess::DecodeDone(std::vector>& return DCAMERA_OK; } -DecodeDataProcess::DecodeDataProcessEventHandler::DecodeDataProcessEventHandler( - const std::shared_ptr &runner, std::shared_ptr decPtr) - : AppExecFwk::EventHandler(runner), decPtrWPtr_(decPtr) -{ - DHLOGI("Ctor DecodeDataProcessEventHandler."); -} - -void DecodeDataProcess::DecodeDataProcessEventHandler::ProcessEvent(const AppExecFwk::InnerEvent::Pointer &event) -{ - CHECK_AND_RETURN_LOG(event == nullptr, "event is nullptr."); - uint32_t eventId = event->GetInnerEventId(); - auto decPtr = decPtrWPtr_.lock(); - if (decPtr == nullptr) { - DHLOGE("Can not get strong self ptr"); - return; - } - switch (eventId) { - case EVENT_NO_ACTION: - decPtr->ProcessDecodeDone(event); - break; - case EVENT_ACTION_ONCE_AGAIN: - DHLOGD("Try FeedDecoderInputBuffer again."); - decPtr->ProcessFeedDecoderInputBuffer(); - return; - case EVENT_ACTION_GET_DECODER_OUTPUT_BUFFER: - decPtr->ProcessGetDecoderOutputBuffer(event); - break; - default: - DHLOGD("The action : %{public}d is not supported.", eventId); - return; - } -} - -void DecodeDataProcess::ProcessFeedDecoderInputBuffer() -{ - FeedDecoderInputBuffer(); -} - -void DecodeDataProcess::ProcessGetDecoderOutputBuffer(const AppExecFwk::InnerEvent::Pointer &event) -{ - std::shared_ptr receivedCodecPacket = event->GetSharedObject(); - if (receivedCodecPacket == nullptr) { - DHLOGE("the received codecPacket of eventId [%{public}d] is null.", EVENT_ACTION_GET_DECODER_OUTPUT_BUFFER); - OnError(); - return; - } - GetDecoderOutputBuffer(receivedCodecPacket->GetSurface()); -} - -void DecodeDataProcess::ProcessDecodeDone(const AppExecFwk::InnerEvent::Pointer &event) -{ - std::shared_ptr receivedCodecPacket = event->GetSharedObject(); - if (receivedCodecPacket == nullptr) { - DHLOGE("the received codecPacket of eventId [%{public}d] is null.", EVENT_NO_ACTION); - OnError(); - return; - } - std::vector> dataBuffers = receivedCodecPacket->GetDataBuffers(); - DecodeDone(dataBuffers); -} - void DecodeDataProcess::OnError() { DHLOGD("DecodeDataProcess : OnError."); diff --git a/services/data_process/test/unittest/common/pipeline/dcamera_pipeline_source_test.cpp b/services/data_process/test/unittest/common/pipeline/dcamera_pipeline_source_test.cpp index 060bda58f90ebcb0e589f3eae9bb51ff9ba89e09..255b70a542cf63ded7acfa9861d214d63079c6fc 100644 --- a/services/data_process/test/unittest/common/pipeline/dcamera_pipeline_source_test.cpp +++ b/services/data_process/test/unittest/common/pipeline/dcamera_pipeline_source_test.cpp @@ -1,5 +1,5 @@ /* - * Copyright (c) 2022 Huawei Device Co., Ltd. + * Copyright (c) 2022-2024 Huawei Device Co., Ltd. * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at @@ -41,7 +41,6 @@ const int32_t TEST_HEIGTH = 1080; const int32_t TEST_WIDTH2 = 640; const int32_t TEST_HEIGTH2 = 480; const int32_t SLEEP_TIME = 200000; -constexpr uint32_t EVENT_FRAME_TRIGGER = 1; } void DCameraPipelineSourceTest::SetUpTestCase(void) @@ -245,22 +244,5 @@ HWTEST_F(DCameraPipelineSourceTest, dcamera_pipeline_source_test_008, TestSize.L testPipelineSource_->OnProcessedVideoBuffer(videoResult); EXPECT_TRUE(true); } - -/** - * @tc.name: dcamera_pipeline_source_test_009 - * @tc.desc: Verify pipeline source DoProcessData abnormal. - * @tc.type: FUNC - * @tc.require: Issue Number - */ -HWTEST_F(DCameraPipelineSourceTest, dcamera_pipeline_source_test_009, TestSize.Level1) -{ - EXPECT_EQ(false, testPipelineSource_ == nullptr); - - std::shared_ptr param = std::make_shared(""); - AppExecFwk::InnerEvent::Pointer triggerEvent = - AppExecFwk::InnerEvent::Get(EVENT_FRAME_TRIGGER, param, 0); - testPipelineSource_->DoProcessData(triggerEvent); - EXPECT_TRUE(true); -} } // namespace DistributedHardware } // namespace OHOS diff --git a/services/data_process/test/unittest/common/pipeline_node/decode_data_process_test.cpp b/services/data_process/test/unittest/common/pipeline_node/decode_data_process_test.cpp index 471b3126065cc9513d100efdcc14b02cf6bb1e9f..f8ed441a8e93053358a153ab0b363eb13739c110 100644 --- a/services/data_process/test/unittest/common/pipeline_node/decode_data_process_test.cpp +++ b/services/data_process/test/unittest/common/pipeline_node/decode_data_process_test.cpp @@ -34,7 +34,6 @@ public: void TearDown(); std::shared_ptr testDecodeDataProcess_; - std::shared_ptr pipeEventHandler_; std::shared_ptr sourcePipeline_; }; @@ -61,9 +60,8 @@ void DecodeDataProcessTest::SetUp(void) DHLOGI("DecodeDataProcessTest SetUp"); sourcePipeline_ = std::make_shared(); std::shared_ptr runner = AppExecFwk::EventRunner::Create(true); - pipeEventHandler_ = std::make_shared( - runner, sourcePipeline_); - testDecodeDataProcess_ = std::make_shared(pipeEventHandler_, sourcePipeline_); + std::shared_ptr pipeEventHandler = std::make_shared(runner); + testDecodeDataProcess_ = std::make_shared(pipeEventHandler, sourcePipeline_); } void DecodeDataProcessTest::TearDown(void) @@ -71,7 +69,6 @@ void DecodeDataProcessTest::TearDown(void) DHLOGI("DecodeDataProcessTest TearDown"); usleep(SLEEP_TIME); sourcePipeline_ = nullptr; - pipeEventHandler_ = nullptr; testDecodeDataProcess_ = nullptr; } @@ -405,6 +402,7 @@ HWTEST_F(DecodeDataProcessTest, decode_data_process_test_011, TestSize.Level1) std::shared_ptr db = std::make_shared(capacity); inputBuffers.push_back(db); rc = testDecodeDataProcess_->ProcessData(inputBuffers); + testDecodeDataProcess_->isDecoderProcess_.store(true); EXPECT_EQ(rc, DCAMERA_DISABLE_PROCESS); } @@ -596,6 +594,7 @@ HWTEST_F(DecodeDataProcessTest, decode_data_process_test_016, TestSize.Level1) MediaAVCodec::AVCodecBufferFlag flag = MediaAVCodec::AVCODEC_BUFFER_FLAG_CODEC_DATA; testDecodeDataProcess_->OnOutputBufferAvailable(index, info, flag, buffer); testDecodeDataProcess_->OnError(); + testDecodeDataProcess_->isDecoderProcess_.store(true); EXPECT_EQ(rc, DCAMERA_OK); } } // namespace DistributedHardware