diff --git a/services/cameraservice/sinkservice/include/distributedcameramgr/dcamera_sink_data_process.h b/services/cameraservice/sinkservice/include/distributedcameramgr/dcamera_sink_data_process.h index 313ade9a0e37a838aef3499020dc3c43deebc525..ec71147d053908b8fd7ee2336f1111812fce7308 100644 --- a/services/cameraservice/sinkservice/include/distributedcameramgr/dcamera_sink_data_process.h +++ b/services/cameraservice/sinkservice/include/distributedcameramgr/dcamera_sink_data_process.h @@ -16,31 +16,28 @@ #ifndef OHOS_DCAMERA_SINK_DATA_PROCESS_H #define OHOS_DCAMERA_SINK_DATA_PROCESS_H -#include "event_bus.h" -#include "dcamera_photo_output_event.h" -#include "dcamera_video_output_event.h" -#include "icamera_sink_data_process.h" +#include +#include +#include +#include "event_handler.h" #include "icamera_channel.h" +#include "icamera_sink_data_process.h" #include "idata_process_pipeline.h" #include "image_common_type.h" namespace OHOS { namespace DistributedHardware { -class DCameraSinkDataProcess : public ICameraSinkDataProcess, public EventSender, - public DistributedHardware::EventBusHandler, - public DistributedHardware::EventBusHandler, +class DCameraSinkDataProcess : public ICameraSinkDataProcess, public std::enable_shared_from_this { public: DCameraSinkDataProcess(const std::string& dhId, std::shared_ptr& channel); - ~DCameraSinkDataProcess() override = default; + ~DCameraSinkDataProcess(); int32_t StartCapture(std::shared_ptr& captureInfo) override; int32_t StopCapture() override; int32_t FeedStream(std::shared_ptr& dataBuffer) override; - - void OnEvent(DCameraPhotoOutputEvent& event) override; - void OnEvent(DCameraVideoOutputEvent& event) override; + void Init() override; void OnProcessedVideoBuffer(const std::shared_ptr& videoResult); void OnError(DataProcessErrorType errorType); @@ -51,12 +48,18 @@ private: int32_t FeedStreamInner(std::shared_ptr& dataBuffer); VideoCodecType GetPipelineCodecType(DCEncodeType encodeType); Videoformat GetPipelineFormat(int32_t format); + void StartEventHandler(); + void SendDataAsync(const std::shared_ptr& buffer); std::string dhId_; std::shared_ptr captureInfo_; - std::shared_ptr eventBus_; std::shared_ptr channel_; std::shared_ptr pipeline_; + + std::mutex eventMutex_; + std::thread eventThread_; + std::condition_variable eventCon_; + std::shared_ptr eventHandler_; }; } // namespace DistributedHardware } // namespace OHOS diff --git a/services/cameraservice/sinkservice/include/distributedcameramgr/interface/icamera_sink_data_process.h b/services/cameraservice/sinkservice/include/distributedcameramgr/interface/icamera_sink_data_process.h index 87ffc2299e279f164b9ffd33f27da4886aa2abed..1abf70be7f6052b600e97ff2e91d86bc10dfd6d8 100644 --- a/services/cameraservice/sinkservice/include/distributedcameramgr/interface/icamera_sink_data_process.h +++ b/services/cameraservice/sinkservice/include/distributedcameramgr/interface/icamera_sink_data_process.h @@ -34,7 +34,7 @@ public: virtual int32_t StartCapture(std::shared_ptr& captureInfo) = 0; virtual int32_t StopCapture() = 0; virtual int32_t FeedStream(std::shared_ptr& dataBuffer) = 0; - + virtual void Init() = 0; virtual int32_t GetProperty(const std::string& propertyName, PropertyCarrier& propertyCarrier) = 0; }; } // namespace DistributedHardware diff --git a/services/cameraservice/sinkservice/src/distributedcameramgr/dcamera_sink_data_process.cpp b/services/cameraservice/sinkservice/src/distributedcameramgr/dcamera_sink_data_process.cpp index bba879f2a29a5eaba4a02f8ac2fc472b89acffa3..b226c20831be8892b7f93222927a443a4e9885fd 100644 --- a/services/cameraservice/sinkservice/src/distributedcameramgr/dcamera_sink_data_process.cpp +++ b/services/cameraservice/sinkservice/src/distributedcameramgr/dcamera_sink_data_process.cpp @@ -26,14 +26,40 @@ namespace OHOS { namespace DistributedHardware { DCameraSinkDataProcess::DCameraSinkDataProcess(const std::string& dhId, std::shared_ptr& channel) - : dhId_(dhId), channel_(channel) + : dhId_(dhId), channel_(channel), eventHandler_(nullptr) { DHLOGI("DCameraSinkDataProcess Constructor dhId: %s", GetAnonyString(dhId_).c_str()); - eventBus_ = std::make_shared("SinkDPHandler"); - DCameraPhotoOutputEvent photoEvent(*this); - DCameraVideoOutputEvent videoEvent(*this); - eventBus_->AddHandler(photoEvent.GetType(), *this); - eventBus_->AddHandler(videoEvent.GetType(), *this); +} + +DCameraSinkDataProcess::~DCameraSinkDataProcess() +{ + DHLOGI("DCameraSinkDataProcess delete dhId: %s", GetAnonyString(dhId_).c_str()); + if ((eventHandler_ != nullptr) && (eventHandler_->GetEventRunner() != nullptr)) { + eventHandler_->GetEventRunner()->Stop(); + } + eventThread_.join(); + eventHandler_ = nullptr; +} + +void DCameraSinkDataProcess::Init() +{ + DHLOGI("DCameraSinkDataProcess Init dhId: %s", GetAnonyString(dhId_).c_str()); + eventThread_ = std::thread(&DCameraSinkDataProcess::StartEventHandler, this); + std::unique_lock lock(eventMutex_); + eventCon_.wait(lock, [this] { + return eventHandler_ != nullptr; + }); +} + +void DCameraSinkDataProcess::StartEventHandler() +{ + auto runner = AppExecFwk::EventRunner::Create(false); + { + std::lock_guard lock(eventMutex_); + eventHandler_ = std::make_shared(runner); + } + eventCon_.notify_one(); + runner->Run(); } int32_t DCameraSinkDataProcess::StartCapture(std::shared_ptr& captureInfo) @@ -80,6 +106,10 @@ int32_t DCameraSinkDataProcess::StopCapture() pipeline_->DestroyDataProcessPipeline(); pipeline_ = nullptr; } + if (eventHandler_ != nullptr) { + DHLOGI("DCameraSinkDataProcess::StopCapture dhId: %s, remove all events", GetAnonyString(dhId_).c_str()); + eventHandler_->RemoveAllEvents(); + } return DCAMERA_OK; } @@ -98,41 +128,34 @@ int32_t DCameraSinkDataProcess::FeedStream(std::shared_ptr& dataBuff break; } case SNAPSHOT_FRAME: { - DCameraPhotoOutputEvent photoEvent(*this, dataBuffer); - eventBus_->PostEvent(photoEvent, POSTMODE::POST_ASYNC); + SendDataAsync(dataBuffer); break; } default: { DHLOGE("DCameraSinkDataProcess::FeedStream %s unknown stream type: %d", - GetAnonyString(dhId_).c_str(), type); + GetAnonyString(dhId_).c_str(), type); break; } } return DCAMERA_OK; } -void DCameraSinkDataProcess::OnEvent(DCameraPhotoOutputEvent& event) +void DCameraSinkDataProcess::SendDataAsync(const std::shared_ptr& buffer) { - std::shared_ptr buffer = event.GetParam(); - int32_t ret = channel_->SendData(buffer); - if (ret != DCAMERA_OK) { - DHLOGE("DCameraSinkDataProcess::OnEvent %s send photo output data ret: %d", GetAnonyString(dhId_).c_str(), ret); - } -} - -void DCameraSinkDataProcess::OnEvent(DCameraVideoOutputEvent& event) -{ - std::shared_ptr buffer = event.GetParam(); - int32_t ret = channel_->SendData(buffer); - if (ret != DCAMERA_OK) { - DHLOGE("DCameraSinkDataProcess::OnEvent %s send video output data ret: %d", GetAnonyString(dhId_).c_str(), ret); + auto sendFunc = [this, buffer]() mutable { + std::shared_ptr sendBuffer = buffer; + int32_t ret = channel_->SendData(sendBuffer); + DHLOGD("SendData type: %d output data ret: %d, dhId: %s, bufferSize: %d", captureInfo_->streamType_, ret, + GetAnonyString(dhId_).c_str(), buffer->Size()); + }; + if (eventHandler_ != nullptr) { + eventHandler_->PostTask(sendFunc); } } void DCameraSinkDataProcess::OnProcessedVideoBuffer(const std::shared_ptr& videoResult) { - DCameraVideoOutputEvent videoEvent(*this, videoResult); - eventBus_->PostEvent(videoEvent, POSTMODE::POST_ASYNC); + SendDataAsync(videoResult); } void DCameraSinkDataProcess::OnError(DataProcessErrorType errorType) @@ -145,8 +168,7 @@ int32_t DCameraSinkDataProcess::FeedStreamInner(std::shared_ptr& dat { if (captureInfo_->format_ == captureInfo_->encodeType_) { DHLOGI("DCameraSinkDataProcess::FeedStreamInner %s send video buffer", GetAnonyString(dhId_).c_str()); - DCameraVideoOutputEvent videoEvent(*this, dataBuffer); - eventBus_->PostEvent(videoEvent, POSTMODE::POST_ASYNC); + SendDataAsync(dataBuffer); return DCAMERA_OK; } diff --git a/services/cameraservice/sinkservice/src/distributedcameramgr/dcamera_sink_data_process_common.cpp b/services/cameraservice/sinkservice/src/distributedcameramgr/dcamera_sink_data_process_common.cpp index ef21f348384c5624e23583203d4714ecd4445e56..5e6334f20b053724b930b5e33983dd317e1b3573 100644 --- a/services/cameraservice/sinkservice/src/distributedcameramgr/dcamera_sink_data_process_common.cpp +++ b/services/cameraservice/sinkservice/src/distributedcameramgr/dcamera_sink_data_process_common.cpp @@ -26,14 +26,40 @@ namespace OHOS { namespace DistributedHardware { DCameraSinkDataProcess::DCameraSinkDataProcess(const std::string& dhId, std::shared_ptr& channel) - : dhId_(dhId), channel_(channel) + : dhId_(dhId), channel_(channel), eventHandler_(nullptr) { DHLOGI("DCameraSinkDataProcess Constructor dhId: %s", GetAnonyString(dhId_).c_str()); - eventBus_ = std::make_shared("SinkDPHandler"); - DCameraPhotoOutputEvent photoEvent(*this); - DCameraVideoOutputEvent videoEvent(*this); - eventBus_->AddHandler(photoEvent.GetType(), *this); - eventBus_->AddHandler(videoEvent.GetType(), *this); +} + +DCameraSinkDataProcess::~DCameraSinkDataProcess() +{ + DHLOGI("DCameraSinkDataProcess delete dhId: %s", GetAnonyString(dhId_).c_str()); + if ((eventHandler_ != nullptr) && (eventHandler_->GetEventRunner() != nullptr)) { + eventHandler_->GetEventRunner()->Stop(); + } + eventThread_.join(); + eventHandler_ = nullptr; +} + +void DCameraSinkDataProcess::Init() +{ + DHLOGI("DCameraSinkDataProcess Init dhId: %s", GetAnonyString(dhId_).c_str()); + eventThread_ = std::thread(&DCameraSinkDataProcess::StartEventHandler, this); + std::unique_lock lock(eventMutex_); + eventCon_.wait(lock, [this] { + return eventHandler_ != nullptr; + }); +} + +void DCameraSinkDataProcess::StartEventHandler() +{ + auto runner = AppExecFwk::EventRunner::Create(false); + { + std::lock_guard lock(eventMutex_); + eventHandler_ = std::make_shared(runner); + } + eventCon_.notify_one(); + runner->Run(); } int32_t DCameraSinkDataProcess::StartCapture(std::shared_ptr& captureInfo) @@ -80,6 +106,10 @@ int32_t DCameraSinkDataProcess::StopCapture() pipeline_->DestroyDataProcessPipeline(); pipeline_ = nullptr; } + if (eventHandler_ != nullptr) { + DHLOGI("DCameraSinkDataProcess::StopCapture dhId: %s, remove all events", GetAnonyString(dhId_).c_str()); + eventHandler_->RemoveAllEvents(); + } return DCAMERA_OK; } @@ -98,41 +128,34 @@ int32_t DCameraSinkDataProcess::FeedStream(std::shared_ptr& dataBuff break; } case SNAPSHOT_FRAME: { - DCameraPhotoOutputEvent photoEvent(*this, dataBuffer); - eventBus_->PostEvent(photoEvent, POSTMODE::POST_ASYNC); + SendDataAsync(dataBuffer); break; } default: { DHLOGE("DCameraSinkDataProcess::FeedStream %s unknown stream type: %d", - GetAnonyString(dhId_).c_str(), type); + GetAnonyString(dhId_).c_str(), type); break; } } return DCAMERA_OK; } -void DCameraSinkDataProcess::OnEvent(DCameraPhotoOutputEvent& event) +void DCameraSinkDataProcess::SendDataAsync(const std::shared_ptr& buffer) { - std::shared_ptr buffer = event.GetParam(); - int32_t ret = channel_->SendData(buffer); - if (ret != DCAMERA_OK) { - DHLOGE("DCameraSinkDataProcess::OnEvent %s send photo output data ret: %d", GetAnonyString(dhId_).c_str(), ret); - } -} - -void DCameraSinkDataProcess::OnEvent(DCameraVideoOutputEvent& event) -{ - std::shared_ptr buffer = event.GetParam(); - int32_t ret = channel_->SendData(buffer); - if (ret != DCAMERA_OK) { - DHLOGE("DCameraSinkDataProcess::OnEvent %s send video output data ret: %d", GetAnonyString(dhId_).c_str(), ret); + auto sendFunc = [this, buffer]() mutable { + std::shared_ptr sendBuffer = buffer; + int32_t ret = channel_->SendData(sendBuffer); + DHLOGD("SendData type: %d output data ret: %d, dhId: %s, bufferSize: %d", captureInfo_->streamType_, ret, + GetAnonyString(dhId_).c_str(), buffer->Size()); + }; + if (eventHandler_ != nullptr) { + eventHandler_->PostTask(sendFunc); } } void DCameraSinkDataProcess::OnProcessedVideoBuffer(const std::shared_ptr& videoResult) { - DCameraVideoOutputEvent videoEvent(*this, videoResult); - eventBus_->PostEvent(videoEvent, POSTMODE::POST_ASYNC); + SendDataAsync(videoResult); } void DCameraSinkDataProcess::OnError(DataProcessErrorType errorType) diff --git a/services/cameraservice/sinkservice/src/distributedcameramgr/dcamera_sink_output.cpp b/services/cameraservice/sinkservice/src/distributedcameramgr/dcamera_sink_output.cpp index 7f7742b52e4bc61f4b69ddd1856490d244f8f215..8c2c55998e31fac50a76ccf7873132de228d3487 100644 --- a/services/cameraservice/sinkservice/src/distributedcameramgr/dcamera_sink_output.cpp +++ b/services/cameraservice/sinkservice/src/distributedcameramgr/dcamera_sink_output.cpp @@ -59,6 +59,7 @@ void DCameraSinkOutput::InitInner(DCStreamType type) { std::shared_ptr channel = std::make_shared(); std::shared_ptr dataProcess = std::make_shared(dhId_, channel); + dataProcess->Init(); dataProcesses_.emplace(type, dataProcess); channels_.emplace(type, channel); sessionState_.emplace(type, DCAMERA_CHANNEL_STATE_DISCONNECTED); diff --git a/services/cameraservice/sinkservice/test/unittest/common/distributedcameramgr/dcamera_sink_data_process_test.cpp b/services/cameraservice/sinkservice/test/unittest/common/distributedcameramgr/dcamera_sink_data_process_test.cpp index 0ea2953c397cb3d731b5652faa4e3b66f837e7b3..ffa58014e5455984c3ea9b358c9204580cbacabd 100644 --- a/services/cameraservice/sinkservice/test/unittest/common/distributedcameramgr/dcamera_sink_data_process_test.cpp +++ b/services/cameraservice/sinkservice/test/unittest/common/distributedcameramgr/dcamera_sink_data_process_test.cpp @@ -105,6 +105,7 @@ void DCameraSinkDataProcessTest::SetUp(void) DCameraHandler::GetInstance().Initialize(); std::vector cameras = DCameraHandler::GetInstance().GetCameras(); dataProcess_ = std::make_shared(cameras[0], channel_); + dataProcess_->Init(); dataProcess_->pipeline_ = std::make_shared(); dataProcess_->captureInfo_ = g_testCaptureInfoContinuousNeedEncode; diff --git a/services/cameraservice/sinkservice/test/unittest/common/distributedcameramgr/mock_dcamera_sink_data_process.h b/services/cameraservice/sinkservice/test/unittest/common/distributedcameramgr/mock_dcamera_sink_data_process.h index 3e17d081e8bb5ed3c4540be9205256ee4f127a91..0083041fe5a50ff97cee0f19f307810399d0fbd1 100644 --- a/services/cameraservice/sinkservice/test/unittest/common/distributedcameramgr/mock_dcamera_sink_data_process.h +++ b/services/cameraservice/sinkservice/test/unittest/common/distributedcameramgr/mock_dcamera_sink_data_process.h @@ -59,6 +59,9 @@ public: { return DCAMERA_OK; } + void Init() + { + } void OnEvent(DCameraPhotoOutputEvent& event) { } diff --git a/services/data_process/src/pipeline_node/multimedia_codec/encoder/encode_data_process.cpp b/services/data_process/src/pipeline_node/multimedia_codec/encoder/encode_data_process.cpp index 173af67d07f2deaedfd23ccfefe28440ce4c54be..094cc3914687a119c1eceee92ea3bf75ebb191bf 100644 --- a/services/data_process/src/pipeline_node/multimedia_codec/encoder/encode_data_process.cpp +++ b/services/data_process/src/pipeline_node/multimedia_codec/encoder/encode_data_process.cpp @@ -311,6 +311,7 @@ void EncodeDataProcess::ReleaseVideoEncoder() DHLOGE("VideoEncoder release failed. Error type: %d.", ret); } videoEncoder_ = nullptr; + DHLOGD("Start release videoEncoder success."); } void EncodeDataProcess::ReleaseProcessNode() diff --git a/services/data_process/src/pipeline_node/multimedia_codec/encoder/encode_data_process_common.cpp b/services/data_process/src/pipeline_node/multimedia_codec/encoder/encode_data_process_common.cpp index 0404d4ada3e6655b7066f84845b02b7a56e4b06b..dc54e05b978552818f5045978903966ad108b7ed 100644 --- a/services/data_process/src/pipeline_node/multimedia_codec/encoder/encode_data_process_common.cpp +++ b/services/data_process/src/pipeline_node/multimedia_codec/encoder/encode_data_process_common.cpp @@ -283,6 +283,7 @@ void EncodeDataProcess::ReleaseVideoEncoder() DHLOGE("VideoEncoder release failed. Error type: %d.", ret); } videoEncoder_ = nullptr; + DHLOGD("Start release videoEncoder success."); } void EncodeDataProcess::ReleaseProcessNode()