From bfb61c94a48d712e0986dbe66bc6cdd340774b1c Mon Sep 17 00:00:00 2001 From: Bobie Date: Thu, 24 Oct 2024 14:12:50 +0800 Subject: [PATCH 1/2] framwork of pipeline/filter Signed-off-by: Bobie --- av_transport/common/include/pipeline_event.h | 68 +++ av_transport/common/include/pipeline_status.h | 72 +++ av_transport/framework/BUILD.gn | 87 +++ .../framework/filter/include/filter.h | 280 +++++++++ .../framework/filter/include/filter_factory.h | 101 +++ av_transport/framework/filter/src/filter.cpp | 577 ++++++++++++++++++ .../framework/filter/src/filter_factory.cpp | 37 ++ .../framework/pipeline/include/pipeline.h | 84 +++ .../framework/pipeline/src/pipeline.cpp | 323 ++++++++++ bundle.json | 17 +- 10 files changed, 1638 insertions(+), 8 deletions(-) create mode 100644 av_transport/common/include/pipeline_event.h create mode 100644 av_transport/common/include/pipeline_status.h create mode 100644 av_transport/framework/BUILD.gn create mode 100644 av_transport/framework/filter/include/filter.h create mode 100644 av_transport/framework/filter/include/filter_factory.h create mode 100644 av_transport/framework/filter/src/filter.cpp create mode 100644 av_transport/framework/filter/src/filter_factory.cpp create mode 100644 av_transport/framework/pipeline/include/pipeline.h create mode 100644 av_transport/framework/pipeline/src/pipeline.cpp diff --git a/av_transport/common/include/pipeline_event.h b/av_transport/common/include/pipeline_event.h new file mode 100644 index 00000000..ad19d54e --- /dev/null +++ b/av_transport/common/include/pipeline_event.h @@ -0,0 +1,68 @@ +/* + * Copyright (c) 2024 Huawei Device Co., Ltd. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef OHOS_AV_PIPELINE_EVENT_H +#define OHOS_AV_PIPELINE_EVENT_H + +#include +#include + +#include "meta/any.h" + +namespace OHOS { +namespace DistributedHardware { +enum struct EventType : uint32_t { + EVENT_READY = 0, + EVENT_AUDIO_PROGRESS, // unit is HST_TIME_BASE + EVENT_VIDEO_PROGRESS, // unit is HST_TIME_BASE + EVENT_COMPLETE, + EVENT_ERROR, + EVENT_PLUGIN_ERROR, + EVENT_PLUGIN_EVENT, + EVENT_BUFFERING, + EVENT_BUFFER_PROGRESS, + EVENT_DECODER_ERROR, + EVENT_RESOLUTION_CHANGE, + EVENT_VIDEO_RENDERING_START, + EVENT_IS_LIVE_STREAM, + EVENT_DRM_INFO_UPDATED, + EVENT_AUDIO_INTERRUPT, + EVENT_AUDIO_STATE_CHANGE, + EVENT_AUDIO_FIRST_FRAME, + EVENT_AUDIO_DEVICE_CHANGE, + EVENT_AUDIO_SERVICE_DIED, + BUFFERING_START, + BUFFERING_END, + EVENT_CACHED_DURATION, + EVENT_SOURCE_BITRATE_START, + EVENT_SUBTITLE_TEXT_UPDATE, + EVENT_AUDIO_TRACK_CHANGE, + EVENT_VIDEO_TRACK_CHANGE, + EVENT_SUBTITLE_TRACK_CHANGE, + EVENT_VIDEO_LAG, // player lag event detected by video sink + EVENT_AUDIO_LAG, // player lag event detected by audio sink + EVENT_STREAM_LAG, // player lag event detected by sync manager +}; + +struct Event { + std::string srcFilter; + EventType type; + Media::Any param; +}; + +const char* GetEventName(EventType type); +} // namespace DistributedHardware +} // namespace OHOS +#endif //OHOS_AV_PIPELINE_EVENT_H diff --git a/av_transport/common/include/pipeline_status.h b/av_transport/common/include/pipeline_status.h new file mode 100644 index 00000000..87335bf7 --- /dev/null +++ b/av_transport/common/include/pipeline_status.h @@ -0,0 +1,72 @@ +/* + * Copyright (c) 2024 Huawei Device Co., Ltd. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef OHOS_AV_PIPELINE_STATUS_H +#define OHOS_AV_PIPELINE_STATUS_H + +#include + +namespace OHOS { +namespace DistributedHardware { +enum struct Status : int32_t { + END_OF_STREAM = 1, ///< Read source when end of stream + OK = 0, ///< The execution result is correct. + NO_ERROR = OK, ///< Same as Status::OK + ERROR_UNKNOWN = -1, ///< An unknown error occurred. + ERROR_PLUGIN_ALREADY_EXISTS = -2, ///< The plugin already exists, usually occurs when in plugin registered. + ERROR_INCOMPATIBLE_VERSION = + -3, ///< Incompatible version, may occur during plugin registration or function calling. + ERROR_NO_MEMORY = -4, ///< The system memory is insufficient. + ERROR_WRONG_STATE = -5, ///< The function is called in an invalid state. + ERROR_UNIMPLEMENTED = -6, ///< This method or interface is not implemented. + ERROR_INVALID_PARAMETER = -7, ///< The plugin does not support this parameter. + ERROR_INVALID_DATA = -8, ///< The value is not in the valid range. + ERROR_MISMATCHED_TYPE = -9, ///< Mismatched data type + ERROR_TIMED_OUT = -10, ///< Operation timeout. + ERROR_UNSUPPORTED_FORMAT = -11, ///< The plugin not support this format/name. + ERROR_NOT_ENOUGH_DATA = -12, ///< Not enough data when read from source. + ERROR_NOT_EXISTED = -13, ///< Source is not existed. + ERROR_AGAIN = -14, ///< Operation is not available right now, should try again later. + ERROR_PERMISSION_DENIED = -15, ///< Permission denied. + ERROR_NULL_POINTER = -16, ///< Null pointer. + ERROR_INVALID_OPERATION = -17, ///< Invalid operation. + ERROR_CLIENT = -18, ///< Http client error + ERROR_SERVER = -19, ///< Http server error + ERROR_DELAY_READY = -20, ///< Delay ready event + ERROR_INVALID_STATE = -21, + ERROR_AUDIO_INTERRUPT = -22, + ERROR_INVALID_BUFFER_SIZE = 0xF001, + ERROR_UNEXPECTED_MEMORY_TYPE = 0xF002, + ERROR_CREATE_BUFFER = 0xF003, + ERROR_NULL_POINT_BUFFER = 0xF004, + ERROR_INVALID_BUFFER_ID = 0xF005, + ERROR_INVALID_BUFFER_STATE = 0xF006, + ERROR_NO_FREE_BUFFER = 0xF007, + ERROR_NO_DIRTY_BUFFER = 0xF008, + ERROR_NO_CONSUMER_LISTENER = 0xF009, + ERROR_NULL_BUFFER_QUEUE = 0xF00A, + ERROR_WAIT_TIMEOUT = 0xF00B, + ERROR_OUT_OF_RANGE = 0xF00C, + ERROR_NULL_SURFACE = 0xF00D, + ERROR_SURFACE_INNER = 0xF00E, + ERROR_NULL_SURFACE_BUFFER = 0xF00F, + ERROR_DRM_DECRYPT_FAILED = 0xF010, + + ERROR_IPC_WRITE_INTERFACE_TOKEN = 0xF101, + ERROR_IPC_SEND_REQUEST = 0xF102, +}; +} // namespace DistributedHardware +} // namespace OHOS +#endif // OHOS_AV_PIPELINE_STATUS_H diff --git a/av_transport/framework/BUILD.gn b/av_transport/framework/BUILD.gn new file mode 100644 index 00000000..e9945405 --- /dev/null +++ b/av_transport/framework/BUILD.gn @@ -0,0 +1,87 @@ +# Copyright (c) 2024 Huawei Device Co., Ltd. +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import("//build/ohos.gni") +import("../distributed_av_transport.gni") + +config("av_pipeline_fwk_external_config") { + include_dirs = [ + "${common_path}/include", + "${dh_fwk_sdk_path}/include", + "${interface_path}", + ] +} + +ohos_shared_library("distributed_av_pipeline_fwk") { + sanitize = { + cfi = true + cfi_cross_dso = true + cfi_no_nvcall = true + cfi_vcall_icall_only = true + debug = false + boundary_sanitize = true + integer_overflow = true + ubsan = true + } + branch_protector_ret = "pac_ret" + public_configs = [ ":av_pipeline_fwk_external_config" ] + + include_dirs = [ + "${dh_fwk_utils_path}/include", + "${distributed_av_transport_path}/framework", + "${distributed_av_transport_path}/framework/filter/include", + ] + + sources = [ + "${common_path}/src/av_trans_log.cpp", + "${distributed_av_transport_path}/framework/filter/src/filter.cpp", + "${distributed_av_transport_path}/framework/filter/src/filter_factory.cpp", + "${distributed_av_transport_path}/framework/pipeline/src/pipeline.cpp", + ] + + deps = [ "${dh_fwk_sdk_path}:libdhfwk_sdk" ] + + defines = [ + "HI_LOG_ENABLE", + "DH_LOG_TAG=\"av_trans_fwk\"", + "LOG_DOMAIN=0xD004101", + ] + + external_deps = [ + "bounds_checking_function:libsec_shared", + "c_utils:utils", + "hilog:libhilog", + "hisysevent:libhisysevent", + "hitrace:hitrace_meter", + "ipc:ipc_core", + "safwk:system_ability_fwk", + "samgr:samgr_proxy", + ] + + if (histreamer_compile_part) { + external_deps += [ "media_foundation:media_foundation" ] + } + + cflags = [ + "-fexceptions", + "-fno-rtti", + "-fPIC", + "-O2", + "-Wall", + "-Wno-c++20-extensions", + ] + cflags_cc = cflags + + part_name = "distributed_hardware_fwk" + subsystem_name = "distributedhardware" +} diff --git a/av_transport/framework/filter/include/filter.h b/av_transport/framework/filter/include/filter.h new file mode 100644 index 00000000..54369a36 --- /dev/null +++ b/av_transport/framework/filter/include/filter.h @@ -0,0 +1,280 @@ +/* + * Copyright (c) 2024 Huawei Device Co., Ltd. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef OHOS_AV_PIPELINE_FILTER_BASE_H +#define OHOS_AV_PIPELINE_FILTER_BASE_H +#include +#include +#include +#include + +#include "buffer/avbuffer_queue_producer.h" +#include "meta/meta.h" +#include "osal/task/condition_variable.h" +#include "osal/task/mutex.h" +#include "osal/task/task.h" + +#include "pipeline_event.h" +#include "pipeline_status.h" + +namespace OHOS { +namespace DistributedHardware { +namespace Pipeline { + +class Filter; + +enum class FilterType { + FILTERTYPE_SOURCE, + FILTERTYPE_DEMUXER, + FILTERTYPE_AENC, + FILTERTYPE_ADEC, + FILTERTYPE_VENC, + FILTERTYPE_VDEC, + FILTERTYPE_VIDEODEC, + FILTERTYPE_MUXER, + FILTERTYPE_ASINK, + FILTERTYPE_FSINK, + FILTERTYPE_SSINK, + AUDIO_CAPTURE, + AUDIO_DATA_SOURCE, + VIDEO_CAPTURE, + FILTERTYPE_VIDRESIZE, + TIMED_METADATA, + FILTERTYPE_MAX, +}; + +enum class StreamType { + STREAMTYPE_PACKED, + STREAMTYPE_ENCODED_AUDIO, + STREAMTYPE_ENCODED_VIDEO, + STREAMTYPE_RAW_AUDIO, + STREAMTYPE_RAW_VIDEO, + STREAMTYPE_SUBTITLE, + STREAMTYPE_MAX, +}; + +enum class FilterState { + CREATED, // Filter created + INITIALIZED, // Init called + PREPARING, // Prepare called + READY, // Ready Event reported + RUNNING, // Start called + PAUSED, // Pause called + STOPPED, // Stop called + RELEASED, // Release called + ERROR, // State fail +}; + +enum class FilterCallBackCommand { + NEXT_FILTER_NEEDED, + NEXT_FILTER_REMOVED, + NEXT_FILTER_UPDATE, + FILTER_CALLBACK_COMMAND_MAX, +}; + +class EventReceiver { +public: + virtual ~EventReceiver() = default; + virtual void OnEvent(const Event& event) = 0; +}; + +class FilterCallback { +public: + virtual ~FilterCallback() = default; + virtual Status OnCallback(const std::shared_ptr& filter, FilterCallBackCommand cmd, StreamType outType) = 0; +}; + +class FilterLinkCallback { +public: + virtual ~FilterLinkCallback() = default; + virtual void OnLinkedResult(const sptr& queue, + std::shared_ptr& meta) = 0; + virtual void OnUnlinkedResult(std::shared_ptr& meta) = 0; + virtual void OnUpdatedResult(std::shared_ptr& meta) = 0; +}; + +class Filter { +public: + explicit Filter(std::string name, FilterType type, bool asyncMode = false); + virtual ~Filter(); + virtual void Init(const std::shared_ptr& receiver, const std::shared_ptr& callback); + + virtual void LinkPipeLine(const std::string& groupId) final; + + virtual Status Prepare() final; + + virtual Status Start() final; + + virtual Status Pause() final; + + virtual Status PauseDragging() final; + + virtual Status Resume() final; + + virtual Status ResumeDragging() final; + + virtual Status Stop() final; + + virtual Status Flush() final; + + virtual Status Release() final; + + virtual Status Preroll() final; + + virtual Status WaitPrerollDone(bool render) final; + + virtual void StartFilterTask() final; + + virtual void PauseFilterTask() final; + + virtual Status SetPlayRange(int64_t start, int64_t end) final; + + virtual Status ProcessInputBuffer(int sendArg = 0, int64_t delayUs = 0) final; + + virtual Status ProcessOutputBuffer(int sendArg = 0, int64_t delayUs = 0, bool byIdx = false, uint32_t idx = 0, + int64_t renderTime = -1) final; + + virtual Status WaitAllState(FilterState state) final; + + virtual Status DoInitAfterLink(); + + virtual Status DoPrepare(); + + virtual Status DoStart(); + + virtual Status DoPause(); + + virtual Status DoPauseDragging(); + + virtual Status DoResume(); + + virtual Status DoResumeDragging(); + + virtual Status DoStop(); + + virtual Status DoFlush(); + + virtual Status DoRelease(); + + virtual Status DoPreroll(); + + virtual Status DoWaitPrerollDone(bool render); + + virtual Status DoSetPlayRange(int64_t start, int64_t end); + + virtual Status DoProcessInputBuffer(int recvArg, bool dropFrame); + + virtual Status DoProcessOutputBuffer(int recvArg, bool dropFrame, bool byIdx, uint32_t idx, int64_t renderTime); + + virtual void SetParameter(const std::shared_ptr& meta); + + virtual void GetParameter(std::shared_ptr& meta); + + virtual Status LinkNext(const std::shared_ptr& nextFilter, StreamType outType); + + virtual Status UpdateNext(const std::shared_ptr& nextFilter, StreamType outType); + + virtual Status UnLinkNext(const std::shared_ptr& nextFilter, StreamType outType); + + FilterType GetFilterType(); + + virtual Status OnLinked(StreamType inType, const std::shared_ptr& meta, + const std::shared_ptr& callback); + + virtual Status OnUpdated(StreamType inType, const std::shared_ptr& meta, + const std::shared_ptr& callback); + + virtual Status OnUnLinked(StreamType inType, const std::shared_ptr& callback); + + virtual void ChangeState(FilterState state); + + virtual void SetErrCode(Status errCode); + + virtual Status GetErrCode(); + + virtual Status ClearAllNextFilters(); + + virtual Status SetMuted(bool isMuted) + { + (void)isMuted; + return Status::OK; + } + + virtual bool IsDesignatedState(FilterState state); +protected: + virtual Status PrepareDone() final; + + virtual Status StartDone() final; + + virtual Status PauseDone() final; + + virtual Status ResumeDone() final; + + virtual Status StopDone() final; + + virtual Status ReleaseDone() final; + + std::string name_; + + std::shared_ptr meta_; + + FilterType filterType_; + FilterState curState_; + + std::vector supportedInStreams_; + std::vector supportedOutStreams_; + + OHOS::Media::Mutex stateMutex_{}; + OHOS::Media::ConditionVariable cond_{}; + + std::map>> nextFiltersMap_; + + std::shared_ptr receiver_; + + std::shared_ptr callback_; + + std::map>> linkCallbackMaps_; + + Status errCode_ = Status::OK; + + std::unique_ptr filterTask_; + + int64_t jobIdx_ = 0; + + int64_t processIdx_ = 0; + + int64_t jobIdxBase_ = 0; + + std::string groupId_; + + bool isAsyncMode_; +}; + +enum FilterPlaybackCommand { + INIT = 0, + PREPARE, + START, + PAUSE, + RESUME, + STOP, + RELEASE, + FLUSH, + PROCESS_INPUT_BUFFER, + PROCESS_OUTPUT_BUFFER, +}; +} // namespace Pipeline +} // namespace DistributedHardware +} // namespace OHOS +#endif diff --git a/av_transport/framework/filter/include/filter_factory.h b/av_transport/framework/filter/include/filter_factory.h new file mode 100644 index 00000000..952eae2e --- /dev/null +++ b/av_transport/framework/filter/include/filter_factory.h @@ -0,0 +1,101 @@ +/* + * Copyright (c) 2024 Huawei Device Co., Ltd. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef OHOS_AV_PIPELINE_FILTER_FACTORY_H +#define OHOS_AV_PIPELINE_FILTER_FACTORY_H + +#include +#include +#include +#include + +#include "filter.h" +#include "cpp_ext/type_cast_ext.h" + +namespace OHOS { +namespace DistributedHardware { +namespace Pipeline { +using InstanceGenerator = std::function(const std::string&, const FilterType type)>; + +class FilterFactory { +public: + ~FilterFactory() = default; + + FilterFactory(const FilterFactory&) = delete; + + FilterFactory operator=(const FilterFactory&) = delete; + + static FilterFactory& Instance(); + + template + std::shared_ptr CreateFilter(const std::string& filterName, const FilterType type) + { + auto filter = CreateFilterPriv(filterName, type); + auto typedFilter = ReinterpretPointerCast(filter); + return typedFilter; + } + + template + void RegisterFilter(const std::string& name, const FilterType type, const InstanceGenerator& generator = nullptr) + { + RegisterFilterPriv(name, type, generator); + } + +private: + FilterFactory() = default; + + std::shared_ptr CreateFilterPriv(const std::string& filterName, const FilterType type); + + template + void RegisterFilterPriv(const std::string& name, const FilterType type, const InstanceGenerator& generator) + { + if (generator == nullptr) { + auto result = generators.emplace( + type, [](const std::string &aliaName, const FilterType type) { + return std::make_shared(aliaName, type); + }); + if (!result.second) { + result.first->second = generator; + } + } else { + auto result = generators.emplace(type, generator); + if (!result.second) { + result.first->second = generator; + } + } + } + + std::unordered_map generators; +}; + +template +class AutoRegisterFilter { +public: + explicit AutoRegisterFilter(const std::string& name, const FilterType type) + { + FilterFactory::Instance().RegisterFilter(name, type); + } + + AutoRegisterFilter(const std::string& name, const FilterType type, const InstanceGenerator& generator) + { + FilterFactory::Instance().RegisterFilter(name, type, generator); + } + + ~AutoRegisterFilter() = default; +}; +} // namespace Pipeline +} // namespace DistributedHardware +} // namespace OHOS +#endif // OHOS_AV_PIPELINE_FILTER_FACTORY_H diff --git a/av_transport/framework/filter/src/filter.cpp b/av_transport/framework/filter/src/filter.cpp new file mode 100644 index 00000000..f1bd5d79 --- /dev/null +++ b/av_transport/framework/filter/src/filter.cpp @@ -0,0 +1,577 @@ +/* + * Copyright (c) 2024 Huawei Device Co., Ltd. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "filter.h" + +#include + +#include "osal/utils/util.h" + +#include "av_trans_log.h" + +#undef DH_LOG_TAG +#define DH_LOG_TAG "Filter" + +namespace OHOS { +namespace DistributedHardware { +namespace Pipeline { +Filter::Filter(std::string name, FilterType type, bool isAsyncMode) + : name_(std::move(name)), filterType_(type), isAsyncMode_(isAsyncMode) +{ +} + +Filter::~Filter() +{ + nextFiltersMap_.clear(); +} + +void Filter::Init(const std::shared_ptr& receiver, const std::shared_ptr& callback) +{ + receiver_ = receiver; + callback_ = callback; +} + +void Filter::LinkPipeLine(const std::string& groupId) +{ + groupId_ = groupId; + if (isAsyncMode_) { + Media::TaskType taskType; + switch (filterType_) { + case FilterType::FILTERTYPE_VENC: + case FilterType::FILTERTYPE_VDEC: + case FilterType::VIDEO_CAPTURE: + taskType = Media::TaskType::SINGLETON; + break; + case FilterType::FILTERTYPE_ASINK: + case FilterType::AUDIO_CAPTURE: + taskType = Media::TaskType::AUDIO; + break; + default: + taskType = Media::TaskType::SINGLETON; + break; + } + filterTask_ = std::make_unique(name_, groupId_, taskType, Media::TaskPriority::HIGH, false); + filterTask_->SubmitJobOnce([this] { + DoInitAfterLink(); + ChangeState(FilterState::INITIALIZED); + }); + } else { + DoInitAfterLink(); + ChangeState(FilterState::INITIALIZED); + } +} + +Status Filter::Prepare() +{ + AVTRANS_LOGD("Prepare %{public}s, pState:%{public}d", name_.c_str(), curState_); + if (filterTask_) { + filterTask_->SubmitJobOnce([this] { + PrepareDone(); + }); + } else { + return PrepareDone(); + } + return Status::OK; +} + +Status Filter::PrepareDone() +{ + AVTRANS_LOGI("Prepare in %{public}s", name_.c_str()); + // next filters maybe added in DoPrepare, so we must DoPrepare first + Status ret = DoPrepare(); + SetErrCode(ret); + if (ret != Status::OK) { + return ret; + } + for (auto iter : nextFiltersMap_) { + for (auto filter : iter.second) { + filter->Prepare(); + } + } + ChangeState(FilterState::READY); + return ret; +} + +Status Filter::Start() +{ + AVTRANS_LOGD("Start %{public}s, pState:%{public}d", name_.c_str(), curState_); + if (filterTask_) { + filterTask_->SubmitJobOnce([this] { + StartDone(); + filterTask_->Start(); + }); + for (auto iter : nextFiltersMap_) { + for (auto filter : iter.second) { + filter->Start(); + } + } + } else { + for (auto iter : nextFiltersMap_) { + for (auto filter : iter.second) { + filter->Start(); + } + } + return StartDone(); + } + return Status::OK; +} + +Status Filter::StartDone() +{ + AVTRANS_LOGI("Start in %{public}s", name_.c_str()); + Status ret = DoStart(); + SetErrCode(ret); + ChangeState(ret == Status::OK ? FilterState::RUNNING : FilterState::ERROR); + return ret; +} + +Status Filter::Pause() +{ + AVTRANS_LOGD("Pause %{public}s, pState:%{public}d", name_.c_str(), curState_); + // In offload case, we need pause to interrupt audio_sink_plugin write function, so do not use filterTask_ + auto ret = PauseDone(); + if (filterTask_) { + filterTask_->Pause(); + } + for (auto iter : nextFiltersMap_) { + for (auto filter : iter.second) { + filter->Pause(); + } + } + return ret; +} + +Status Filter::PauseDragging() +{ + AVTRANS_LOGD("PauseDragging %{public}s, pState:%{public}d", name_.c_str(), curState_); + auto ret = DoPauseDragging(); + if (filterTask_) { + filterTask_->Pause(); + } + for (auto iter : nextFiltersMap_) { + for (auto filter : iter.second) { + filter->PauseDragging(); + } + } + return ret; +} + +Status Filter::PauseDone() +{ + AVTRANS_LOGI("Pause in %{public}s", name_.c_str()); + Status ret = DoPause(); + SetErrCode(ret); + ChangeState(ret == Status::OK ? FilterState::PAUSED : FilterState::ERROR); + return ret; +} + +Status Filter::Resume() +{ + AVTRANS_LOGD("Resume %{public}s, pState:%{public}d", name_.c_str(), curState_); + if (filterTask_) { + filterTask_->SubmitJobOnce([this]() { + ResumeDone(); + filterTask_->Start(); + }); + for (auto iter : nextFiltersMap_) { + for (auto filter : iter.second) { + filter->Resume(); + } + } + } else { + for (auto iter : nextFiltersMap_) { + for (auto filter : iter.second) { + filter->Resume(); + } + } + return ResumeDone(); + } + return Status::OK; +} + +Status Filter::ResumeDone() +{ + AVTRANS_LOGI("Resume in %{public}s", name_.c_str()); + Status ret = DoResume(); + SetErrCode(ret); + ChangeState(ret == Status::OK ? FilterState::RUNNING : FilterState::ERROR); + return ret; +} + +Status Filter::ResumeDragging() +{ + AVTRANS_LOGD("ResumeDragging %{public}s, pState:%{public}d", name_.c_str(), curState_); + if (filterTask_) { + filterTask_->SubmitJobOnce([this]() { + DoResumeDragging(); + filterTask_->Start(); + }); + for (auto iter : nextFiltersMap_) { + for (auto filter : iter.second) { + filter->ResumeDragging(); + } + } + } else { + for (auto iter : nextFiltersMap_) { + for (auto filter : iter.second) { + filter->ResumeDragging(); + } + } + return DoResumeDragging(); + } + return Status::OK; +} + +Status Filter::Stop() +{ + AVTRANS_LOGD("Stop %{public}s, pState:%{public}d", name_.c_str(), curState_); + // In offload case, we need stop to interrupt audio_sink_plugin write function, so do not use filterTask_ + auto ret = StopDone(); + if (filterTask_) { + filterTask_->Stop(); + } + for (auto iter : nextFiltersMap_) { + for (auto filter : iter.second) { + filter->Stop(); + } + } + return ret; +} + +Status Filter::StopDone() +{ + AVTRANS_LOGI("Stop in %{public}s", name_.c_str()); + Status ret = DoStop(); + SetErrCode(ret); + ChangeState(ret == Status::OK ? FilterState::STOPPED : FilterState::ERROR); + return ret; +} + +Status Filter::Flush() +{ + AVTRANS_LOGD("Flush %{public}s, pState:%{public}d", name_.c_str(), curState_); + for (auto iter : nextFiltersMap_) { + for (auto filter : iter.second) { + filter->Flush(); + } + } + jobIdxBase_ = jobIdx_; + return DoFlush(); +} + +Status Filter::Release() +{ + AVTRANS_LOGD("Release %{public}s, pState:%{public}d", name_.c_str(), curState_); + if (filterTask_) { + filterTask_->SubmitJobOnce([this]() { + ReleaseDone(); + }); + for (auto iter : nextFiltersMap_) { + for (auto filter : iter.second) { + filter->Release(); + } + } + } else { + for (auto iter : nextFiltersMap_) { + for (auto filter : iter.second) { + filter->Release(); + } + } + return ReleaseDone(); + } + return Status::OK; +} + +Status Filter::ReleaseDone() +{ + AVTRANS_LOGI("Release in %{public}s", name_.c_str()); + Status ret = DoRelease(); + SetErrCode(ret); + ChangeState(ret == Status::OK ? FilterState::RELEASED : FilterState::ERROR); + return ret; +} + +Status Filter::SetPlayRange(int64_t start, int64_t end) +{ + AVTRANS_LOGD("SetPlayRange %{public}s, pState:%{public}d", name_.c_str(), curState_); + for (auto iter : nextFiltersMap_) { + for (auto filter : iter.second) { + filter->SetPlayRange(start, end); + } + } + return DoSetPlayRange(start, end); +} + +Status Filter::Preroll() +{ + Status ret = DoPreroll(); + if (ret != Status::OK) { + return ret; + } + for (auto iter : nextFiltersMap_) { + for (auto filter : iter.second) { + ret = filter->Preroll(); + if (ret != Status::OK) { + return ret; + } + } + } + return Status::OK; +} + +Status Filter::WaitPrerollDone(bool render) +{ + Status ret = Status::OK; + for (auto iter : nextFiltersMap_) { + for (auto filter : iter.second) { + auto curRet = filter->WaitPrerollDone(render); + if (curRet != Status::OK) { + ret = curRet; + } + } + } + auto curRet = DoWaitPrerollDone(render); + if (curRet != Status::OK) { + ret = curRet; + } + return ret; +} + +void Filter::StartFilterTask() +{ + if (filterTask_) { + filterTask_->Start(); + } +} + +void Filter::PauseFilterTask() +{ + if (filterTask_) { + filterTask_->Pause(); + } +} + +Status Filter::ClearAllNextFilters() +{ + nextFiltersMap_.clear(); + return Status::OK; +} + +Status Filter::ProcessInputBuffer(int sendArg, int64_t delayUs) +{ + AVTRANS_LOGD("Filter::ProcessInputBuffer %{public}s", name_.c_str()); + if (filterTask_) { + jobIdx_++; + filterTask_->SubmitJob([this, sendArg]() { + processIdx_++; + DoProcessInputBuffer(sendArg, processIdx_ <= jobIdxBase_); // drop frame after flush + }, delayUs, false); + } else { + Media::Task::SleepInTask(delayUs / 1000); // 1000 convert to ms + DoProcessInputBuffer(sendArg, false); + } + return Status::OK; +} + +Status Filter::ProcessOutputBuffer(int sendArg, int64_t delayUs, bool byIdx, uint32_t idx, int64_t renderTime) +{ + AVTRANS_LOGD("Filter::ProcessOutputBuffer %{public}s", name_.c_str()); + if (filterTask_) { + jobIdx_++; + int64_t processIdx = jobIdx_; + filterTask_->SubmitJob([this, sendArg, processIdx, byIdx, idx, renderTime]() { + processIdx_++; + // drop frame after flush + DoProcessOutputBuffer(sendArg, processIdx <= jobIdxBase_, byIdx, idx, renderTime); + }, delayUs, false); + } else { + Media::Task::SleepInTask(delayUs / 1000); // 1000 convert to ms + DoProcessOutputBuffer(sendArg, false, false, idx, renderTime); + } + return Status::OK; +} + +Status Filter::DoInitAfterLink() +{ + AVTRANS_LOGI("Filter::DoInitAfterLink"); + return Status::OK; +} + +Status Filter::DoPrepare() +{ + return Status::OK; +} + +Status Filter::DoStart() +{ + return Status::OK; +} + +Status Filter::DoPause() +{ + return Status::OK; +} + +Status Filter::DoPauseDragging() +{ + return Status::OK; +} + +Status Filter::DoResume() +{ + return Status::OK; +} + +Status Filter::DoResumeDragging() +{ + return Status::OK; +} + +Status Filter::DoStop() +{ + return Status::OK; +} + +Status Filter::DoFlush() +{ + return Status::OK; +} + +Status Filter::DoRelease() +{ + return Status::OK; +} + +Status Filter::DoPreroll() +{ + return Status::OK; +} + +Status Filter::DoWaitPrerollDone(bool render) +{ + return Status::OK; +} + +Status Filter::DoSetPlayRange(int64_t start, int64_t end) +{ + return Status::OK; +} + +Status Filter::DoProcessInputBuffer(int recvArg, bool dropFrame) +{ + return Status::OK; +} + +Status Filter::DoProcessOutputBuffer(int recvArg, bool dropFrame, bool byIdx, uint32_t idx, int64_t renderTimee) +{ + return Status::OK; +} + +// should only call in this cpp +void Filter::ChangeState(FilterState state) +{ + AVTRANS_LOGI("%{public}s > %{public}d", name_.c_str(), state); + Media::AutoLock lock(stateMutex_); + curState_ = state; + cond_.NotifyOne(); +} + +Status Filter::WaitAllState(FilterState state) +{ + Media::AutoLock lock(stateMutex_); + if (curState_ != state) { + cond_.WaitFor(lock, 30000, [this, state] { // 30000 ms timeout + return curState_ == state || curState_ == FilterState::ERROR; + }); + if (curState_ != state) { + AVTRANS_LOGE("Filter(%{public}s) wait state %{public}d fail, curState %{public}d", + name_.c_str(), state, curState_); + return GetErrCode(); + } + } + + Status res = Status::OK; + for (auto iter : nextFiltersMap_) { + for (auto filter : iter.second) { + if (filter->WaitAllState(state) != Status::OK) { + res = filter->GetErrCode(); + } + } + } + return res; +} + +bool Filter::IsDesignatedState(FilterState state) +{ + return curState_ == state; +} + +void Filter::SetErrCode(Status errCode) +{ + errCode_ = errCode; +} + +Status Filter::GetErrCode() +{ + return errCode_; +} + +void Filter::SetParameter(const std::shared_ptr& meta) +{ + meta_ = meta; +} + +void Filter::GetParameter(std::shared_ptr& meta) +{ + meta = meta_; +} + +Status Filter::LinkNext(const std::shared_ptr&, StreamType) +{ + return Status::OK; +} + +Status Filter::UpdateNext(const std::shared_ptr&, StreamType) +{ + return Status::OK; +} + +Status Filter::UnLinkNext(const std::shared_ptr&, StreamType) +{ + return Status::OK; +} + +FilterType Filter::GetFilterType() +{ + return filterType_; +}; + +Status Filter::OnLinked(StreamType, const std::shared_ptr&, const std::shared_ptr&) +{ + return Status::OK; +}; + +Status Filter::OnUpdated(StreamType, const std::shared_ptr&, const std::shared_ptr&) +{ + return Status::OK; +} + +Status Filter::OnUnLinked(StreamType, const std::shared_ptr&) +{ + return Status::OK; +} +} // namespace Pipeline +} // namespace DistributedHardware +} // namespace OHOS diff --git a/av_transport/framework/filter/src/filter_factory.cpp b/av_transport/framework/filter/src/filter_factory.cpp new file mode 100644 index 00000000..65b49581 --- /dev/null +++ b/av_transport/framework/filter/src/filter_factory.cpp @@ -0,0 +1,37 @@ +/* + * Copyright (c) 2024 Huawei Device Co., Ltd. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "filter_factory.h" + +namespace OHOS { +namespace DistributedHardware { +namespace Pipeline { +FilterFactory& FilterFactory::Instance() +{ + static FilterFactory instance; + return instance; +} + +std::shared_ptr FilterFactory::CreateFilterPriv(const std::string& filterName, const FilterType type) +{ + auto it = generators.find(type); + if (it != generators.end()) { + return it->second(filterName, type); + } + return nullptr; +} +} // namespace Pipeline +} // namespace DistributedHardware +} // namespace OHOS diff --git a/av_transport/framework/pipeline/include/pipeline.h b/av_transport/framework/pipeline/include/pipeline.h new file mode 100644 index 00000000..7acc208d --- /dev/null +++ b/av_transport/framework/pipeline/include/pipeline.h @@ -0,0 +1,84 @@ +/* + * Copyright (c) 2024 Huawei Device Co., Ltd. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef OHOS_AV_PIPELINE_H +#define OHOS_AV_PIPELINE_H + +#include +#include +#include + +#include "osal/task/mutex.h" + +#include "pipeline_status.h" +#include "filter.h" + +namespace OHOS { +namespace DistributedHardware { +namespace Pipeline { +class FilterCallback; + +class Pipeline : public EventReceiver { +public: + ~Pipeline(); + + void Init(const std::shared_ptr& receiver, + const std::shared_ptr& callback, const std::string& groupId); + + Status Prepare(); + + Status Start(); + + Status Pause(); + + Status Resume(); + + Status Stop(); + + Status Flush(); + + Status Release(); + + Status Preroll(bool render); + + Status SetPlayRange(int64_t start, int64_t end); + + Status AddHeadFilters(std::vector> filters); + + Status RemoveHeadFilter(const std::shared_ptr& filter); + + Status LinkFilters(const std::shared_ptr& preFilter, + const std::vector>& filters, StreamType type); + + Status UpdateFilters(const std::shared_ptr& preFilter, + const std::vector>& filters, StreamType type); + + Status UnLinkFilters(const std::shared_ptr& preFilter, + const std::vector>& filters, StreamType type); + + void OnEvent(const Event& event) override; + + static int32_t GetNextPipelineId(); +private: + std::string groupId_; + Media::Mutex mutex_ {}; + std::vector> filters_ {}; + std::shared_ptr eventReceiver_ {nullptr}; + std::shared_ptr filterCallback_ {nullptr}; +}; +} // namespace Pipeline +} // namespace DistributedHardware +} // namespace OHOS +#endif // OHOS_AV_PIPELINE_H diff --git a/av_transport/framework/pipeline/src/pipeline.cpp b/av_transport/framework/pipeline/src/pipeline.cpp new file mode 100644 index 00000000..653eb523 --- /dev/null +++ b/av_transport/framework/pipeline/src/pipeline.cpp @@ -0,0 +1,323 @@ +/* + * Copyright (c) 2024 Huawei Device Co., Ltd. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "pipeline/include/pipeline.h" + +#include +#include + +#include "osal/task/autolock.h" +#include "osal/task/jobutils.h" + +#include "av_trans_log.h" + +#undef DH_LOG_TAG +#define DH_LOG_TAG "Pipeline" + +namespace OHOS { +namespace DistributedHardware { +namespace Pipeline { +static std::atomic pipeLineId = 0; + +int32_t Pipeline::GetNextPipelineId() +{ + return pipeLineId++; +} + +Pipeline::~Pipeline() +{ +} + +void Pipeline::Init(const std::shared_ptr& receiver, const std::shared_ptr& callback, + const std::string& groupId) +{ + AVTRANS_LOGI("Pipeline::Init"); + eventReceiver_ = receiver; + filterCallback_ = callback; + groupId_ = groupId; +} + +Status Pipeline::Prepare() +{ + AVTRANS_LOGI("Prepare enter."); + Status ret = Status::OK; + Media::SubmitJobOnce([&] { + Media::AutoLock lock(mutex_); + for (auto it = filters_.begin(); it != filters_.end(); ++it) { + ret = (*it)->Prepare(); + if (ret != Status::OK) { + return; + } + } + for (auto it = filters_.begin(); it != filters_.end(); ++it) { + ret = (*it)->WaitAllState(FilterState::READY); + if (ret != Status::OK) { + return; + } + } + }); + AVTRANS_LOGI("Prepare done ret = %{public}d", ret); + return ret; +} + +Status Pipeline::Start() +{ + AVTRANS_LOGI("Start enter."); + Status ret = Status::OK; + Media::SubmitJobOnce([&] { + Media::AutoLock lock(mutex_); + for (auto it = filters_.begin(); it != filters_.end(); ++it) { + ret = (*it)->Start(); + if (ret != Status::OK) { + return; + } + } + for (auto it = filters_.begin(); it != filters_.end(); ++it) { + ret = (*it)->WaitAllState(FilterState::RUNNING); + if (ret != Status::OK) { + return; + } + } + }); + AVTRANS_LOGI("Start done ret = %{public}d", ret); + return ret; +} + +Status Pipeline::Pause() +{ + AVTRANS_LOGI("Pause enter."); + Status ret = Status::OK; + Media::SubmitJobOnce([&] { + Media::AutoLock lock(mutex_); + for (auto it = filters_.begin(); it != filters_.end(); ++it) { + auto rtv = (*it)->Pause(); + if (rtv != Status::OK) { + ret = rtv; + } + } + for (auto it = filters_.begin(); it != filters_.end(); ++it) { + auto rtv = (*it)->WaitAllState(FilterState::PAUSED); + if (rtv != Status::OK) { + ret = rtv; + } + } + }); + AVTRANS_LOGI("Pause done ret = %{public}d", ret); + return ret; +} + +Status Pipeline::Resume() +{ + AVTRANS_LOGI("Resume enter."); + Status ret = Status::OK; + Media::SubmitJobOnce([&] { + Media::AutoLock lock(mutex_); + for (auto it = filters_.begin(); it != filters_.end(); ++it) { + ret = (*it)->Resume(); + if (ret != Status::OK) { + return; + } + } + for (auto it = filters_.begin(); it != filters_.end(); ++it) { + ret = (*it)->WaitAllState(FilterState::RUNNING); + if (ret != Status::OK) { + return; + } + } + }); + AVTRANS_LOGI("Resume done ret = %{public}d", ret); + return ret; +} + +Status Pipeline::Stop() +{ + AVTRANS_LOGI("Stop enter."); + Status ret = Status::OK; + Media::SubmitJobOnce([&] { + Media::AutoLock lock(mutex_); + for (auto it = filters_.begin(); it != filters_.end(); ++it) { + if (*it == nullptr) { + AVTRANS_LOGE("Pipeline error: %{public}zu", filters_.size()); + continue; + } + auto rtv = (*it)->Stop(); + if (rtv != Status::OK) { + ret = rtv; + } + } + for (auto it = filters_.begin(); it != filters_.end(); ++it) { + auto rtv = (*it)->WaitAllState(FilterState::STOPPED); + if (rtv != Status::OK) { + ret = rtv; + } + } + filters_.clear(); + }); + AVTRANS_LOGI("Stop done ret = %{public}d", ret); + return ret; +} + +Status Pipeline::Flush() +{ + AVTRANS_LOGI("Flush enter."); + Media::SubmitJobOnce([&] { + Media::AutoLock lock(mutex_); + for (auto it = filters_.begin(); it != filters_.end(); ++it) { + (*it)->Flush(); + } + }); + AVTRANS_LOGI("Flush end."); + return Status::OK; +} + +Status Pipeline::Release() +{ + AVTRANS_LOGI("Release enter."); + Media::SubmitJobOnce([&] { + Media::AutoLock lock(mutex_); + for (auto it = filters_.begin(); it != filters_.end(); ++it) { + (*it)->Release(); + } + for (auto it = filters_.begin(); it != filters_.end(); ++it) { + (*it)->WaitAllState(FilterState::RELEASED); + } + filters_.clear(); + }); + AVTRANS_LOGI("Release done."); + return Status::OK; +} + +Status Pipeline::Preroll(bool render) +{ + AVTRANS_LOGI("Preroll enter."); + Status ret = Status::OK; + Media::AutoLock lock(mutex_); + for (auto it = filters_.begin(); it != filters_.end(); ++it) { + auto rtv = (*it)->Preroll(); + if (rtv != Status::OK) { + ret = rtv; + AVTRANS_LOGI("Preroll done ret = %{public}d", ret); + return ret; + } + } + for (auto it = filters_.begin(); it != filters_.end(); ++it) { + auto rtv = (*it)->WaitPrerollDone(render); + if (rtv != Status::OK) { + ret = rtv; + AVTRANS_LOGI("Preroll done ret = %{public}d", ret); + return ret; + } + } + AVTRANS_LOGI("Preroll done ret = %{public}d", ret); + return ret; +} + +Status Pipeline::SetPlayRange(int64_t start, int64_t end) +{ + AVTRANS_LOGI("SetPlayRange enter."); + Media::SubmitJobOnce([&] { + Media::AutoLock lock(mutex_); + for (auto it = filters_.begin(); it != filters_.end(); ++it) { + (*it)->SetPlayRange(start, end); + } + }); + AVTRANS_LOGI("SetPlayRange done."); + return Status::OK; +} + +Status Pipeline::AddHeadFilters(std::vector> filtersIn) +{ + AVTRANS_LOGI("AddHeadFilters enter."); + std::vector> filtersToAdd; + for (auto& filterIn : filtersIn) { + bool matched = false; + for (const auto& filter : filters_) { + if (filterIn == filter) { + matched = true; + break; + } + } + if (!matched) { + filtersToAdd.push_back(filterIn); + filterIn->LinkPipeLine(groupId_); + } + } + if (filtersToAdd.empty()) { + AVTRANS_LOGI("filter already exists"); + return Status::OK; + } + Media::SubmitJobOnce([&] { + Media::AutoLock lock(mutex_); + this->filters_.insert(this->filters_.end(), filtersToAdd.begin(), filtersToAdd.end()); + }); + AVTRANS_LOGI("AddHeadFilters done."); + return Status::OK; +} + +Status Pipeline::RemoveHeadFilter(const std::shared_ptr& filter) +{ + Media::SubmitJobOnce([&] { + Media::AutoLock lock(mutex_); + auto it = std::find_if(filters_.begin(), filters_.end(), + [&filter](const std::shared_ptr& filterPtr) { return filterPtr == filter; }); + if (it != filters_.end()) { + filters_.erase(it); + } + filter->Release(); + filter->WaitAllState(FilterState::RELEASED); + filter->ClearAllNextFilters(); + return Status::OK; + }); + return Status::OK; +} + +Status Pipeline::LinkFilters(const std::shared_ptr &preFilter, + const std::vector> &nextFilters, + StreamType type) +{ + for (auto nextFilter : nextFilters) { + auto ret = preFilter->LinkNext(nextFilter, type); + nextFilter->LinkPipeLine(groupId_); + TRUE_RETURN_V(ret != Status::OK, ret); + } + return Status::OK; +} + +Status Pipeline::UpdateFilters(const std::shared_ptr &preFilter, + const std::vector> &nextFilters, + StreamType type) +{ + for (auto nextFilter : nextFilters) { + preFilter->UpdateNext(nextFilter, type); + } + return Status::OK; +} + +Status Pipeline::UnLinkFilters(const std::shared_ptr &preFilter, + const std::vector> &nextFilters, + StreamType type) +{ + for (auto nextFilter : nextFilters) { + preFilter->UnLinkNext(nextFilter, type); + } + return Status::OK; +} + +void Pipeline::OnEvent(const Event& event) +{ +} +} // namespace Pipeline +} // namespace DistributedHardware +} // namespace OHOS diff --git a/bundle.json b/bundle.json index 7633d195..b22ad73b 100644 --- a/bundle.json +++ b/bundle.json @@ -63,16 +63,17 @@ }, "build": { "sub_component": [ - "//foundation/distributedhardware/distributed_hardware_fwk/utils:distributedhardwareutils", - "//foundation/distributedhardware/distributed_hardware_fwk/services/distributedhardwarefwkservice:distributedhardwarefwksvr", - "//foundation/distributedhardware/distributed_hardware_fwk/sa_profile:dhfwk_sa_profile", - "//foundation/distributedhardware/distributed_hardware_fwk/sa_profile:dhardware.cfg", - "//foundation/distributedhardware/distributed_hardware_fwk/interfaces/inner_kits:libdhfwk_sdk", - "//foundation/distributedhardware/distributed_hardware_fwk/interfaces/kits/napi:hardwaremanager", - "//foundation/distributedhardware/distributed_hardware_fwk/av_transport/av_trans_engine/av_sender:distributed_av_sender", + "//foundation/distributedhardware/distributed_hardware_fwk/application:DHardware_UI", "//foundation/distributedhardware/distributed_hardware_fwk/av_transport/av_trans_engine/av_receiver:distributed_av_receiver", + "//foundation/distributedhardware/distributed_hardware_fwk/av_transport/av_trans_engine/av_sender:distributed_av_sender", "//foundation/distributedhardware/distributed_hardware_fwk/av_transport/av_trans_handler/histreamer_ability_querier:histreamer_ability_querier", - "//foundation/distributedhardware/distributed_hardware_fwk/application:DHardware_UI" + "//foundation/distributedhardware/distributed_hardware_fwk/av_transport/framework:distributed_av_pipeline_fwk", + "//foundation/distributedhardware/distributed_hardware_fwk/interfaces/inner_kits:libdhfwk_sdk", + "//foundation/distributedhardware/distributed_hardware_fwk/interfaces/kits/napi:hardwaremanager", + "//foundation/distributedhardware/distributed_hardware_fwk/sa_profile:dhardware.cfg", + "//foundation/distributedhardware/distributed_hardware_fwk/sa_profile:dhfwk_sa_profile", + "//foundation/distributedhardware/distributed_hardware_fwk/services/distributedhardwarefwkservice:distributedhardwarefwksvr", + "//foundation/distributedhardware/distributed_hardware_fwk/utils:distributedhardwareutils" ], "inner_kits": [ { -- Gitee From 3c54ca1b71aad2eea85b5b49f333dbd56c271c60 Mon Sep 17 00:00:00 2001 From: Bobie Date: Fri, 1 Nov 2024 15:33:11 +0800 Subject: [PATCH 2/2] framework fix Signed-off-by: Bobie --- .../framework/filter/include/filter_factory.h | 2 +- av_transport/framework/filter/src/filter.cpp | 82 +++++++++++++------ .../framework/pipeline/src/pipeline.cpp | 60 +++++++++++++- 3 files changed, 114 insertions(+), 30 deletions(-) diff --git a/av_transport/framework/filter/include/filter_factory.h b/av_transport/framework/filter/include/filter_factory.h index 952eae2e..f85274cd 100644 --- a/av_transport/framework/filter/include/filter_factory.h +++ b/av_transport/framework/filter/include/filter_factory.h @@ -43,7 +43,7 @@ public: std::shared_ptr CreateFilter(const std::string& filterName, const FilterType type) { auto filter = CreateFilterPriv(filterName, type); - auto typedFilter = ReinterpretPointerCast(filter); + auto typedFilter = Media::ReinterpretPointerCast(filter); return typedFilter; } diff --git a/av_transport/framework/filter/src/filter.cpp b/av_transport/framework/filter/src/filter.cpp index f1bd5d79..805fccf9 100644 --- a/av_transport/framework/filter/src/filter.cpp +++ b/av_transport/framework/filter/src/filter.cpp @@ -97,7 +97,9 @@ Status Filter::PrepareDone() } for (auto iter : nextFiltersMap_) { for (auto filter : iter.second) { - filter->Prepare(); + if (filter != nullptr) { + filter->Prepare(); + } } } ChangeState(FilterState::READY); @@ -117,15 +119,17 @@ Status Filter::Start() filter->Start(); } } - } else { - for (auto iter : nextFiltersMap_) { - for (auto filter : iter.second) { + return Status::OK; + } + + for (auto iter : nextFiltersMap_) { + for (auto filter : iter.second) { + if (filter != nullptr) { filter->Start(); } } - return StartDone(); } - return Status::OK; + return StartDone(); } Status Filter::StartDone() @@ -147,7 +151,9 @@ Status Filter::Pause() } for (auto iter : nextFiltersMap_) { for (auto filter : iter.second) { - filter->Pause(); + if (filter != nullptr) { + filter->Pause(); + } } } return ret; @@ -162,7 +168,9 @@ Status Filter::PauseDragging() } for (auto iter : nextFiltersMap_) { for (auto filter : iter.second) { - filter->PauseDragging(); + if (filter != nullptr) { + filter->PauseDragging(); + } } } return ret; @@ -190,15 +198,17 @@ Status Filter::Resume() filter->Resume(); } } - } else { - for (auto iter : nextFiltersMap_) { - for (auto filter : iter.second) { + return Status::OK; + } + + for (auto iter : nextFiltersMap_) { + for (auto filter : iter.second) { + if (filter != nullptr) { filter->Resume(); } } - return ResumeDone(); } - return Status::OK; + return ResumeDone(); } Status Filter::ResumeDone() @@ -223,15 +233,17 @@ Status Filter::ResumeDragging() filter->ResumeDragging(); } } - } else { - for (auto iter : nextFiltersMap_) { - for (auto filter : iter.second) { + return Status::OK; + } + + for (auto iter : nextFiltersMap_) { + for (auto filter : iter.second) { + if (filter != nullptr) { filter->ResumeDragging(); } } - return DoResumeDragging(); } - return Status::OK; + return DoResumeDragging(); } Status Filter::Stop() @@ -244,7 +256,9 @@ Status Filter::Stop() } for (auto iter : nextFiltersMap_) { for (auto filter : iter.second) { - filter->Stop(); + if (filter != nullptr) { + filter->Stop(); + } } } return ret; @@ -264,7 +278,9 @@ Status Filter::Flush() AVTRANS_LOGD("Flush %{public}s, pState:%{public}d", name_.c_str(), curState_); for (auto iter : nextFiltersMap_) { for (auto filter : iter.second) { - filter->Flush(); + if (filter != nullptr) { + filter->Flush(); + } } } jobIdxBase_ = jobIdx_; @@ -278,20 +294,23 @@ Status Filter::Release() filterTask_->SubmitJobOnce([this]() { ReleaseDone(); }); + for (auto iter : nextFiltersMap_) { for (auto filter : iter.second) { filter->Release(); } } - } else { - for (auto iter : nextFiltersMap_) { - for (auto filter : iter.second) { + return Status::OK; + } + + for (auto iter : nextFiltersMap_) { + for (auto filter : iter.second) { + if (filter != nullptr) { filter->Release(); } } - return ReleaseDone(); } - return Status::OK; + return ReleaseDone(); } Status Filter::ReleaseDone() @@ -308,7 +327,9 @@ Status Filter::SetPlayRange(int64_t start, int64_t end) AVTRANS_LOGD("SetPlayRange %{public}s, pState:%{public}d", name_.c_str(), curState_); for (auto iter : nextFiltersMap_) { for (auto filter : iter.second) { - filter->SetPlayRange(start, end); + if (filter != nullptr) { + filter->SetPlayRange(start, end); + } } } return DoSetPlayRange(start, end); @@ -322,6 +343,9 @@ Status Filter::Preroll() } for (auto iter : nextFiltersMap_) { for (auto filter : iter.second) { + if (filter == nullptr) { + continue; + } ret = filter->Preroll(); if (ret != Status::OK) { return ret; @@ -336,6 +360,9 @@ Status Filter::WaitPrerollDone(bool render) Status ret = Status::OK; for (auto iter : nextFiltersMap_) { for (auto filter : iter.second) { + if (filter == nullptr) { + continue; + } auto curRet = filter->WaitPrerollDone(render); if (curRet != Status::OK) { ret = curRet; @@ -505,6 +532,9 @@ Status Filter::WaitAllState(FilterState state) Status res = Status::OK; for (auto iter : nextFiltersMap_) { for (auto filter : iter.second) { + if (filter == nullptr) { + continue; + } if (filter->WaitAllState(state) != Status::OK) { res = filter->GetErrCode(); } diff --git a/av_transport/framework/pipeline/src/pipeline.cpp b/av_transport/framework/pipeline/src/pipeline.cpp index 653eb523..b8ff3113 100644 --- a/av_transport/framework/pipeline/src/pipeline.cpp +++ b/av_transport/framework/pipeline/src/pipeline.cpp @@ -56,12 +56,18 @@ Status Pipeline::Prepare() Media::SubmitJobOnce([&] { Media::AutoLock lock(mutex_); for (auto it = filters_.begin(); it != filters_.end(); ++it) { + if (*it == nullptr) { + continue; + } ret = (*it)->Prepare(); if (ret != Status::OK) { return; } } for (auto it = filters_.begin(); it != filters_.end(); ++it) { + if (*it == nullptr) { + continue; + } ret = (*it)->WaitAllState(FilterState::READY); if (ret != Status::OK) { return; @@ -79,12 +85,18 @@ Status Pipeline::Start() Media::SubmitJobOnce([&] { Media::AutoLock lock(mutex_); for (auto it = filters_.begin(); it != filters_.end(); ++it) { + if (*it == nullptr) { + continue; + } ret = (*it)->Start(); if (ret != Status::OK) { return; } } for (auto it = filters_.begin(); it != filters_.end(); ++it) { + if (*it == nullptr) { + continue; + } ret = (*it)->WaitAllState(FilterState::RUNNING); if (ret != Status::OK) { return; @@ -102,12 +114,18 @@ Status Pipeline::Pause() Media::SubmitJobOnce([&] { Media::AutoLock lock(mutex_); for (auto it = filters_.begin(); it != filters_.end(); ++it) { + if (*it == nullptr) { + continue; + } auto rtv = (*it)->Pause(); if (rtv != Status::OK) { ret = rtv; } } for (auto it = filters_.begin(); it != filters_.end(); ++it) { + if (*it == nullptr) { + continue; + } auto rtv = (*it)->WaitAllState(FilterState::PAUSED); if (rtv != Status::OK) { ret = rtv; @@ -125,12 +143,18 @@ Status Pipeline::Resume() Media::SubmitJobOnce([&] { Media::AutoLock lock(mutex_); for (auto it = filters_.begin(); it != filters_.end(); ++it) { + if (*it == nullptr) { + continue; + } ret = (*it)->Resume(); if (ret != Status::OK) { return; } } for (auto it = filters_.begin(); it != filters_.end(); ++it) { + if (*it == nullptr) { + continue; + } ret = (*it)->WaitAllState(FilterState::RUNNING); if (ret != Status::OK) { return; @@ -158,6 +182,9 @@ Status Pipeline::Stop() } } for (auto it = filters_.begin(); it != filters_.end(); ++it) { + if (*it == nullptr) { + continue; + } auto rtv = (*it)->WaitAllState(FilterState::STOPPED); if (rtv != Status::OK) { ret = rtv; @@ -175,6 +202,9 @@ Status Pipeline::Flush() Media::SubmitJobOnce([&] { Media::AutoLock lock(mutex_); for (auto it = filters_.begin(); it != filters_.end(); ++it) { + if (*it == nullptr) { + continue; + } (*it)->Flush(); } }); @@ -188,9 +218,15 @@ Status Pipeline::Release() Media::SubmitJobOnce([&] { Media::AutoLock lock(mutex_); for (auto it = filters_.begin(); it != filters_.end(); ++it) { + if (*it == nullptr) { + continue; + } (*it)->Release(); } for (auto it = filters_.begin(); it != filters_.end(); ++it) { + if (*it == nullptr) { + continue; + } (*it)->WaitAllState(FilterState::RELEASED); } filters_.clear(); @@ -205,6 +241,9 @@ Status Pipeline::Preroll(bool render) Status ret = Status::OK; Media::AutoLock lock(mutex_); for (auto it = filters_.begin(); it != filters_.end(); ++it) { + if (*it == nullptr) { + continue; + } auto rtv = (*it)->Preroll(); if (rtv != Status::OK) { ret = rtv; @@ -213,6 +252,9 @@ Status Pipeline::Preroll(bool render) } } for (auto it = filters_.begin(); it != filters_.end(); ++it) { + if (*it == nullptr) { + continue; + } auto rtv = (*it)->WaitPrerollDone(render); if (rtv != Status::OK) { ret = rtv; @@ -230,6 +272,9 @@ Status Pipeline::SetPlayRange(int64_t start, int64_t end) Media::SubmitJobOnce([&] { Media::AutoLock lock(mutex_); for (auto it = filters_.begin(); it != filters_.end(); ++it) { + if (*it == nullptr) { + continue; + } (*it)->SetPlayRange(start, end); } }); @@ -242,6 +287,9 @@ Status Pipeline::AddHeadFilters(std::vector> filtersIn) AVTRANS_LOGI("AddHeadFilters enter."); std::vector> filtersToAdd; for (auto& filterIn : filtersIn) { + if (filterIn == nullptr) { + continue; + } bool matched = false; for (const auto& filter : filters_) { if (filterIn == filter) { @@ -275,9 +323,11 @@ Status Pipeline::RemoveHeadFilter(const std::shared_ptr& filter) if (it != filters_.end()) { filters_.erase(it); } - filter->Release(); - filter->WaitAllState(FilterState::RELEASED); - filter->ClearAllNextFilters(); + if (filter != nullptr) { + filter->Release(); + filter->WaitAllState(FilterState::RELEASED); + filter->ClearAllNextFilters(); + } return Status::OK; }); return Status::OK; @@ -287,7 +337,9 @@ Status Pipeline::LinkFilters(const std::shared_ptr &preFilter, const std::vector> &nextFilters, StreamType type) { + TRUE_RETURN_V(preFilter == nullptr, Status::ERROR_NULL_POINTER); for (auto nextFilter : nextFilters) { + TRUE_RETURN_V(nextFilter == nullptr, Status::ERROR_NULL_POINTER); auto ret = preFilter->LinkNext(nextFilter, type); nextFilter->LinkPipeLine(groupId_); TRUE_RETURN_V(ret != Status::OK, ret); @@ -299,6 +351,7 @@ Status Pipeline::UpdateFilters(const std::shared_ptr &preFilter, const std::vector> &nextFilters, StreamType type) { + TRUE_RETURN_V(preFilter == nullptr, Status::ERROR_NULL_POINTER); for (auto nextFilter : nextFilters) { preFilter->UpdateNext(nextFilter, type); } @@ -309,6 +362,7 @@ Status Pipeline::UnLinkFilters(const std::shared_ptr &preFilter, const std::vector> &nextFilters, StreamType type) { + TRUE_RETURN_V(preFilter == nullptr, Status::ERROR_NULL_POINTER); for (auto nextFilter : nextFilters) { preFilter->UnLinkNext(nextFilter, type); } -- Gitee