From 029238fdf319a8c4c04d4ff02de660ce4caf6625 Mon Sep 17 00:00:00 2001 From: Bobie Date: Wed, 19 Feb 2025 14:56:27 +0800 Subject: [PATCH] add new input and output filters Signed-off-by: Bobie --- .../av_trans_audio_input_filter.cpp | 358 +++++++++++++ .../av_trans_audio_input_filter.h | 89 ++++ .../av_trans_bus_input_filter.cpp | 495 ++++++++++++++++++ .../av_trans_bus_input_filter.h | 100 ++++ .../av_trans_output/daudio_output_filter.cpp | 315 +++++++++++ .../av_trans_output/daudio_output_filter.h | 89 ++++ .../dsoftbus_output_filter.cpp | 409 +++++++++++++++ .../av_trans_output/dsoftbus_output_filter.h | 98 ++++ 8 files changed, 1953 insertions(+) create mode 100644 av_transport/av_trans_engine/filters/av_trans_input/av_trans_audio_input_filter.cpp create mode 100644 av_transport/av_trans_engine/filters/av_trans_input/av_trans_audio_input_filter.h create mode 100644 av_transport/av_trans_engine/filters/av_trans_input/av_trans_bus_input_filter.cpp create mode 100644 av_transport/av_trans_engine/filters/av_trans_input/av_trans_bus_input_filter.h create mode 100644 av_transport/av_trans_engine/filters/av_trans_output/daudio_output_filter.cpp create mode 100644 av_transport/av_trans_engine/filters/av_trans_output/daudio_output_filter.h create mode 100644 av_transport/av_trans_engine/filters/av_trans_output/dsoftbus_output_filter.cpp create mode 100644 av_transport/av_trans_engine/filters/av_trans_output/dsoftbus_output_filter.h diff --git a/av_transport/av_trans_engine/filters/av_trans_input/av_trans_audio_input_filter.cpp b/av_transport/av_trans_engine/filters/av_trans_input/av_trans_audio_input_filter.cpp new file mode 100644 index 00000000..c3bbdc85 --- /dev/null +++ b/av_transport/av_trans_engine/filters/av_trans_input/av_trans_audio_input_filter.cpp @@ -0,0 +1,358 @@ +/* + * Copyright (c) 2025 Huawei Device Co., Ltd. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "av_trans_audio_input_filter.h" + +#include +#include + +#include "av_trans_log.h" +#include "filter_factory.h" + +#undef DH_LOG_TAG +#define DH_LOG_TAG "AudioInputFilter" + +namespace OHOS { +namespace DistributedHardware { +namespace Pipeline { +namespace { +constexpr int32_t DEFAULT_BUFFER_NUM = 8; +constexpr int32_t MAX_TIME_OUT_MS = 1; +constexpr int64_t MS_ONE_S = 1000; +constexpr int64_t NS_ONE_S = 1000000; +const std::string INPUT_BUFFER_QUEUE_NAME = "AVTransAudioInputBufferQueue"; + +int64_t GetCurrentTime() +{ + struct timespec time = { 0, 0 }; + clock_gettime(CLOCK_MONOTONIC, &time); + return time.tv_sec * MS_ONE_S + time.tv_nsec / NS_ONE_S; +} +} + +static AutoRegisterFilter g_registerAudioEncoderFilter("builtin.avtrans.audio.input", + FilterType::FILTERTYPE_SOURCE, + [](const std::string& name, const FilterType type) { + return std::make_shared(name, FilterType::FILTERTYPE_SOURCE); + }); + +class HeadFilterLinkCallback : public FilterLinkCallback { +public: + explicit HeadFilterLinkCallback(std::shared_ptr filter) + : filter_(std::move(filter)) {} + ~HeadFilterLinkCallback() = default; + + void OnLinkedResult(const sptr &queue, std::shared_ptr &meta) override + { + if (auto filter = filter_.lock()) { + filter->OnLinkedResult(queue, meta); + } else { + AVTRANS_LOGI("invalid headFilter"); + } + } + + void OnUnlinkedResult(std::shared_ptr &meta) override + { + if (auto filter = filter_.lock()) { + filter->OnUnlinkedResult(meta); + } else { + AVTRANS_LOGI("invalid headFilter"); + } + } + + void OnUpdatedResult(std::shared_ptr &meta) override + { + if (auto filter = filter_.lock()) { + filter->OnUpdatedResult(meta); + } else { + AVTRANS_LOGI("invalid headFilter"); + } + } + +private: + std::weak_ptr filter_ {}; +}; + +class InputBufAvailableListener : public Media::IConsumerListener { +public: + explicit InputBufAvailableListener(const std::weak_ptr inputFilter) + { + inputFilter_ = inputFilter; + } + + void OnBufferAvailable() override + { + auto inputFilter = inputFilter_.lock(); + if (inputFilter != nullptr) { + inputFilter->ProcessInputBuffer(); + } + } + +private: + std::weak_ptr inputFilter_; +}; + +AVTransAudioInputFilter::AVTransAudioInputFilter(std::string name, FilterType type, bool isAsyncMode) + : Filter(name, type) +{ +} + +AVTransAudioInputFilter::~AVTransAudioInputFilter() +{ + nextFiltersMap_.clear(); +} + +void AVTransAudioInputFilter::Init(const std::shared_ptr& receiver, + const std::shared_ptr& callback) +{ + AVTRANS_LOGI("AVTransAudioInputFilter::Init"); + receiver_ = receiver; + callback_ = callback; + AVTRANS_LOGI("AVTransAudioInputFilter::Init Done"); +} + +Status AVTransAudioInputFilter::DoInitAfterLink() +{ + AVTRANS_LOGI("AVTransAudioInputFilter::DoInitAfterLink"); + return Status::OK; +} + +void AVTransAudioInputFilter::PrepareInputBuffer() +{ + AVTRANS_LOGI("Preparing input buffer."); + int32_t inputBufNum = DEFAULT_BUFFER_NUM; + Media::MemoryType memoryType = Media::MemoryType::VIRTUAL_MEMORY; + if (inputBufQue_ == nullptr) { + inputBufQue_ = Media::AVBufferQueue::Create(inputBufNum, memoryType, INPUT_BUFFER_QUEUE_NAME); + } + if (inputBufQue_ == nullptr) { + AVTRANS_LOGE("Create buffer queue failed."); + return; + } + inputBufQueProducer_ = inputBufQue_->GetProducer(); + TRUE_RETURN((inputBufQueProducer_ == nullptr), "Get producer failed"); + + inputBufQueConsumer_ = inputBufQue_->GetConsumer(); + TRUE_RETURN((inputBufQueConsumer_ == nullptr), "Get consumer failed"); + + sptr listener(new InputBufAvailableListener(shared_from_this())); + inputBufQueConsumer_->SetBufferAvailableListener(listener); +} + +Status AVTransAudioInputFilter::DoPrepare() +{ + AVTRANS_LOGI("AVTransAudioInputFilter::DoPrepare"); + PrepareInputBuffer(); + frameNumber_.store(0); + if (callback_ == nullptr) { + AVTRANS_LOGE("filter callback is null"); + return Status::ERROR_NULL_POINTER; + } + TRUE_RETURN_V_MSG_E(meta_ == nullptr, Status::ERROR_NULL_POINTER, "meta_ is nullptr"); + int32_t mimeType = 0; + auto filterType = StreamType::STREAMTYPE_ENCODED_AUDIO; + meta_->GetData(Media::Tag::MIME_TYPE, mimeType); + if (static_cast(mimeType) == AudioCodecType::AUDIO_CODEC_AAC) { + filterType = StreamType::STREAMTYPE_RAW_AUDIO; + } + AVTRANS_LOGD("mimeType: %{public}d.", mimeType); + callback_->OnCallback(shared_from_this(), FilterCallBackCommand::NEXT_FILTER_NEEDED, filterType); + AVTRANS_LOGI("AVTransAudioInputFilter::DoPrepare done"); + return Status::OK; +} + +Status AVTransAudioInputFilter::DoStart() +{ + AVTRANS_LOGI("AVTransAudioInputFilter::DoStart"); + return Status::OK; +} + +Status AVTransAudioInputFilter::DoPause() +{ + AVTRANS_LOGI("AVTransAudioInputFilter::DoPause"); + return Status::OK; +} + +Status AVTransAudioInputFilter::DoPauseDragging() +{ + AVTRANS_LOGI("AVTransAudioInputFilter::DoPauseDragging"); + return Status::OK; +} + +Status AVTransAudioInputFilter::DoResume() +{ + AVTRANS_LOGI("AVTransAudioInputFilter::DoResume"); + frameNumber_.store(0); + return Status::OK; +} + +Status AVTransAudioInputFilter::DoResumeDragging() +{ + AVTRANS_LOGI("AVTransAudioInputFilter::DoResumeDragging"); + return Status::OK; +} + +Status AVTransAudioInputFilter::DoStop() +{ + AVTRANS_LOGI("AVTransAudioInputFilter::DoStop"); + return Status::OK; +} + +Status AVTransAudioInputFilter::DoFlush() +{ + AVTRANS_LOGI("AVTransAudioInputFilter::DoFlush"); + return Status::OK; +} + +Status AVTransAudioInputFilter::DoRelease() +{ + AVTRANS_LOGI("AVTransAudioInputFilter::DoRelease"); + return Status::OK; +} + +Status AVTransAudioInputFilter::DoProcessInputBuffer(int recvArg, bool dropFrame) +{ + AVTRANS_LOGD("DoProcessInputBuffer"); + (void)recvArg; + (void)dropFrame; + std::shared_ptr filledBuffer = nullptr; + if (curState_ != FilterState::RUNNING) { + AVTRANS_LOGE("Current status ia not running."); + return Status::ERROR_WRONG_STATE; + } + Media::Status ret = inputBufQueConsumer_->AcquireBuffer(filledBuffer); + if (ret != Media::Status::OK) { + AVTRANS_LOGE("Acquire buffer err."); + return Status::ERROR_INVALID_OPERATION; + } + ProcessAndSendBuffer(filledBuffer); + inputBufQueConsumer_->ReleaseBuffer(filledBuffer); + return Status::OK; +} + +Status AVTransAudioInputFilter::ProcessAndSendBuffer(const std::shared_ptr buffer) +{ + if (buffer == nullptr || buffer->memory_ == nullptr) { + AVTRANS_LOGE("AVBuffer is null"); + return Status::ERROR_NULL_POINTER; + } + + TRUE_RETURN_V_MSG_E((outputBufQueProducer_ == nullptr), Status::ERROR_NULL_POINTER, "Producer is null"); + Media::AVBufferConfig config(buffer->GetConfig()); + AVTRANS_LOGD("outPut config, size: %{public}u, capacity: %{public}u, memtype: %{public}hhu", + config.size, config.capacity, config.memoryType); + std::shared_ptr outBuffer = nullptr; + outputBufQueProducer_->RequestBuffer(outBuffer, config, MAX_TIME_OUT_MS); + TRUE_RETURN_V_MSG_E((outBuffer == nullptr || outBuffer->memory_ == nullptr), Status::ERROR_NULL_POINTER, + "OutBuffer or memory is null"); + auto meta = outBuffer->meta_; + if (meta == nullptr) { + AVTRANS_LOGE("Meta of AVBuffer is null"); + outputBufQueProducer_->PushBuffer(outBuffer, true); + return Status::ERROR_NULL_POINTER; + } + ++frameNumber_; + outBuffer->pts_ = GetCurrentTime(); + meta->SetData(Media::Tag::USER_FRAME_PTS, outBuffer->pts_); + outBuffer->memory_->Write(buffer->memory_->GetAddr(), buffer->memory_->GetSize(), 0); + outputBufQueProducer_->PushBuffer(outBuffer, true); + return Status::OK; +} + +Status AVTransAudioInputFilter::DoProcessOutputBuffer(int recvArg, bool dropFrame, bool byIdx, + uint32_t idx, int64_t renderTimee) +{ + AVTRANS_LOGI("AVTransAudioInputFilter::DoProcessOutputBuffer"); + return Status::OK; +} + +void AVTransAudioInputFilter::SetParameter(const std::shared_ptr& meta) +{ + meta_ = meta; +} + +void AVTransAudioInputFilter::GetParameter(std::shared_ptr& meta) +{ + meta = meta_; +} + +Status AVTransAudioInputFilter::LinkNext(const std::shared_ptr& nextFilter, StreamType outType) +{ + AVTRANS_LOGI("cur: AVTransAudioInputFilter, link next filter.."); + nextFilter_ = nextFilter; + nextFiltersMap_[outType].push_back(nextFilter_); + auto filterLinkCallback = std::make_shared(shared_from_this()); + auto ret = nextFilter->OnLinked(outType, meta_, filterLinkCallback); + if (ret != Status::OK) { + AVTRANS_LOGE("Onlinked failed, status: %{public}d.", ret); + return ret; + } + return Status::OK; +} + +Status AVTransAudioInputFilter::UpdateNext(const std::shared_ptr&, StreamType) +{ + return Status::OK; +} + +Status AVTransAudioInputFilter::UnLinkNext(const std::shared_ptr&, StreamType) +{ + AVTRANS_LOGI("cur: AVTransAudioInputFilter, unlink next filter.."); + return Status::OK; +} + +Status AVTransAudioInputFilter::OnLinked(StreamType inType, const std::shared_ptr &meta, + const std::shared_ptr &callback) +{ + AVTRANS_LOGI("cur: AVTransAudioInputFilter, OnLinked"); + return Status::OK; +}; + +Status AVTransAudioInputFilter::OnUpdated(StreamType, const std::shared_ptr&, + const std::shared_ptr&) +{ + return Status::OK; +} + +Status AVTransAudioInputFilter::OnUnLinked(StreamType, const std::shared_ptr&) +{ + AVTRANS_LOGI("cur: AVTransAudioInputFilter, OnUnLinked."); + return Status::OK; +} + +void AVTransAudioInputFilter::OnLinkedResult(const sptr& queue, + std::shared_ptr& meta) +{ + AVTRANS_LOGI("cur: AVTransAudioInputFilter, OnLinkedResult"); + outputBufQueProducer_ = queue; +} + +void AVTransAudioInputFilter::OnUnlinkedResult(std::shared_ptr& meta) +{ + (void)meta; +} + +void AVTransAudioInputFilter::OnUpdatedResult(std::shared_ptr& meta) +{ + (void)meta; +} + +sptr AVTransAudioInputFilter::GetInputBufQueProducer() +{ + return inputBufQueProducer_; +} +} // namespace Pipeline +} // namespace DistributedHardware +} // namespace OHOS diff --git a/av_transport/av_trans_engine/filters/av_trans_input/av_trans_audio_input_filter.h b/av_transport/av_trans_engine/filters/av_trans_input/av_trans_audio_input_filter.h new file mode 100644 index 00000000..47bfdc07 --- /dev/null +++ b/av_transport/av_trans_engine/filters/av_trans_input/av_trans_audio_input_filter.h @@ -0,0 +1,89 @@ +/* + * Copyright (c) 2025 Huawei Device Co., Ltd. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef OHOS_AV_TRANS_AUDIO_INPUT_FILTER_H +#define OHOS_AV_TRANS_AUDIO_INPUT_FILTER_H + +#include +#include +#include + +#include "buffer/avbuffer_queue.h" +#include "buffer/avbuffer_queue_consumer.h" +#include "buffer/avbuffer_queue_producer.h" + +#include "pipeline_status.h" +#include "filter.h" + +namespace OHOS { +namespace DistributedHardware { +namespace Pipeline { +class AVTransAudioInputFilter : public Filter, public std::enable_shared_from_this { +public: + AVTransAudioInputFilter(std::string name, FilterType type, bool asyncMode = false); + virtual ~AVTransAudioInputFilter(); + + void Init(const std::shared_ptr& receiver, const std::shared_ptr& callback) override; + Status DoInitAfterLink() override; + Status DoPrepare() override; + Status DoStart() override; + Status DoPause() override; + Status DoPauseDragging() override; + Status DoResume() override; + Status DoResumeDragging() override; + Status DoStop() override; + Status DoFlush() override; + Status DoRelease() override; + + Status DoProcessInputBuffer(int recvArg, bool dropFrame) override; + Status DoProcessOutputBuffer(int recvArg, bool dropFrame, bool byIdx, uint32_t idx, int64_t renderTime) override; + void SetParameter(const std::shared_ptr& meta) override; + void GetParameter(std::shared_ptr& meta) override; + + Status LinkNext(const std::shared_ptr& nextFilter, StreamType outType) override; + Status UpdateNext(const std::shared_ptr& nextFilter, StreamType outType) override; + Status UnLinkNext(const std::shared_ptr& nextFilter, StreamType outType) override; + + Status OnLinked(StreamType inType, const std::shared_ptr& meta, + const std::shared_ptr& callback) override; + Status OnUpdated(StreamType inType, const std::shared_ptr& meta, + const std::shared_ptr& callback) override; + Status OnUnLinked(StreamType inType, const std::shared_ptr& callback) override; + + void OnLinkedResult(const sptr& queue, std::shared_ptr& meta); + void OnUnlinkedResult(std::shared_ptr& meta); + void OnUpdatedResult(std::shared_ptr& meta); + sptr GetInputBufQueProducer(); + +private: + void PrepareInputBuffer(); + Status ProcessAndSendBuffer(const std::shared_ptr buffer); + +private: + std::shared_ptr configureParam_ {nullptr}; + std::shared_ptr nextFilter_ {nullptr}; + std::shared_ptr onLinkedResultCallback_ {nullptr}; + + std::shared_ptr inputBufQue_ {nullptr}; + sptr inputBufQueProducer_ {nullptr}; + sptr inputBufQueConsumer_ {nullptr}; + sptr outputBufQueProducer_ {nullptr}; + + std::atomic frameNumber_ {0}; +}; +} // namespace Pipeline +} // namespace DistributedHardware +} // namespace OHOS +#endif diff --git a/av_transport/av_trans_engine/filters/av_trans_input/av_trans_bus_input_filter.cpp b/av_transport/av_trans_engine/filters/av_trans_input/av_trans_bus_input_filter.cpp new file mode 100644 index 00000000..909b0fb4 --- /dev/null +++ b/av_transport/av_trans_engine/filters/av_trans_input/av_trans_bus_input_filter.cpp @@ -0,0 +1,495 @@ +/* + * Copyright (c) 2025 Huawei Device Co., Ltd. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "av_trans_bus_input_filter.h" + +#include +#include + +#include "av_trans_constants.h" +#include "av_trans_errno.h" +#include "av_trans_log.h" +#include "av_trans_types.h" +#include "filter_factory.h" + +#undef DH_LOG_TAG +#define DH_LOG_TAG "AVTransBusInputFilter" + +namespace OHOS { +namespace DistributedHardware { +namespace Pipeline { +namespace { +constexpr int32_t DEFAULT_BUFFER_NUM = 8; +constexpr int32_t MAX_TIME_OUT_MS = 1; +const std::string INPUT_BUFFER_QUEUE_NAME = "AVTransBusInputBufferQueue"; +const std::string META_TIMESTAMP = "meta_timestamp"; + +bool IsUInt32(const cJSON *jsonObj, const std::string &key) +{ + cJSON *keyObj = cJSON_GetObjectItemCaseSensitive(jsonObj, key.c_str()); + return (keyObj != nullptr) && cJSON_IsNumber(keyObj) && + static_cast(keyObj->valueint) <= UINT32_MAX; +} +} + +static AutoRegisterFilter g_registerAudioFilter("builtin.avtrans.softbus.input", + FilterType::AUDIO_DATA_SOURCE, + [](const std::string& name, const FilterType type) { + return std::make_shared(name, FilterType::AUDIO_DATA_SOURCE); + }); + +class BusInputFilterLinkCB : public FilterLinkCallback { +public: + explicit BusInputFilterLinkCB(std::shared_ptr filter) + : filter_(std::move(filter)) {} + ~BusInputFilterLinkCB() = default; + + void OnLinkedResult(const sptr &queue, std::shared_ptr &meta) override + { + if (auto filter = filter_.lock()) { + filter->OnLinkedResult(queue, meta); + } else { + AVTRANS_LOGI("invalid headFilter"); + } + } + + void OnUnlinkedResult(std::shared_ptr &meta) override + { + if (auto filter = filter_.lock()) { + filter->OnUnlinkedResult(meta); + } else { + AVTRANS_LOGI("invalid headFilter"); + } + } + + void OnUpdatedResult(std::shared_ptr &meta) override + { + if (auto filter = filter_.lock()) { + filter->OnUpdatedResult(meta); + } else { + AVTRANS_LOGI("invalid headFilter"); + } + } + +private: + std::weak_ptr filter_ {}; +}; + +class InputBufAvailableListener : public Media::IConsumerListener { +public: + explicit InputBufAvailableListener(const std::weak_ptr inputFilter) + { + inputFilter_ = inputFilter; + } + + void OnBufferAvailable() override + { + auto inputFilter = inputFilter_.lock(); + if (inputFilter != nullptr) { + inputFilter->ProcessInputBuffer(); + } + } + +private: + std::weak_ptr inputFilter_; +}; + +AVTransBusInputFilter::AVTransBusInputFilter(std::string name, FilterType type, bool isAsyncMode) + : Filter(name, type) +{ +} + +AVTransBusInputFilter::~AVTransBusInputFilter() +{ + nextFiltersMap_.clear(); +} + +void AVTransBusInputFilter::Init(const std::shared_ptr& receiver, + const std::shared_ptr& callback) +{ + AVTRANS_LOGI("AVTransBusInputFilter::Init"); + receiver_ = receiver; + callback_ = callback; + AVTRANS_LOGI("AVTransBusInputFilter::Init Done"); +} + +Status AVTransBusInputFilter::DoInitAfterLink() +{ + AVTRANS_LOGI("AVTransBusInputFilter::DoInitAfterLink"); + return Status::OK; +} + +void AVTransBusInputFilter::PrepareInputBuffer() +{ + AVTRANS_LOGI("Preparing input buffer."); + int32_t inputBufNum = DEFAULT_BUFFER_NUM; + Media::MemoryType memoryType = Media::MemoryType::VIRTUAL_MEMORY; + if (inputBufQue_ == nullptr) { + inputBufQue_ = Media::AVBufferQueue::Create(inputBufNum, memoryType, INPUT_BUFFER_QUEUE_NAME); + } + if (inputBufQue_ == nullptr) { + AVTRANS_LOGE("Create buffer queue failed."); + return; + } + inputBufQueProducer_ = inputBufQue_->GetProducer(); + TRUE_RETURN((inputBufQueProducer_ == nullptr), "Get producer failed"); + + inputBufQueConsumer_ = inputBufQue_->GetConsumer(); + TRUE_RETURN((inputBufQueConsumer_ == nullptr), "Get consumer failed"); + + sptr listener(new InputBufAvailableListener(shared_from_this())); + inputBufQueConsumer_->SetBufferAvailableListener(listener); +} + +Status AVTransBusInputFilter::DoPrepare() +{ + AVTRANS_LOGI("AVTransBusInputFilter::DoPrepare"); + std::string str; + if (meta_ != nullptr) { + meta_->GetData(Media::Tag::MEDIA_DESCRIPTION, str); + } + cJSON *jParam = cJSON_Parse(str.c_str()); + TRUE_RETURN_V_MSG_E(jParam == nullptr, Status::ERROR_NULL_POINTER, "Failed to parse json."); + cJSON *ownerName = cJSON_GetObjectItem(jParam, KEY_ONWER_NAME.c_str()); + if (ownerName == nullptr || !cJSON_IsString(ownerName)) { + AVTRANS_LOGE("The key ownerName is null."); + cJSON_Delete(jParam); + return Status::ERROR_NULL_POINTER; + } + ownerName_ = std::string(ownerName->valuestring); + cJSON *peerDevId = cJSON_GetObjectItem(jParam, KEY_PEERDEVID_NAME.c_str()); + if (peerDevId == nullptr || !cJSON_IsString(peerDevId)) { + AVTRANS_LOGE("The key peerDevId is null."); + cJSON_Delete(jParam); + return Status::ERROR_NULL_POINTER; + } + peerDevId_ = std::string(peerDevId->valuestring); + cJSON_Delete(jParam); + AVTRANS_LOGD("ownerName = %{public}s, peerDevId = %{public}s.", ownerName_.c_str(), peerDevId_.c_str()); + sessionName_ = ownerName_ + "_" + RECEIVER_DATA_SESSION_NAME_SUFFIX; + SoftbusChannelAdapter::GetInstance().RegisterChannelListener(sessionName_, peerDevId_, this); + int32_t ret = SoftbusChannelAdapter::GetInstance().CreateChannelServer(TransName2PkgName(ownerName_), sessionName_); + TRUE_RETURN_V_MSG_E(ret != DH_AVT_SUCCESS, Status::ERROR_INVALID_OPERATION, + "Create Session Server failed ret: %{public}d.", ret); + TRUE_RETURN_V_MSG_E(callback_ == nullptr || meta_ == nullptr, Status::ERROR_NULL_POINTER, "callback is null"); + int32_t mimeType = 0; + auto filterType = StreamType::STREAMTYPE_DECODED_AUDIO; + meta_->GetData(Media::Tag::MIME_TYPE, mimeType); + if (static_cast(mimeType) == AudioCodecType::AUDIO_CODEC_AAC) { + filterType = StreamType::STREAMTYPE_RAW_AUDIO; + } + callback_->OnCallback(shared_from_this(), FilterCallBackCommand::NEXT_FILTER_NEEDED, filterType); + return Status::OK; +} + +Status AVTransBusInputFilter::DoStart() +{ + AVTRANS_LOGI("AVTransBusInputFilter::DoStart"); + return Status::OK; +} + +Status AVTransBusInputFilter::DoPause() +{ + AVTRANS_LOGI("AVTransBusInputFilter::DoPause"); + return Status::OK; +} + +Status AVTransBusInputFilter::DoPauseDragging() +{ + AVTRANS_LOGI("AVTransBusInputFilter::DoPauseDragging"); + return Status::OK; +} + +Status AVTransBusInputFilter::DoResume() +{ + AVTRANS_LOGI("AVTransBusInputFilter::DoResume"); + return Status::OK; +} + +Status AVTransBusInputFilter::DoResumeDragging() +{ + AVTRANS_LOGI("AVTransBusInputFilter::DoResumeDragging"); + return Status::OK; +} + +Status AVTransBusInputFilter::DoStop() +{ + AVTRANS_LOGI("AVTransBusInputFilter::DoStop"); + return Status::OK; +} + +Status AVTransBusInputFilter::DoFlush() +{ + AVTRANS_LOGI("AVTransBusInputFilter::DoFlush"); + return Status::OK; +} + +Status AVTransBusInputFilter::DoRelease() +{ + AVTRANS_LOGI("AVTransBusInputFilter::DoRelease"); + SoftbusChannelAdapter::GetInstance().RemoveChannelServer(TransName2PkgName(ownerName_), sessionName_); + SoftbusChannelAdapter::GetInstance().UnRegisterChannelListener(sessionName_, peerDevId_); + return Status::OK; +} + +Status AVTransBusInputFilter::DoProcessInputBuffer(int recvArg, bool dropFrame) +{ + AVTRANS_LOGI("AVTransBusInputFilter::DoProcessInputBuffer"); + (void)recvArg; + (void)dropFrame; + std::shared_ptr filledBuffer = nullptr; + if (curState_ != FilterState::RUNNING) { + AVTRANS_LOGE("Current status ia not running."); + return Status::ERROR_WRONG_STATE; + } + Media::Status ret = inputBufQueConsumer_->AcquireBuffer(filledBuffer); + if (ret != Media::Status::OK) { + AVTRANS_LOGE("Acquire buffer err."); + return Status::ERROR_INVALID_OPERATION; + } + ProcessAndSendBuffer(filledBuffer); + inputBufQueConsumer_->ReleaseBuffer(filledBuffer); + return Status::OK; +} + +Status AVTransBusInputFilter::ProcessAndSendBuffer(const std::shared_ptr buffer) +{ + if (buffer == nullptr || buffer->memory_ == nullptr) { + AVTRANS_LOGE("AVBuffer is null"); + return Status::ERROR_NULL_POINTER; + } + + TRUE_RETURN_V_MSG_E((outputBufQueProducer_ == nullptr), Status::ERROR_NULL_POINTER, "Producer is null"); + Media::AVBufferConfig config(buffer->GetConfig()); + AVTRANS_LOGD("outPut config, size: %{public}u, capacity: %{public}u, memtype: %{public}hhu", + config.size, config.capacity, config.memoryType); + std::shared_ptr outBuffer = nullptr; + outputBufQueProducer_->RequestBuffer(outBuffer, config, MAX_TIME_OUT_MS); + TRUE_RETURN_V_MSG_E((outBuffer == nullptr || outBuffer->memory_ == nullptr), Status::ERROR_NULL_POINTER, + "OutBuffer or memory is null"); + auto meta = outBuffer->meta_; + if (meta == nullptr) { + AVTRANS_LOGE("Meta of AVBuffer is null"); + outputBufQueProducer_->PushBuffer(outBuffer, true); + return Status::ERROR_NULL_POINTER; + } + meta->SetData(Media::Tag::USER_FRAME_PTS, outBuffer->pts_); + outBuffer->memory_->Write(buffer->memory_->GetAddr(), buffer->memory_->GetSize(), 0); + outputBufQueProducer_->PushBuffer(outBuffer, true); + return Status::OK; +} + +Status AVTransBusInputFilter::DoProcessOutputBuffer(int recvArg, bool dropFrame, bool byIdx, + uint32_t idx, int64_t renderTimee) +{ + AVTRANS_LOGI("AVTransBusInputFilter::DoProcessOutputBuffer"); + return Status::OK; +} + +void AVTransBusInputFilter::SetParameter(const std::shared_ptr& meta) +{ + meta_ = meta; +} + +void AVTransBusInputFilter::GetParameter(std::shared_ptr& meta) +{ + meta = meta_; +} + +Status AVTransBusInputFilter::LinkNext(const std::shared_ptr& nextFilter, StreamType outType) +{ + AVTRANS_LOGI("cur: AVTransBusInputFilter, link next filter.."); + nextFilter_ = nextFilter; + nextFiltersMap_[outType].push_back(nextFilter_); + auto filterLinkCallback = std::make_shared(shared_from_this()); + auto ret = nextFilter->OnLinked(outType, meta_, filterLinkCallback); + if (ret != Status::OK) { + AVTRANS_LOGE("Onlinked failed, status: %{public}d.", ret); + return ret; + } + return Status::OK; +} + +Status AVTransBusInputFilter::UpdateNext(const std::shared_ptr&, StreamType) +{ + return Status::OK; +} + +Status AVTransBusInputFilter::UnLinkNext(const std::shared_ptr&, StreamType) +{ + AVTRANS_LOGI("cur: AVTransBusInputFilter, unlink next filter.."); + return Status::OK; +} + +Status AVTransBusInputFilter::OnLinked(StreamType inType, const std::shared_ptr &meta, + const std::shared_ptr &callback) +{ + AVTRANS_LOGI("cur: AVTransBusInputFilter, OnLinked"); + return Status::OK; +}; + +Status AVTransBusInputFilter::OnUpdated(StreamType, const std::shared_ptr&, + const std::shared_ptr&) +{ + return Status::OK; +} + +Status AVTransBusInputFilter::OnUnLinked(StreamType, const std::shared_ptr&) +{ + AVTRANS_LOGI("cur: AVTransBusInputFilter, OnUnLinked."); + return Status::OK; +} + +void AVTransBusInputFilter::OnLinkedResult(const sptr& queue, + std::shared_ptr& meta) +{ + AVTRANS_LOGI("cur: AVTransBusInputFilter, OnLinkedResult"); + outputBufQueProducer_ = queue; +} + +void AVTransBusInputFilter::OnUnlinkedResult(std::shared_ptr& meta) +{ + (void)meta; +} + +void AVTransBusInputFilter::OnUpdatedResult(std::shared_ptr& meta) +{ + (void)meta; +} + +void AVTransBusInputFilter::OnChannelEvent(const AVTransEvent &event) +{ + AVTRANS_LOGI("OnChannelEvent enter, event type: %{public}d", event.type); + switch (event.type) { + case OHOS::DistributedHardware::EventType::EVENT_CHANNEL_OPENED: { + AVTRANS_LOGD("channel opened."); + TRUE_RETURN(receiver_ == nullptr, "receiver_ is nullptr"); + Event channelEvent; + channelEvent.type = EventType::EVENT_AUDIO_PROGRESS; + channelEvent.param = event; + receiver_->OnEvent(channelEvent); + break; + } + case OHOS::DistributedHardware::EventType::EVENT_CHANNEL_OPEN_FAIL: { + AVTRANS_LOGE("channel open failed."); + break; + } + case OHOS::DistributedHardware::EventType::EVENT_CHANNEL_CLOSED: { + AVTRANS_LOGI("channel closed."); + break; + } + default: + AVTRANS_LOGE("Unsupported event type."); + } +} + +void AVTransBusInputFilter::OnStreamReceived(const StreamData *data, const StreamData *ext) +{ + if (ext == nullptr) { + AVTRANS_LOGE("ext is nullptr."); + return; + } + std::string message(reinterpret_cast(ext->buf), ext->bufLen); + AVTRANS_LOGD("Receive message : %{public}s", message.c_str()); + + cJSON *resMsg = cJSON_Parse(message.c_str()); + if (resMsg == nullptr) { + AVTRANS_LOGE("The resMsg parse failed."); + return; + } + if (!IsUInt32(resMsg, AVT_DATA_META_TYPE)) { + AVTRANS_LOGE("Invalid data type."); + cJSON_Delete(resMsg); + return; + } + StreamDataEnqueue(data, resMsg); + cJSON_Delete(resMsg); +} + +bool AVTransBusInputFilter::UnmarshalAudioMeta(const std::string& jsonStr, int64_t& pts) +{ + cJSON *metaJson = cJSON_Parse(jsonStr.c_str()); + if (metaJson == nullptr) { + return false; + } + cJSON *ptsObj = cJSON_GetObjectItemCaseSensitive(metaJson, META_TIMESTAMP.c_str()); + if (ptsObj == nullptr || !cJSON_IsNumber(ptsObj)) { + cJSON_Delete(metaJson); + return false; + } + pts = static_cast(ptsObj->valueint); + cJSON_Delete(metaJson); + return true; +} + +void AVTransBusInputFilter::StreamDataEnqueue(const StreamData *data, const cJSON *extMsg) +{ + TRUE_RETURN((outputBufQueProducer_ == nullptr || data == nullptr || extMsg == nullptr), "Producer is null"); + Media::AVBufferConfig config; + config.size = data->bufLen; + config.memoryType = Media::MemoryType::VIRTUAL_MEMORY; + config.memoryFlag = Media::MemoryFlag::MEMORY_READ_WRITE; + AVTRANS_LOGD("outPut config, size: %{public}u, capacity: %{public}u, memtype: %{public}hhu", + config.size, config.capacity, config.memoryType); + std::shared_ptr outBuffer = nullptr; + outputBufQueProducer_->RequestBuffer(outBuffer, config, MAX_TIME_OUT_MS); + TRUE_RETURN((outBuffer == nullptr || outBuffer->memory_ == nullptr), + "OutBuffer or memory is null"); + auto meta = outBuffer->meta_; + if (meta == nullptr) { + AVTRANS_LOGE("Meta of AVBuffer is null"); + outputBufQueProducer_->PushBuffer(outBuffer, false); + return; + } + cJSON *paramItem = cJSON_GetObjectItem(extMsg, AVT_DATA_PARAM.c_str()); + if (paramItem == nullptr || !cJSON_IsString(paramItem)) { + AVTRANS_LOGE("paramItem is invalid."); + return; + } + int64_t ptsValue = 0; + UnmarshalAudioMeta(std::string(paramItem->valuestring), ptsValue); + outBuffer->pts_ = ptsValue; + meta->SetData(Media::Tag::USER_FRAME_PTS, ptsValue); + outBuffer->memory_->Write(reinterpret_cast(data->buf), data->bufLen, 0); + outputBufQueProducer_->PushBuffer(outBuffer, true); +} + +sptr AVTransBusInputFilter::GetInputBufQueProducer() +{ + return inputBufQueProducer_; +} + +std::string AVTransBusInputFilter::TransName2PkgName(const std::string &ownerName) +{ + const static std::pair mapArray[] = { + {OWNER_NAME_D_MIC, PKG_NAME_D_AUDIO}, + {OWNER_NAME_D_VIRMODEM_MIC, PKG_NAME_D_CALL}, + {OWNER_NAME_D_CAMERA, PKG_NAME_D_CAMERA}, + {OWNER_NAME_D_SCREEN, PKG_NAME_D_SCREEN}, + {OWNER_NAME_D_SPEAKER, PKG_NAME_D_AUDIO}, + {OWNER_NAME_D_VIRMODEM_SPEAKER, PKG_NAME_D_CALL}, + {AV_SYNC_SENDER_CONTROL_SESSION_NAME, PKG_NAME_DH_FWK}, + {AV_SYNC_RECEIVER_CONTROL_SESSION_NAME, PKG_NAME_DH_FWK}, + }; + auto foundItem = std::find_if(std::begin(mapArray), std::end(mapArray), + [&](const auto& item) { return item.first == ownerName; }); + if (foundItem != std::end(mapArray)) { + return foundItem->second; + } + return EMPTY_STRING; +} +} // namespace Pipeline +} // namespace DistributedHardware +} // namespace OHOS diff --git a/av_transport/av_trans_engine/filters/av_trans_input/av_trans_bus_input_filter.h b/av_transport/av_trans_engine/filters/av_trans_input/av_trans_bus_input_filter.h new file mode 100644 index 00000000..bfeb26a4 --- /dev/null +++ b/av_transport/av_trans_engine/filters/av_trans_input/av_trans_bus_input_filter.h @@ -0,0 +1,100 @@ +/* + * Copyright (c) 2025 Huawei Device Co., Ltd. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef OHOS_AV_TRANS_AUDIO_INPUT_FILTER_H +#define OHOS_AV_TRANS_AUDIO_INPUT_FILTER_H + +#include +#include +#include + +#include "buffer/avbuffer_queue.h" +#include "buffer/avbuffer_queue_consumer.h" +#include "buffer/avbuffer_queue_producer.h" +#include "cJSON.h" + +#include "pipeline_status.h" +#include "filter.h" +#include "softbus_channel_adapter.h" + +namespace OHOS { +namespace DistributedHardware { +namespace Pipeline { +class AVTransBusInputFilter : public Filter, + public ISoftbusChannelListener, + public std::enable_shared_from_this { +public: + AVTransBusInputFilter(std::string name, FilterType type, bool asyncMode = false); + virtual ~AVTransBusInputFilter(); + + void Init(const std::shared_ptr& receiver, const std::shared_ptr& callback) override; + Status DoInitAfterLink() override; + Status DoPrepare() override; + Status DoStart() override; + Status DoPause() override; + Status DoPauseDragging() override; + Status DoResume() override; + Status DoResumeDragging() override; + Status DoStop() override; + Status DoFlush() override; + Status DoRelease() override; + + Status DoProcessInputBuffer(int recvArg, bool dropFrame) override; + Status DoProcessOutputBuffer(int recvArg, bool dropFrame, bool byIdx, uint32_t idx, int64_t renderTime) override; + void SetParameter(const std::shared_ptr& meta) override; + void GetParameter(std::shared_ptr& meta) override; + + Status LinkNext(const std::shared_ptr& nextFilter, StreamType outType) override; + Status UpdateNext(const std::shared_ptr& nextFilter, StreamType outType) override; + Status UnLinkNext(const std::shared_ptr& nextFilter, StreamType outType) override; + + Status OnLinked(StreamType inType, const std::shared_ptr& meta, + const std::shared_ptr& callback) override; + Status OnUpdated(StreamType inType, const std::shared_ptr& meta, + const std::shared_ptr& callback) override; + Status OnUnLinked(StreamType inType, const std::shared_ptr& callback) override; + + void OnLinkedResult(const sptr& queue, std::shared_ptr& meta); + void OnUnlinkedResult(std::shared_ptr& meta); + void OnUpdatedResult(std::shared_ptr& meta); + sptr GetInputBufQueProducer(); + + void OnChannelEvent(const AVTransEvent &event) override; + void OnStreamReceived(const StreamData *data, const StreamData *ext) override; + +private: + void PrepareInputBuffer(); + Status ProcessAndSendBuffer(const std::shared_ptr buffer); + void StreamDataEnqueue(const StreamData *data, const cJSON *extMsg); + std::string TransName2PkgName(const std::string &ownerName); + bool UnmarshalAudioMeta(const std::string& jsonStr, int64_t& pts); + +private: + std::shared_ptr configureParam_ {nullptr}; + std::shared_ptr nextFilter_ {nullptr}; + std::shared_ptr onLinkedResultCallback_ {nullptr}; + + std::shared_ptr inputBufQue_ {nullptr}; + sptr inputBufQueProducer_ {nullptr}; + sptr inputBufQueConsumer_ {nullptr}; + sptr outputBufQueProducer_ {nullptr}; + std::string ownerName_; + std::string sessionName_; + std::string peerDevId_; +}; +} // namespace Pipeline +} // namespace DistributedHardware +} // namespace OHOS +#endif diff --git a/av_transport/av_trans_engine/filters/av_trans_output/daudio_output_filter.cpp b/av_transport/av_trans_engine/filters/av_trans_output/daudio_output_filter.cpp new file mode 100644 index 00000000..a09c1c2f --- /dev/null +++ b/av_transport/av_trans_engine/filters/av_trans_output/daudio_output_filter.cpp @@ -0,0 +1,315 @@ +/* + * Copyright (c) 2025 Huawei Device Co., Ltd. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "daudio_output_filter.h" + +#include + +#include "av_trans_log.h" +#include "filter_factory.h" + +#include "cJSON.h" +#include "av_trans_errno.h" +#include "av_trans_log.h" +#include "av_trans_constants.h" + +#undef DH_LOG_TAG +#define DH_LOG_TAG "DAudioOutputFilter" + +namespace OHOS { +namespace DistributedHardware { +namespace Pipeline { +constexpr int32_t DEFAULT_BUFFER_NUM = 8; +const std::string OUTPUT_BUFFER_QUEUE_NAME = "AVTransAudioOutputBufferQueue"; + +static AutoRegisterFilter g_registerAudioEncoderFilter("builtin.daudio.output", + FilterType::FILTERTYPE_SSINK, + [](const std::string& name, const FilterType type) { + return std::make_shared(name, FilterType::FILTERTYPE_SSINK); + }); + +class DAudioOutputFilterLinkCallback : public FilterLinkCallback { +public: + explicit DAudioOutputFilterLinkCallback(std::shared_ptr filter) + : outFilter_(std::move(filter)) {} + ~DAudioOutputFilterLinkCallback() = default; + + void OnLinkedResult(const sptr &queue, std::shared_ptr &meta) override + { + if (auto filter = outFilter_.lock()) { + filter->OnLinkedResult(queue, meta); + } else { + AVTRANS_LOGI("invalid dAudioOutputFilter"); + } + } + + void OnUnlinkedResult(std::shared_ptr &meta) override + { + if (auto filter = outFilter_.lock()) { + filter->OnUnlinkedResult(meta); + } else { + AVTRANS_LOGI("invalid dAudioOutputFilter"); + } + } + + void OnUpdatedResult(std::shared_ptr &meta) override + { + if (auto filter = outFilter_.lock()) { + filter->OnUpdatedResult(meta); + } else { + AVTRANS_LOGI("invalid dAudioOutputFilter"); + } + } + +private: + std::weak_ptr outFilter_ {}; +}; + +class AVBufferAvailableListener : public Media::IConsumerListener { +public: + explicit AVBufferAvailableListener(std::weak_ptr outputFilter) + { + outputFilter_ = outputFilter; + } + + void OnBufferAvailable() override + { + auto outputFilter = outputFilter_.lock(); + if (outputFilter != nullptr) { + outputFilter->ProcessInputBuffer(); + } + } + +private: + std::weak_ptr outputFilter_; +}; + +DAudioOutputFilter::DAudioOutputFilter(std::string name, FilterType type) + : Filter(name, type) +{ +} + +DAudioOutputFilter::~DAudioOutputFilter() +{ + nextFiltersMap_.clear(); +} + +void DAudioOutputFilter::Init(const std::shared_ptr& receiver, + const std::shared_ptr& callback) +{ + eventReceiver_ = receiver; + filterCallback_ = callback; +} + +Status DAudioOutputFilter::DoInitAfterLink() +{ + return Status::OK; +} + +void DAudioOutputFilter::PrepareInputBuffer() +{ + AVTRANS_LOGI("Preparing input buffer."); + int32_t outputBufNum = DEFAULT_BUFFER_NUM; + Media::MemoryType memoryType = Media::MemoryType::VIRTUAL_MEMORY; + if (outputBufQue_ == nullptr) { + outputBufQue_ = Media::AVBufferQueue::Create(outputBufNum, memoryType, OUTPUT_BUFFER_QUEUE_NAME); + } + if (outputBufQue_ == nullptr) { + AVTRANS_LOGE("Create buffer queue failed."); + return; + } + inputBufQueProducer_ = outputBufQue_->GetProducer(); + TRUE_RETURN((inputBufQueProducer_ == nullptr), "Get producer failed"); + + inputBufQueConsumer_ = outputBufQue_->GetConsumer(); + TRUE_RETURN((inputBufQueConsumer_ == nullptr), "Get consumer failed"); + + sptr listener(new AVBufferAvailableListener(shared_from_this())); + inputBufQueConsumer_->SetBufferAvailableListener(listener); +} + +Status DAudioOutputFilter::DoPrepare() +{ + PrepareInputBuffer(); + int32_t inputBufNum = DEFAULT_BUFFER_NUM; + sptr producer = nullptr; + Media::MemoryType memoryType = Media::MemoryType::UNKNOWN_MEMORY; + if (outputBufQue_ == nullptr) { + outputBufQue_ = Media::AVBufferQueue::Create(inputBufNum, memoryType, OUTPUT_BUFFER_QUEUE_NAME); + } + if (outputBufQue_ == nullptr) { + AVTRANS_LOGE("Create buffer queue failed."); + return Status::ERROR_NULL_POINTER; + } + producer = outputBufQue_->GetProducer(); + TRUE_RETURN_V_MSG_E((producer == nullptr), Status::ERROR_NULL_POINTER, "Get producer failed"); + sptr consumer = outputBufQue_->GetConsumer(); + sptr listener(new AVBufferAvailableListener(shared_from_this())); + consumer->SetBufferAvailableListener(listener); + + std::shared_ptr meta = std::make_shared(); + if (onLinkedResultCallback_ != nullptr) { + onLinkedResultCallback_->OnLinkedResult(producer, meta); + } + return Status::OK; +} + +Status DAudioOutputFilter::DoStart() +{ + AVTRANS_LOGI("Do Start"); + return Status::OK; +} + +Status DAudioOutputFilter::DoPause() +{ + return Status::OK; +} + +Status DAudioOutputFilter::DoPauseDragging() +{ + return Status::OK; +} + +Status DAudioOutputFilter::DoResume() +{ + return Status::OK; +} + +Status DAudioOutputFilter::DoResumeDragging() +{ + return Status::OK; +} + +Status DAudioOutputFilter::DoStop() +{ + return Status::OK; +} + +Status DAudioOutputFilter::DoFlush() +{ + return Status::OK; +} + +Status DAudioOutputFilter::DoRelease() +{ + return Status::OK; +} + +Status DAudioOutputFilter::DoProcessInputBuffer(int recvArg, bool dropFrame) +{ + (void)recvArg; + (void)dropFrame; + std::shared_ptr filledBuffer = nullptr; + Media::Status ret = inputBufQueConsumer_->AcquireBuffer(filledBuffer); + if (ret != Media::Status::OK) { + AVTRANS_LOGE("Acquire buffer err: %{public}d.", ret); + return Status::ERROR_INVALID_OPERATION; + } + ProcessAndSendBuffer(filledBuffer); + inputBufQueConsumer_->ReleaseBuffer(filledBuffer); + return Status::OK; +} + +Status DAudioOutputFilter::DoProcessOutputBuffer(int recvArg, bool dropFrame, bool byIdx, + uint32_t idx, int64_t renderTimee) +{ + return Status::OK; +} + +Status DAudioOutputFilter::ProcessAndSendBuffer(const std::shared_ptr buffer) +{ + if (buffer == nullptr) { + AVTRANS_LOGE("ProcessAndSendBuffer buffer is nullptr"); + return Status::ERROR_INVALID_OPERATION; + } + TRUE_RETURN_V_MSG_E(eventReceiver_ == nullptr, Status::ERROR_NULL_POINTER, "receiver_ is nullptr"); + Event event; + event.type = EventType::EVENT_BUFFER_PROGRESS; + event.param = buffer; + eventReceiver_->OnEvent(event); + return Status::OK; +} + +void DAudioOutputFilter::SetParameter(const std::shared_ptr& meta) +{ + meta_ = meta; +} + +void DAudioOutputFilter::GetParameter(std::shared_ptr& meta) +{ + meta = meta_; +} + +Status DAudioOutputFilter::LinkNext(const std::shared_ptr& nextFilter, StreamType outType) +{ + nextFilter_ = nextFilter; + nextFiltersMap_[outType].push_back(nextFilter_); + auto filterLinkCallback = std::make_shared(shared_from_this()); + auto ret = nextFilter->OnLinked(outType, meta_, filterLinkCallback); + if (ret != Status::OK) { + AVTRANS_LOGE("Onlinked failed, status: %{public}d.", ret); + return ret; + } + return Status::OK; +} + +Status DAudioOutputFilter::UpdateNext(const std::shared_ptr&, StreamType) +{ + return Status::OK; +} + +Status DAudioOutputFilter::UnLinkNext(const std::shared_ptr&, StreamType) +{ + AVTRANS_LOGI("cur: DAudioOutputFilter, unlink next filter.."); + return Status::OK; +} + +Status DAudioOutputFilter::OnLinked(StreamType inType, const std::shared_ptr &meta, + const std::shared_ptr &callback) +{ + AVTRANS_LOGI("cur: DAudioOutputFilter, OnLinked"); + onLinkedResultCallback_ = callback; + SetParameter(meta); + return Status::OK; +}; + +Status DAudioOutputFilter::OnUpdated(StreamType, const std::shared_ptr&, + const std::shared_ptr&) +{ + return Status::OK; +} + +Status DAudioOutputFilter::OnUnLinked(StreamType, const std::shared_ptr&) +{ + AVTRANS_LOGI("cur: DAudioOutputFilter, OnUnLinked."); + return Status::OK; +} + +void DAudioOutputFilter::OnLinkedResult(const sptr& queue, + std::shared_ptr& meta) +{ + outputBufQueProducer_ = queue; +} + +void DAudioOutputFilter::OnUnlinkedResult(std::shared_ptr& meta) +{ +} + +void DAudioOutputFilter::OnUpdatedResult(std::shared_ptr& meta) +{ +} +} // namespace Pipeline +} // namespace DistributedHardware +} // namespace OHOS diff --git a/av_transport/av_trans_engine/filters/av_trans_output/daudio_output_filter.h b/av_transport/av_trans_engine/filters/av_trans_output/daudio_output_filter.h new file mode 100644 index 00000000..eaa67ec4 --- /dev/null +++ b/av_transport/av_trans_engine/filters/av_trans_output/daudio_output_filter.h @@ -0,0 +1,89 @@ +/* + * Copyright (c) 2025 Huawei Device Co., Ltd. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef OHOS_DAUDIO_OUTPUT_FILTER_H +#define OHOS_DAUDIO_OUTPUT_FILTER_H + +#include +#include +#include + +#include "buffer/avbuffer_queue.h" +#include "buffer/avbuffer_queue_consumer.h" +#include "buffer/avbuffer_queue_producer.h" +#include "softbus_channel_adapter.h" + +#include "pipeline_status.h" +#include "filter.h" + +namespace OHOS { +namespace DistributedHardware { +namespace Pipeline { +class DAudioOutputFilter : public Filter, public std::enable_shared_from_this { +public: + DAudioOutputFilter(std::string name, FilterType type); + virtual ~DAudioOutputFilter(); + + void Init(const std::shared_ptr& receiver, const std::shared_ptr& callback) override; + Status DoInitAfterLink() override; + Status DoPrepare() override; + Status DoStart() override; + Status DoPause() override; + Status DoPauseDragging() override; + Status DoResume() override; + Status DoResumeDragging() override; + Status DoStop() override; + Status DoFlush() override; + Status DoRelease() override; + + Status DoProcessInputBuffer(int recvArg, bool dropFrame) override; + Status DoProcessOutputBuffer(int recvArg, bool dropFrame, bool byIdx, uint32_t idx, int64_t renderTime) override; + void SetParameter(const std::shared_ptr& meta) override; + void GetParameter(std::shared_ptr& meta) override; + + Status LinkNext(const std::shared_ptr& nextFilter, StreamType outType) override; + Status UpdateNext(const std::shared_ptr& nextFilter, StreamType outType) override; + Status UnLinkNext(const std::shared_ptr& nextFilter, StreamType outType) override; + + Status OnLinked(StreamType inType, const std::shared_ptr& meta, + const std::shared_ptr& callback) override; + Status OnUpdated(StreamType inType, const std::shared_ptr& meta, + const std::shared_ptr& callback) override; + Status OnUnLinked(StreamType inType, const std::shared_ptr& callback) override; + + void OnLinkedResult(const sptr& queue, std::shared_ptr& meta); + void OnUnlinkedResult(std::shared_ptr& meta); + void OnUpdatedResult(std::shared_ptr& meta); + +private: + void PrepareInputBuffer(); + Status ProcessAndSendBuffer(const std::shared_ptr buffer); + +private: + std::shared_ptr meta_ {nullptr}; + std::shared_ptr nextFilter_ {nullptr}; + std::shared_ptr eventReceiver_ {nullptr}; + std::shared_ptr filterCallback_ {nullptr}; + std::shared_ptr onLinkedResultCallback_ {nullptr}; + + std::shared_ptr outputBufQue_ {nullptr}; + sptr inputBufQueProducer_ {nullptr}; + sptr inputBufQueConsumer_ {nullptr}; + sptr outputBufQueProducer_ {nullptr}; +}; +} // namespace Pipeline +} // namespace DistributedHardware +} // namespace OHOS +#endif diff --git a/av_transport/av_trans_engine/filters/av_trans_output/dsoftbus_output_filter.cpp b/av_transport/av_trans_engine/filters/av_trans_output/dsoftbus_output_filter.cpp new file mode 100644 index 00000000..432a4909 --- /dev/null +++ b/av_transport/av_trans_engine/filters/av_trans_output/dsoftbus_output_filter.cpp @@ -0,0 +1,409 @@ +/* + * Copyright (c) 2025 Huawei Device Co., Ltd. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "dsoftbus_output_filter.h" + +#include + +#include "av_trans_log.h" +#include "filter_factory.h" +#include "cJSON.h" +#include "av_trans_errno.h" +#include "av_trans_log.h" +#include "av_trans_constants.h" + +#undef DH_LOG_TAG +#define DH_LOG_TAG "DSoftbusOutputFilter" + +namespace OHOS { +namespace DistributedHardware { +namespace Pipeline { +constexpr int32_t DEFAULT_BUFFER_NUM = 8; +const std::string META_DATA_TYPE = "meta_data_type"; +const std::string META_TIMESTAMP = "meta_timestamp"; +const std::string META_FRAME_NUMBER = "meta_frame_number"; +const std::string META_EXT_TIMESTAMP = "meta_ext_timestamp"; +const std::string META_EXT_FRAME_NUMBER = "meta_ext_frame_number"; +const std::string INPUT_BUFFER_QUEUE_NAME = "buffer_queue_input"; +const std::string SENDER_DATA_SESSION_NAME_SUFFIX = "sender.avtrans.data"; +const std::string RECEIVER_DATA_SESSION_NAME_SUFFIX = "receiver.avtrans.data"; +const std::string OUTPUT_BUFFER_QUEUE_NAME = "AVTransAudioOutputBufferQueue"; + +static AutoRegisterFilter g_registerAudioEncoderFilter("builtin.avtransport.avoutput", + FilterType::FILTERTYPE_ASINK, + [](const std::string& name, const FilterType type) { + return std::make_shared(name, FilterType::FILTERTYPE_ASINK); + }); + +class DAudioSoftbusFilterLinkCallback : public FilterLinkCallback { +public: + explicit DAudioSoftbusFilterLinkCallback(std::shared_ptr filter) + : outFilter_(std::move(filter)) {} + ~DAudioSoftbusFilterLinkCallback() = default; + + void OnLinkedResult(const sptr &queue, std::shared_ptr &meta) override + { + if (auto filter = outFilter_.lock()) { + filter->OnLinkedResult(queue, meta); + } else { + AVTRANS_LOGI("invalid DSoftbusOutputFilter"); + } + } + + void OnUnlinkedResult(std::shared_ptr &meta) override + { + if (auto filter = outFilter_.lock()) { + filter->OnUnlinkedResult(meta); + } else { + AVTRANS_LOGI("invalid DSoftbusOutputFilter"); + } + } + + void OnUpdatedResult(std::shared_ptr &meta) override + { + if (auto filter = outFilter_.lock()) { + filter->OnUpdatedResult(meta); + } else { + AVTRANS_LOGI("invalid daudioSoftbusOutputFilter"); + } + } + +private: + std::weak_ptr outFilter_ {}; +}; + +class AVBufferAvailableListener : public Media::IConsumerListener { +public: + explicit AVBufferAvailableListener(std::weak_ptr dsoftbusFilter) + { + dsoftbusFilter_ = dsoftbusFilter; + } + + void OnBufferAvailable() override + { + auto dsoftbusFilter = dsoftbusFilter_.lock(); + if (dsoftbusFilter != nullptr) { + dsoftbusFilter->ProcessInputBuffer(); + } + } + +private: + std::weak_ptr dsoftbusFilter_; +}; + +DSoftbusOutputFilter::DSoftbusOutputFilter(std::string name, FilterType type) + : Filter(name, type) +{ +} + +DSoftbusOutputFilter::~DSoftbusOutputFilter() +{ + nextFiltersMap_.clear(); +} + +void DSoftbusOutputFilter::Init(const std::shared_ptr& receiver, + const std::shared_ptr& callback) +{ + eventReceiver_ = receiver; + filterCallback_ = callback; +} + +Status DSoftbusOutputFilter::DoInitAfterLink() +{ + return Status::OK; +} + +void DSoftbusOutputFilter::PrepareInputBuffer() +{ + AVTRANS_LOGI("Preparing input buffer."); + int32_t outputBufNum = DEFAULT_BUFFER_NUM; + Media::MemoryType memoryType = Media::MemoryType::VIRTUAL_MEMORY; + if (outputBufQue_ == nullptr) { + outputBufQue_ = Media::AVBufferQueue::Create(outputBufNum, memoryType, OUTPUT_BUFFER_QUEUE_NAME); + } + if (outputBufQue_ == nullptr) { + AVTRANS_LOGE("Create buffer queue failed."); + return; + } + inputBufQueProducer_ = outputBufQue_->GetProducer(); + TRUE_RETURN((inputBufQueProducer_ == nullptr), "Get producer failed"); + + inputBufQueConsumer_ = outputBufQue_->GetConsumer(); + TRUE_RETURN((inputBufQueConsumer_ == nullptr), "Get consumer failed"); + + sptr listener(new AVBufferAvailableListener(shared_from_this())); + inputBufQueConsumer_->SetBufferAvailableListener(listener); +} + +Status DSoftbusOutputFilter::DoPrepare() +{ + std::string str; + if (meta_ != nullptr) { + meta_->GetData(Media::Tag::MEDIA_DESCRIPTION, str); + } + cJSON *jParam = cJSON_Parse(str.c_str()); + if (jParam == nullptr) { + AVTRANS_LOGE("Failed to parse json."); + return Status::ERROR_NULL_POINTER; + } + cJSON *ownerName = cJSON_GetObjectItem(jParam, KEY_ONWER_NAME.c_str()); + if (ownerName == nullptr || !cJSON_IsString(ownerName)) { + AVTRANS_LOGE("The key ownerName is null."); + cJSON_Delete(jParam); + return Status::ERROR_NULL_POINTER; + } + AVTRANS_LOGI("RegData type is : %{public}s.", ownerName->valuestring); + ownerName_ = std::string(ownerName->valuestring); + + cJSON *peerDevId = cJSON_GetObjectItem(jParam, KEY_PEERDEVID_NAME.c_str()); + if (peerDevId == nullptr || !cJSON_IsString(peerDevId)) { + AVTRANS_LOGE("The key peerDevId is null."); + cJSON_Delete(jParam); + return Status::ERROR_NULL_POINTER; + } + AVTRANS_LOGI("RegData type is : %{public}s.", peerDevId->valuestring); + peerDevId_ = std::string(peerDevId->valuestring); + sessionName_ = ownerName_ + "_" + SENDER_DATA_SESSION_NAME_SUFFIX; + cJSON_Delete(jParam); + PrepareInputBuffer(); + AVTRANS_LOGI("OnLinkedResult."); + + std::shared_ptr meta = std::make_shared(); + if (onLinkedResultCallback_ != nullptr) { + onLinkedResultCallback_->OnLinkedResult(inputBufQueProducer_, meta); + } + return Status::OK; +} + +Status DSoftbusOutputFilter::DoStart() +{ + std::string peerSessName = ownerName_ + "_" + RECEIVER_DATA_SESSION_NAME_SUFFIX; + SoftbusChannelAdapter::GetInstance().RegisterChannelListener(sessionName_, peerDevId_, this); + int32_t ret = SoftbusChannelAdapter::GetInstance().OpenSoftbusChannel(sessionName_, peerSessName, peerDevId_); + if ((ret != DH_AVT_SUCCESS) && (ret != ERR_DH_AVT_SESSION_HAS_OPENED)) { + AVTRANS_LOGE("Open softbus channel failed ret: %{public}d.", ret); + return Status::ERROR_INVALID_OPERATION; + } + return Status::OK; +} + +Status DSoftbusOutputFilter::DoPause() +{ + return Status::OK; +} + +Status DSoftbusOutputFilter::DoPauseDragging() +{ + return Status::OK; +} + +Status DSoftbusOutputFilter::DoResume() +{ + return Status::OK; +} + +Status DSoftbusOutputFilter::DoResumeDragging() +{ + return Status::OK; +} + +Status DSoftbusOutputFilter::DoStop() +{ + int32_t ret = SoftbusChannelAdapter::GetInstance().CloseSoftbusChannel(sessionName_, peerDevId_); + if (ret != DH_AVT_SUCCESS) { + AVTRANS_LOGE("Close softbus channel failed ret: %{public}d.", ret); + return Status::ERROR_NULL_POINTER; + } + return Status::OK; +} + +Status DSoftbusOutputFilter::DoFlush() +{ + return Status::OK; +} + +Status DSoftbusOutputFilter::DoRelease() +{ + return Status::OK; +} + +Status DSoftbusOutputFilter::DoProcessInputBuffer(int recvArg, bool dropFrame) +{ + (void)recvArg; + (void)dropFrame; + std::shared_ptr filledBuffer = nullptr; + Media::Status ret = inputBufQueConsumer_->AcquireBuffer(filledBuffer); + if (ret != Media::Status::OK) { + AVTRANS_LOGE("Acquire buffer err: %{public}d.", ret); + return Status::ERROR_INVALID_OPERATION; + } + ProcessAndSendBuffer(filledBuffer); + inputBufQueConsumer_->ReleaseBuffer(filledBuffer); + return Status::OK; +} + +Status DSoftbusOutputFilter::DoProcessOutputBuffer(int recvArg, bool dropFrame, bool byIdx, + uint32_t idx, int64_t renderTimee) +{ + return Status::OK; +} + +std::string DSoftbusOutputFilter::MarshalAudioMeta(BufferDataType dataType, int64_t pts, uint32_t frameNumber) +{ + cJSON *metaJson = cJSON_CreateObject(); + if (metaJson == nullptr) { + return ""; + } + cJSON_AddNumberToObject(metaJson, META_DATA_TYPE.c_str(), static_cast(dataType)); + cJSON_AddNumberToObject(metaJson, META_TIMESTAMP.c_str(), pts); + cJSON_AddNumberToObject(metaJson, META_FRAME_NUMBER.c_str(), frameNumber); + char *data = cJSON_PrintUnformatted(metaJson); + if (data == nullptr) { + cJSON_Delete(metaJson); + return ""; + } + std::string jsonstr(data); + cJSON_free(data); + cJSON_Delete(metaJson); + return jsonstr; +} + +Status DSoftbusOutputFilter::ProcessAndSendBuffer(const std::shared_ptr buffer) +{ + if (buffer == nullptr || buffer->memory_ == nullptr || buffer->meta_ == nullptr || meta_ == nullptr) { + AVTRANS_LOGE("AVBuffer is nullptr"); + return Status::ERROR_NULL_POINTER; + } + auto bufferData = buffer->memory_; + int64_t pts = 0; + uint32_t frameNumber = 0; + buffer->meta_->GetData(Media::Tag::USER_FRAME_PTS, pts); + buffer->meta_->GetData(Media::Tag::AUDIO_OBJECT_NUMBER, frameNumber); + BufferDataType dataType; + meta_->GetData(Media::Tag::MEDIA_STREAM_TYPE, dataType); + auto dataParam = MarshalAudioMeta(dataType, pts, frameNumber); + cJSON *jsonObj = cJSON_CreateObject(); + if (jsonObj == nullptr) { + return Status::ERROR_NULL_POINTER; + } + cJSON_AddNumberToObject(jsonObj, AVT_DATA_META_TYPE.c_str(), static_cast(dataType)); + cJSON_AddStringToObject(jsonObj, AVT_DATA_PARAM.c_str(), dataParam.c_str()); + auto str = cJSON_PrintUnformatted(jsonObj); + if (str == nullptr) { + cJSON_Delete(jsonObj); + return Status::ERROR_NULL_POINTER; + } + std::string jsonStr = std::string(str); + cJSON_free(str); + StreamData data = {reinterpret_cast(const_cast(bufferData->GetAddr())), + bufferData->GetSize()}; + StreamData ext = {const_cast(jsonStr.c_str()), jsonStr.length()}; + int32_t ret = SoftbusChannelAdapter::GetInstance().SendStreamData(sessionName_, peerDevId_, &data, &ext); + if (ret != DH_AVT_SUCCESS) { + AVTRANS_LOGE("Send data to softbus failed."); + cJSON_Delete(jsonObj); + return Status::ERROR_INVALID_OPERATION; + } + cJSON_Delete(jsonObj); + return Status::OK; +} + +void DSoftbusOutputFilter::SetParameter(const std::shared_ptr& meta) +{ + meta_ = meta; +} + +void DSoftbusOutputFilter::GetParameter(std::shared_ptr& meta) +{ + meta = meta_; +} + +Status DSoftbusOutputFilter::LinkNext(const std::shared_ptr& nextFilter, StreamType outType) +{ + nextFilter_ = nextFilter; + nextFiltersMap_[outType].push_back(nextFilter_); + auto filterLinkCallback = std::make_shared(shared_from_this()); + auto ret = nextFilter->OnLinked(outType, meta_, filterLinkCallback); + if (ret != Status::OK) { + AVTRANS_LOGE("Onlinked failed, status: %{public}d.", ret); + return ret; + } + return Status::OK; +} + +Status DSoftbusOutputFilter::UpdateNext(const std::shared_ptr&, StreamType outType) +{ + return Status::OK; +} + +Status DSoftbusOutputFilter::UnLinkNext(const std::shared_ptr&, StreamType outType) +{ + AVTRANS_LOGI("cur: DSoftbusOutputFilter, unlink next filter.."); + return Status::OK; +} + +Status DSoftbusOutputFilter::OnLinked(StreamType inType, const std::shared_ptr &meta, + const std::shared_ptr &callback) +{ + AVTRANS_LOGI("DSoftbusOutputFilter, OnLinked"); + onLinkedResultCallback_ = callback; + SetParameter(meta); + return Status::OK; +}; + +Status DSoftbusOutputFilter::OnUpdated(StreamType inType, const std::shared_ptr& meta, + const std::shared_ptr& callback) +{ + AVTRANS_LOGI("DSoftbusOutputFilter, OnUpdated"); + return Status::OK; +} + +Status DSoftbusOutputFilter::OnUnLinked(StreamType, const std::shared_ptr& callback) +{ + return Status::OK; +} + +void DSoftbusOutputFilter::OnLinkedResult(const sptr& queue, + std::shared_ptr& meta) +{ + outputBufQueProducer_ = queue; +} + +void DSoftbusOutputFilter::OnUpdatedResult(std::shared_ptr& meta) +{ +} + +void DSoftbusOutputFilter::OnUnlinkedResult(std::shared_ptr& meta) +{ +} + +void DSoftbusOutputFilter::OnChannelEvent(const AVTransEvent &event) +{ + AVTRANS_LOGI("OnChannelEvent enter, event type: %{public}d", event.type); + TRUE_RETURN(eventReceiver_ == nullptr, "receiver_ is nullptr"); + Event channelEvent; + channelEvent.type = EventType::EVENT_AUDIO_PROGRESS; + channelEvent.param = event; + eventReceiver_->OnEvent(channelEvent); +} + +void DSoftbusOutputFilter::OnStreamReceived(const StreamData *data, const StreamData *ext) +{ + (void)data; + (void)ext; +} +} // namespace Pipeline +} // namespace DistributedHardware +} // namespace OHOS diff --git a/av_transport/av_trans_engine/filters/av_trans_output/dsoftbus_output_filter.h b/av_transport/av_trans_engine/filters/av_trans_output/dsoftbus_output_filter.h new file mode 100644 index 00000000..7313d1fb --- /dev/null +++ b/av_transport/av_trans_engine/filters/av_trans_output/dsoftbus_output_filter.h @@ -0,0 +1,98 @@ +/* + * Copyright (c) 2025 Huawei Device Co., Ltd. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef OHOS_DSOFTFBUS_OUTPUT_AUDIO_FILTER_H +#define OHOS_DSOFTFBUS_OUTPUT_AUDIO_FILTER_H + +#include +#include +#include + +#include "buffer/avbuffer_queue.h" +#include "buffer/avbuffer_queue_consumer.h" +#include "buffer/avbuffer_queue_producer.h" +#include "softbus_channel_adapter.h" + +#include "pipeline_status.h" +#include "filter.h" + +namespace OHOS { +namespace DistributedHardware { +namespace Pipeline { +class DSoftbusOutputFilter : public Filter, + public ISoftbusChannelListener, + public std::enable_shared_from_this { +public: + DSoftbusOutputFilter(std::string name, FilterType type); + virtual ~DSoftbusOutputFilter(); + + void Init(const std::shared_ptr& receiver, const std::shared_ptr& callback) override; + Status DoInitAfterLink() override; + Status DoPrepare() override; + Status DoStart() override; + Status DoPause() override; + Status DoPauseDragging() override; + Status DoResume() override; + Status DoResumeDragging() override; + Status DoStop() override; + Status DoFlush() override; + Status DoRelease() override; + + Status DoProcessInputBuffer(int recvArg, bool dropFrame) override; + Status DoProcessOutputBuffer(int recvArg, bool dropFrame, bool byIdx, uint32_t idx, int64_t renderTime) override; + + void SetParameter(const std::shared_ptr& meta) override; + void GetParameter(std::shared_ptr& meta) override; + + Status LinkNext(const std::shared_ptr& nextFilter, StreamType outType) override; + Status UpdateNext(const std::shared_ptr& nextFilter, StreamType outType) override; + Status UnLinkNext(const std::shared_ptr& nextFilter, StreamType outType) override; + + Status OnLinked(StreamType inType, const std::shared_ptr& meta, + const std::shared_ptr& callback) override; + Status OnUpdated(StreamType inType, const std::shared_ptr& meta, + const std::shared_ptr& callback) override; + Status OnUnLinked(StreamType inType, const std::shared_ptr& callback) override; + + void OnLinkedResult(const sptr& queue, std::shared_ptr& meta); + void OnUnlinkedResult(std::shared_ptr& meta); + void OnUpdatedResult(std::shared_ptr& meta); + void OnChannelEvent(const AVTransEvent &event) override; + void OnStreamReceived(const StreamData *data, const StreamData *ext) override; + +private: + void PrepareInputBuffer(); + Status ProcessAndSendBuffer(const std::shared_ptr buffer); + std::string MarshalAudioMeta(BufferDataType dataType, int64_t pts, uint32_t frameNumber); + std::shared_ptr meta_ {nullptr}; + std::shared_ptr nextFilter_ {nullptr}; + + std::shared_ptr eventReceiver_ {nullptr}; + std::shared_ptr filterCallback_ {nullptr}; + std::shared_ptr onLinkedResultCallback_ {nullptr}; + + std::shared_ptr outputBufQue_ {nullptr}; + sptr inputBufQueProducer_ {nullptr}; + sptr inputBufQueConsumer_ {nullptr}; + sptr outputBufQueProducer_ {nullptr}; + std::mutex paramsMapMutex_; + std::string ownerName_; + std::string sessionName_; + std::string peerDevId_; +}; +} // namespace Pipeline +} // namespace DistributedHardware +} // namespace OHOS +#endif -- Gitee