From 5ba21f859d62afba1fe1374b42e30089665d9ae6 Mon Sep 17 00:00:00 2001 From: hobbycao Date: Sat, 26 Nov 2022 20:19:55 +0800 Subject: [PATCH] fix: drop the dirty data when softbus send slowly Signed-off-by: hobbycao --- .../dcamera_sink_data_process.h | 27 ++++--- .../interface/icamera_sink_data_process.h | 1 + .../dcamera_sink_data_process.cpp | 76 ++++++++++++------- .../dcamera_sink_data_process_common.cpp | 73 ++++++++++++------ .../dcamera_sink_output.cpp | 1 + .../dcamera_sink_data_process_test.cpp | 1 + .../mock_dcamera_sink_data_process.h | 3 + .../encoder/encode_data_process.cpp | 1 + .../encoder/encode_data_process_common.cpp | 1 + 9 files changed, 120 insertions(+), 64 deletions(-) diff --git a/services/cameraservice/sinkservice/include/distributedcameramgr/dcamera_sink_data_process.h b/services/cameraservice/sinkservice/include/distributedcameramgr/dcamera_sink_data_process.h index 01c108d3..49a75fe3 100644 --- a/services/cameraservice/sinkservice/include/distributedcameramgr/dcamera_sink_data_process.h +++ b/services/cameraservice/sinkservice/include/distributedcameramgr/dcamera_sink_data_process.h @@ -16,31 +16,28 @@ #ifndef OHOS_DCAMERA_SINK_DATA_PROCESS_H #define OHOS_DCAMERA_SINK_DATA_PROCESS_H -#include "event_bus.h" -#include "dcamera_photo_output_event.h" -#include "dcamera_video_output_event.h" -#include "icamera_sink_data_process.h" +#include +#include +#include +#include "event_handler.h" #include "icamera_channel.h" +#include "icamera_sink_data_process.h" #include "idata_process_pipeline.h" #include "image_common_type.h" namespace OHOS { namespace DistributedHardware { -class DCameraSinkDataProcess : public ICameraSinkDataProcess, public EventSender, - public DistributedHardware::EventBusHandler, - public DistributedHardware::EventBusHandler, +class DCameraSinkDataProcess : public ICameraSinkDataProcess, public std::enable_shared_from_this { public: DCameraSinkDataProcess(const std::string& dhId, std::shared_ptr& channel); - ~DCameraSinkDataProcess() = default; + ~DCameraSinkDataProcess(); int32_t StartCapture(std::shared_ptr& captureInfo) override; int32_t StopCapture() override; int32_t FeedStream(std::shared_ptr& dataBuffer) override; - - void OnEvent(DCameraPhotoOutputEvent& event) override; - void OnEvent(DCameraVideoOutputEvent& event) override; + void Init() override; void OnProcessedVideoBuffer(const std::shared_ptr& videoResult); void OnError(DataProcessErrorType errorType); @@ -49,12 +46,18 @@ private: int32_t FeedStreamInner(std::shared_ptr& dataBuffer); VideoCodecType GetPipelineCodecType(DCEncodeType encodeType); Videoformat GetPipelineFormat(int32_t format); + void StartEventHandler(); + void SendDataAsync(const std::shared_ptr& buffer); std::string dhId_; std::shared_ptr captureInfo_; - std::shared_ptr eventBus_; std::shared_ptr channel_; std::shared_ptr pipeline_; + + std::mutex eventMutex_; + std::thread eventThread_; + std::condition_variable eventCon_; + std::shared_ptr eventHandler_; }; } // namespace DistributedHardware } // namespace OHOS diff --git a/services/cameraservice/sinkservice/include/distributedcameramgr/interface/icamera_sink_data_process.h b/services/cameraservice/sinkservice/include/distributedcameramgr/interface/icamera_sink_data_process.h index 035cec5e..dacaf16c 100644 --- a/services/cameraservice/sinkservice/include/distributedcameramgr/interface/icamera_sink_data_process.h +++ b/services/cameraservice/sinkservice/include/distributedcameramgr/interface/icamera_sink_data_process.h @@ -32,6 +32,7 @@ public: virtual int32_t StartCapture(std::shared_ptr& captureInfo) = 0; virtual int32_t StopCapture() = 0; virtual int32_t FeedStream(std::shared_ptr& dataBuffer) = 0; + virtual void Init() = 0; }; } // namespace DistributedHardware } // namespace OHOS diff --git a/services/cameraservice/sinkservice/src/distributedcameramgr/dcamera_sink_data_process.cpp b/services/cameraservice/sinkservice/src/distributedcameramgr/dcamera_sink_data_process.cpp index 1133c184..9c2cdde2 100644 --- a/services/cameraservice/sinkservice/src/distributedcameramgr/dcamera_sink_data_process.cpp +++ b/services/cameraservice/sinkservice/src/distributedcameramgr/dcamera_sink_data_process.cpp @@ -26,14 +26,40 @@ namespace OHOS { namespace DistributedHardware { DCameraSinkDataProcess::DCameraSinkDataProcess(const std::string& dhId, std::shared_ptr& channel) - : dhId_(dhId), channel_(channel) + : dhId_(dhId), channel_(channel), eventHandler_(nullptr) { DHLOGI("DCameraSinkDataProcess Constructor dhId: %s", GetAnonyString(dhId_).c_str()); - eventBus_ = std::make_shared("SinkDPHandler"); - DCameraPhotoOutputEvent photoEvent(*this); - DCameraVideoOutputEvent videoEvent(*this); - eventBus_->AddHandler(photoEvent.GetType(), *this); - eventBus_->AddHandler(videoEvent.GetType(), *this); +} + +DCameraSinkDataProcess::~DCameraSinkDataProcess() +{ + DHLOGI("DCameraSinkDataProcess delete dhId: %s", GetAnonyString(dhId_).c_str()); + if ((eventHandler_ != nullptr) && (eventHandler_->GetEventRunner() != nullptr)) { + eventHandler_->GetEventRunner()->Stop(); + } + eventThread_.join(); + eventHandler_ = nullptr; +} + +void DCameraSinkDataProcess::Init() +{ + DHLOGI("DCameraSinkDataProcess Init dhId: %s", GetAnonyString(dhId_).c_str()); + eventThread_ = std::thread(&DCameraSinkDataProcess::StartEventHandler, this); + std::unique_lock lock(eventMutex_); + eventCon_.wait(lock, [this] { + return eventHandler_ != nullptr; + }); +} + +void DCameraSinkDataProcess::StartEventHandler() +{ + auto runner = AppExecFwk::EventRunner::Create(false); + { + std::lock_guard lock(eventMutex_); + eventHandler_ = std::make_shared(runner); + } + eventCon_.notify_one(); + runner->Run(); } int32_t DCameraSinkDataProcess::StartCapture(std::shared_ptr& captureInfo) @@ -80,6 +106,10 @@ int32_t DCameraSinkDataProcess::StopCapture() pipeline_->DestroyDataProcessPipeline(); pipeline_ = nullptr; } + if (eventHandler_ != nullptr) { + DHLOGI("DCameraSinkDataProcess::StopCapture dhId: %s, remove all events", GetAnonyString(dhId_).c_str()); + eventHandler_->RemoveAllEvents(); + } return DCAMERA_OK; } @@ -98,41 +128,34 @@ int32_t DCameraSinkDataProcess::FeedStream(std::shared_ptr& dataBuff break; } case SNAPSHOT_FRAME: { - DCameraPhotoOutputEvent photoEvent(*this, dataBuffer); - eventBus_->PostEvent(photoEvent, POSTMODE::POST_ASYNC); + SendDataAsync(dataBuffer); break; } default: { DHLOGE("DCameraSinkDataProcess::FeedStream %s unknown stream type: %d", - GetAnonyString(dhId_).c_str(), type); + GetAnonyString(dhId_).c_str(), type); break; } } return DCAMERA_OK; } -void DCameraSinkDataProcess::OnEvent(DCameraPhotoOutputEvent& event) +void DCameraSinkDataProcess::SendDataAsync(const std::shared_ptr& buffer) { - std::shared_ptr buffer = event.GetParam(); - int32_t ret = channel_->SendData(buffer); - if (ret != DCAMERA_OK) { - DHLOGE("DCameraSinkDataProcess::OnEvent %s send photo output data ret: %d", GetAnonyString(dhId_).c_str(), ret); - } -} - -void DCameraSinkDataProcess::OnEvent(DCameraVideoOutputEvent& event) -{ - std::shared_ptr buffer = event.GetParam(); - int32_t ret = channel_->SendData(buffer); - if (ret != DCAMERA_OK) { - DHLOGE("DCameraSinkDataProcess::OnEvent %s send video output data ret: %d", GetAnonyString(dhId_).c_str(), ret); + auto sendFunc = [this, buffer]() mutable { + std::shared_ptr sendBuffer = buffer; + int32_t ret = channel_->SendData(sendBuffer); + DHLOGD("SendData type: %d output data ret: %d, dhId: %s, bufferSize: %d", captureInfo_->streamType_, ret, + GetAnonyString(dhId_).c_str(), buffer->Size()); + }; + if (eventHandler_ != nullptr) { + eventHandler_->PostTask(sendFunc); } } void DCameraSinkDataProcess::OnProcessedVideoBuffer(const std::shared_ptr& videoResult) { - DCameraVideoOutputEvent videoEvent(*this, videoResult); - eventBus_->PostEvent(videoEvent, POSTMODE::POST_ASYNC); + SendDataAsync(videoResult); } void DCameraSinkDataProcess::OnError(DataProcessErrorType errorType) @@ -145,8 +168,7 @@ int32_t DCameraSinkDataProcess::FeedStreamInner(std::shared_ptr& dat { if (captureInfo_->format_ == captureInfo_->encodeType_) { DHLOGI("DCameraSinkDataProcess::FeedStreamInner %s send video buffer", GetAnonyString(dhId_).c_str()); - DCameraVideoOutputEvent videoEvent(*this, dataBuffer); - eventBus_->PostEvent(videoEvent, POSTMODE::POST_ASYNC); + SendDataAsync(dataBuffer); return DCAMERA_OK; } diff --git a/services/cameraservice/sinkservice/src/distributedcameramgr/dcamera_sink_data_process_common.cpp b/services/cameraservice/sinkservice/src/distributedcameramgr/dcamera_sink_data_process_common.cpp index 8c76340f..a14bf081 100644 --- a/services/cameraservice/sinkservice/src/distributedcameramgr/dcamera_sink_data_process_common.cpp +++ b/services/cameraservice/sinkservice/src/distributedcameramgr/dcamera_sink_data_process_common.cpp @@ -26,14 +26,40 @@ namespace OHOS { namespace DistributedHardware { DCameraSinkDataProcess::DCameraSinkDataProcess(const std::string& dhId, std::shared_ptr& channel) - : dhId_(dhId), channel_(channel) + : dhId_(dhId), channel_(channel), eventHandler_(nullptr) { DHLOGI("DCameraSinkDataProcess Constructor dhId: %s", GetAnonyString(dhId_).c_str()); - eventBus_ = std::make_shared("SinkDPHandler"); - DCameraPhotoOutputEvent photoEvent(*this); - DCameraVideoOutputEvent videoEvent(*this); - eventBus_->AddHandler(photoEvent.GetType(), *this); - eventBus_->AddHandler(videoEvent.GetType(), *this); +} + +DCameraSinkDataProcess::~DCameraSinkDataProcess() +{ + DHLOGI("DCameraSinkDataProcess delete dhId: %s", GetAnonyString(dhId_).c_str()); + if ((eventHandler_ != nullptr) && (eventHandler_->GetEventRunner() != nullptr)) { + eventHandler_->GetEventRunner()->Stop(); + } + eventThread_.join(); + eventHandler_ = nullptr; +} + +void DCameraSinkDataProcess::Init() +{ + DHLOGI("DCameraSinkDataProcess Init dhId: %s", GetAnonyString(dhId_).c_str()); + eventThread_ = std::thread(&DCameraSinkDataProcess::StartEventHandler, this); + std::unique_lock lock(eventMutex_); + eventCon_.wait(lock, [this] { + return eventHandler_ != nullptr; + }); +} + +void DCameraSinkDataProcess::StartEventHandler() +{ + auto runner = AppExecFwk::EventRunner::Create(false); + { + std::lock_guard lock(eventMutex_); + eventHandler_ = std::make_shared(runner); + } + eventCon_.notify_one(); + runner->Run(); } int32_t DCameraSinkDataProcess::StartCapture(std::shared_ptr& captureInfo) @@ -80,6 +106,10 @@ int32_t DCameraSinkDataProcess::StopCapture() pipeline_->DestroyDataProcessPipeline(); pipeline_ = nullptr; } + if (eventHandler_ != nullptr) { + DHLOGI("DCameraSinkDataProcess::StopCapture dhId: %s, remove all events", GetAnonyString(dhId_).c_str()); + eventHandler_->RemoveAllEvents(); + } return DCAMERA_OK; } @@ -98,41 +128,34 @@ int32_t DCameraSinkDataProcess::FeedStream(std::shared_ptr& dataBuff break; } case SNAPSHOT_FRAME: { - DCameraPhotoOutputEvent photoEvent(*this, dataBuffer); - eventBus_->PostEvent(photoEvent, POSTMODE::POST_ASYNC); + SendDataAsync(dataBuffer); break; } default: { DHLOGE("DCameraSinkDataProcess::FeedStream %s unknown stream type: %d", - GetAnonyString(dhId_).c_str(), type); + GetAnonyString(dhId_).c_str(), type); break; } } return DCAMERA_OK; } -void DCameraSinkDataProcess::OnEvent(DCameraPhotoOutputEvent& event) +void DCameraSinkDataProcess::SendDataAsync(const std::shared_ptr& buffer) { - std::shared_ptr buffer = event.GetParam(); - int32_t ret = channel_->SendData(buffer); - if (ret != DCAMERA_OK) { - DHLOGE("DCameraSinkDataProcess::OnEvent %s send photo output data ret: %d", GetAnonyString(dhId_).c_str(), ret); - } -} - -void DCameraSinkDataProcess::OnEvent(DCameraVideoOutputEvent& event) -{ - std::shared_ptr buffer = event.GetParam(); - int32_t ret = channel_->SendData(buffer); - if (ret != DCAMERA_OK) { - DHLOGE("DCameraSinkDataProcess::OnEvent %s send video output data ret: %d", GetAnonyString(dhId_).c_str(), ret); + auto sendFunc = [this, buffer]() mutable { + std::shared_ptr sendBuffer = buffer; + int32_t ret = channel_->SendData(sendBuffer); + DHLOGD("SendData type: %d output data ret: %d, dhId: %s, bufferSize: %d", captureInfo_->streamType_, ret, + GetAnonyString(dhId_).c_str(), buffer->Size()); + }; + if (eventHandler_ != nullptr) { + eventHandler_->PostTask(sendFunc); } } void DCameraSinkDataProcess::OnProcessedVideoBuffer(const std::shared_ptr& videoResult) { - DCameraVideoOutputEvent videoEvent(*this, videoResult); - eventBus_->PostEvent(videoEvent, POSTMODE::POST_ASYNC); + SendDataAsync(videoResult); } void DCameraSinkDataProcess::OnError(DataProcessErrorType errorType) diff --git a/services/cameraservice/sinkservice/src/distributedcameramgr/dcamera_sink_output.cpp b/services/cameraservice/sinkservice/src/distributedcameramgr/dcamera_sink_output.cpp index d70c8045..5c3de9fa 100644 --- a/services/cameraservice/sinkservice/src/distributedcameramgr/dcamera_sink_output.cpp +++ b/services/cameraservice/sinkservice/src/distributedcameramgr/dcamera_sink_output.cpp @@ -59,6 +59,7 @@ void DCameraSinkOutput::InitInner(DCStreamType type) { std::shared_ptr channel = std::make_shared(); std::shared_ptr dataProcess = std::make_shared(dhId_, channel); + dataProcess->Init(); dataProcesses_.emplace(type, dataProcess); channels_.emplace(type, channel); sessionState_.emplace(type, DCAMERA_CHANNEL_STATE_DISCONNECTED); diff --git a/services/cameraservice/sinkservice/test/unittest/common/distributedcameramgr/dcamera_sink_data_process_test.cpp b/services/cameraservice/sinkservice/test/unittest/common/distributedcameramgr/dcamera_sink_data_process_test.cpp index 0ea2953c..ffa58014 100644 --- a/services/cameraservice/sinkservice/test/unittest/common/distributedcameramgr/dcamera_sink_data_process_test.cpp +++ b/services/cameraservice/sinkservice/test/unittest/common/distributedcameramgr/dcamera_sink_data_process_test.cpp @@ -105,6 +105,7 @@ void DCameraSinkDataProcessTest::SetUp(void) DCameraHandler::GetInstance().Initialize(); std::vector cameras = DCameraHandler::GetInstance().GetCameras(); dataProcess_ = std::make_shared(cameras[0], channel_); + dataProcess_->Init(); dataProcess_->pipeline_ = std::make_shared(); dataProcess_->captureInfo_ = g_testCaptureInfoContinuousNeedEncode; diff --git a/services/cameraservice/sinkservice/test/unittest/common/distributedcameramgr/mock_dcamera_sink_data_process.h b/services/cameraservice/sinkservice/test/unittest/common/distributedcameramgr/mock_dcamera_sink_data_process.h index 53809375..32bf0a45 100644 --- a/services/cameraservice/sinkservice/test/unittest/common/distributedcameramgr/mock_dcamera_sink_data_process.h +++ b/services/cameraservice/sinkservice/test/unittest/common/distributedcameramgr/mock_dcamera_sink_data_process.h @@ -59,6 +59,9 @@ public: { return DCAMERA_OK; } + void Init() + { + } void OnEvent(DCameraPhotoOutputEvent& event) { } diff --git a/services/data_process/src/pipeline_node/multimedia_codec/encoder/encode_data_process.cpp b/services/data_process/src/pipeline_node/multimedia_codec/encoder/encode_data_process.cpp index 3192210f..5fb7bd2d 100644 --- a/services/data_process/src/pipeline_node/multimedia_codec/encoder/encode_data_process.cpp +++ b/services/data_process/src/pipeline_node/multimedia_codec/encoder/encode_data_process.cpp @@ -311,6 +311,7 @@ void EncodeDataProcess::ReleaseVideoEncoder() DHLOGE("VideoEncoder release failed. Error type: %d.", ret); } videoEncoder_ = nullptr; + DHLOGD("Start release videoEncoder success."); } void EncodeDataProcess::ReleaseProcessNode() diff --git a/services/data_process/src/pipeline_node/multimedia_codec/encoder/encode_data_process_common.cpp b/services/data_process/src/pipeline_node/multimedia_codec/encoder/encode_data_process_common.cpp index 482c0699..16147eb2 100644 --- a/services/data_process/src/pipeline_node/multimedia_codec/encoder/encode_data_process_common.cpp +++ b/services/data_process/src/pipeline_node/multimedia_codec/encoder/encode_data_process_common.cpp @@ -283,6 +283,7 @@ void EncodeDataProcess::ReleaseVideoEncoder() DHLOGE("VideoEncoder release failed. Error type: %d.", ret); } videoEncoder_ = nullptr; + DHLOGD("Start release videoEncoder success."); } void EncodeDataProcess::ReleaseProcessNode() -- Gitee