From f06c9a023f39790d101e9e1a60e999c925161f4f Mon Sep 17 00:00:00 2001 From: byndyx Date: Mon, 21 Jul 2025 20:58:02 +0800 Subject: [PATCH] add timestamp Signed-off-by: byndyx --- .../src/av_audio_receiver_engine.cpp | 9 +++-- .../av_sender/src/av_audio_sender_engine.cpp | 2 ++ .../av_trans_audio_decoder_filter.cpp | 22 ++++++++++-- .../av_trans_audio_decoder_filter.h | 2 ++ .../av_trans_audio_encoder_filter.cpp | 35 ++++++++++++++++++- .../av_trans_audio_encoder_filter.h | 4 +++ .../av_trans_audio_input_filter.cpp | 11 +----- .../av_trans_bus_input_filter.cpp | 33 +++++++++++++++-- .../av_trans_bus_input_filter.h | 2 +- .../dsoftbus_output_filter.cpp | 13 +++++-- .../av_trans_output/dsoftbus_output_filter.h | 2 +- av_transport/common/include/av_trans_buffer.h | 6 ++++ av_transport/common/src/av_trans_buffer.cpp | 19 ++++++++++ 13 files changed, 135 insertions(+), 25 deletions(-) diff --git a/av_transport/av_trans_engine/av_receiver/src/av_audio_receiver_engine.cpp b/av_transport/av_trans_engine/av_receiver/src/av_audio_receiver_engine.cpp index 20dbecc7..ee8ef973 100644 --- a/av_transport/av_trans_engine/av_receiver/src/av_audio_receiver_engine.cpp +++ b/av_transport/av_trans_engine/av_receiver/src/av_audio_receiver_engine.cpp @@ -630,11 +630,14 @@ int32_t AVAudioReceiverEngine::HandleOutputBuffer(std::shared_ptr transBuffer = std::make_shared(MetaType::AUDIO); - TRUE_RETURN_V_MSG_E(hisBuffer == nullptr || hisBuffer->memory_ == nullptr, ERR_DH_AVT_NULL_POINTER, - "hisBuffer is invalid"); + TRUE_RETURN_V_MSG_E(hisBuffer == nullptr || hisBuffer->memory_ == nullptr || hisBuffer->meta_ == nullptr, + ERR_DH_AVT_NULL_POINTER, "hisBuffer is invalid"); transBuffer->WrapBufferData(hisBuffer->memory_->GetAddr(), hisBuffer->memory_->GetCapacity(), hisBuffer->memory_->GetSize()); - + transBuffer->SetPts(hisBuffer->pts_); + int64_t ptsSpecial = 0; + hisBuffer->meta_->GetData(Media::Tag::USER_FRAME_PTS, ptsSpecial); + transBuffer->SetPtsSpecial(ptsSpecial); SetCurrentState(StateId::PLAYING); TRUE_RETURN_V(receiverCallback_ == nullptr, ERR_DH_AVT_OUTPUT_DATA_FAILED); return receiverCallback_->OnDataAvailable(transBuffer); diff --git a/av_transport/av_trans_engine/av_sender/src/av_audio_sender_engine.cpp b/av_transport/av_trans_engine/av_sender/src/av_audio_sender_engine.cpp index 7e59e038..7b4bc3cf 100644 --- a/av_transport/av_trans_engine/av_sender/src/av_audio_sender_engine.cpp +++ b/av_transport/av_trans_engine/av_sender/src/av_audio_sender_engine.cpp @@ -641,6 +641,8 @@ int32_t AVAudioSenderEngine::PushData(const std::shared_ptr &buff return ERR_DH_AVT_PREPARE_FAILED; } outBuffer->memory_->Write(data->GetAddress(), bufferSize, 0); + outBuffer->pts_ = buffer->GetPts(); + AVTRANS_LOGI("buffer->GetPts(): %{public}" PRId64, buffer->GetPts()); producer->PushBuffer(outBuffer, true); SetCurrentState(StateId::PLAYING); return DH_AVT_SUCCESS; diff --git a/av_transport/av_trans_engine/filters/av_trans_coder/av_trans_audio_decoder_filter.cpp b/av_transport/av_trans_engine/filters/av_trans_coder/av_trans_audio_decoder_filter.cpp index d747dd5f..85d3b790 100644 --- a/av_transport/av_trans_engine/filters/av_trans_coder/av_trans_audio_decoder_filter.cpp +++ b/av_transport/av_trans_engine/filters/av_trans_coder/av_trans_audio_decoder_filter.cpp @@ -548,10 +548,18 @@ Status AudioDecoderFilter::ProcessData(std::shared_ptr audioDat TRUE_RETURN_V_MSG_E(err != EOK, Status::ERROR_INVALID_OPERATION, "memcpy_s err: %{public}d, memSize: %{public}d", err, memSize); int64_t pts = 0; - audioData->meta_->GetData(Media::Tag::USER_FRAME_PTS, pts); + pts = audioData->pts_; { std::lock_guard lock(ptsMutex_); - ptsMap_.insert(std::make_pair(index, pts)); + ptsMap_[frameInIndex_] = pts; + frameInIndex_++; + AVTRANS_LOGI("frameInIndex_: %{public}" PRIu64 " pts: %{public}" PRId64, frameInIndex_, pts); + } + if (frameInIndex_ == 15) { + audioData->meta_->GetData(Media::Tag::USER_FRAME_PTS, pts); + ptsMap_[15] = pts; + AVTRANS_LOGI("the fifth special process pts: %{public}" PRId64, pts); + frameInIndex_ = 0; } codecMem->buffer_->pts_ = pts; codecMem->buffer_->meta_->SetData(Media::Tag::USER_FRAME_PTS, pts); @@ -616,14 +624,22 @@ void AudioDecoderFilter::OnDecOutputBufferAvailable(uint32_t index, OH_AVBuffer int64_t pts = 0; { std::lock_guard lock(ptsMutex_); - auto iter = ptsMap_.find(index); + auto iter = ptsMap_.find(frameOutIndex_); if (iter != ptsMap_.end()) { pts = iter->second; ptsMap_.erase(iter); } + frameOutIndex_++; } outBuffer->pts_ = pts; meta->SetData(Media::Tag::USER_FRAME_PTS, pts); + AVTRANS_LOGI("after AudioDecoderFilter index %{public}" PRIu64", pts: %{public}" PRId64, frameOutIndex_, pts); + + if (frameOutIndex_ == 15) { + meta->SetData(Media::Tag::USER_FRAME_PTS, ptsMap_[15]); + AVTRANS_LOGI("the fifth special process pts: %{public}" PRId64, ptsMap_[15]); + frameOutIndex_ = 0; + } outBuffer->memory_->Write(buffer->buffer_->memory_->GetAddr(), buffer->buffer_->memory_->GetSize(), 0); outputProducer_->PushBuffer(outBuffer, true); auto ret = OH_AudioCodec_FreeOutputBuffer(audioDecoder_, index); diff --git a/av_transport/av_trans_engine/filters/av_trans_coder/av_trans_audio_decoder_filter.h b/av_transport/av_trans_engine/filters/av_trans_coder/av_trans_audio_decoder_filter.h index 07c70e91..4b3d63d5 100644 --- a/av_transport/av_trans_engine/filters/av_trans_coder/av_trans_audio_decoder_filter.h +++ b/av_transport/av_trans_engine/filters/av_trans_coder/av_trans_audio_decoder_filter.h @@ -148,6 +148,8 @@ private: std::queue> inputDataBufferQueue_; std::map ptsMap_; std::mutex ptsMutex_; + uint64_t frameInIndex_ = 0; + uint64_t frameOutIndex_ = 0; }; } // namespace Pipeline } // namespace DistributedHardware diff --git a/av_transport/av_trans_engine/filters/av_trans_coder/av_trans_audio_encoder_filter.cpp b/av_transport/av_trans_engine/filters/av_trans_coder/av_trans_audio_encoder_filter.cpp index 37624f16..c6d4c984 100644 --- a/av_transport/av_trans_engine/filters/av_trans_coder/av_trans_audio_encoder_filter.cpp +++ b/av_transport/av_trans_engine/filters/av_trans_coder/av_trans_audio_encoder_filter.cpp @@ -547,7 +547,17 @@ Status AudioEncoderFilter::ProcessData(std::shared_ptr audioDat "memcpy_s err: %{public}d, memSize: %{public}d", err, memSize); int64_t pts = 0; audioData->meta_->GetData(Media::Tag::USER_FRAME_PTS, pts); + { + std::lock_guard lock(ptsMutex_); + ptsMap_[frameInIndex_] = pts; + AVTRANS_LOGI("ProcessData frameInIndex_ %{public}" PRId64", pts: %{public}" PRId64, frameInIndex_, pts); + frameInIndex_++; + if (frameInIndex_ % 16 == 0) { + frameInIndex_ = 0; + } + } codecMem->buffer_->meta_->SetData(Media::Tag::USER_FRAME_PTS, pts); + AVTRANS_LOGI("before AudioEncoderFilter index %{public}u, pts: %{public}" PRId64, index, pts); codecMem->buffer_->flag_ = MediaAVCodec::AVCODEC_BUFFER_FLAG_NONE; auto ret = OH_AudioCodec_PushInputBuffer(audioEncoder_, index); TRUE_RETURN_V_MSG_E(ret != AV_ERR_OK, Status::ERROR_INVALID_OPERATION, @@ -607,9 +617,32 @@ void AudioEncoderFilter::OnEncOutputBufferAvailable(uint32_t index, OH_AVBuffer return; } int64_t pts = 0; - buffer->buffer_->meta_->GetData(Media::Tag::USER_FRAME_PTS, pts); + { + std::lock_guard lock(ptsMutex_); + auto iter = ptsMap_.find(frameOutIndex_); + if (iter != ptsMap_.end()) { + pts = iter->second; + ptsMap_.erase(iter); + } + frameOutIndex_++; + } outBuffer->pts_ = pts; meta->SetData(Media::Tag::USER_FRAME_PTS, pts); + AVTRANS_LOGD("after AudioEncoderFilter index %{public}u, frameOutIndex_ %{public}" PRIu64", pts: %{public}" PRId64, + index, frameOutIndex_, pts); + if (frameOutIndex_ == 15) { + { + std::lock_guard lock(ptsMutex_); + auto iter = ptsMap_.find(frameOutIndex_); + if (iter != ptsMap_.end()) { + pts = iter->second; + ptsMap_.erase(iter); + } + } + AVTRANS_LOGD("the fifth special process index %{public}" PRIu64", pts: %{public}" PRId64, frameOutIndex_, pts); + meta->SetData(Media::Tag::USER_FRAME_PTS, pts); + frameOutIndex_ = 0; + } meta->SetData(Media::Tag::AUDIO_OBJECT_NUMBER, index); outBuffer->memory_->Write(buffer->buffer_->memory_->GetAddr(), buffer->buffer_->memory_->GetSize(), 0); outputProducer_->PushBuffer(outBuffer, true); diff --git a/av_transport/av_trans_engine/filters/av_trans_coder/av_trans_audio_encoder_filter.h b/av_transport/av_trans_engine/filters/av_trans_coder/av_trans_audio_encoder_filter.h index aff40982..9b7c2151 100644 --- a/av_transport/av_trans_engine/filters/av_trans_coder/av_trans_audio_encoder_filter.h +++ b/av_transport/av_trans_engine/filters/av_trans_coder/av_trans_audio_encoder_filter.h @@ -144,6 +144,10 @@ private: std::queue codecBufQueue_; std::queue codecIndexQueue_; std::queue> inputDataBufferQueue_; + std::map ptsMap_; + std::mutex ptsMutex_; + uint64_t frameInIndex_ = 0; + uint64_t frameOutIndex_ = 0; }; } // namespace Pipeline } // namespace DistributedHardware diff --git a/av_transport/av_trans_engine/filters/av_trans_input/av_trans_audio_input_filter.cpp b/av_transport/av_trans_engine/filters/av_trans_input/av_trans_audio_input_filter.cpp index c3bbdc85..719fc6d6 100644 --- a/av_transport/av_trans_engine/filters/av_trans_input/av_trans_audio_input_filter.cpp +++ b/av_transport/av_trans_engine/filters/av_trans_input/av_trans_audio_input_filter.cpp @@ -30,16 +30,7 @@ namespace Pipeline { namespace { constexpr int32_t DEFAULT_BUFFER_NUM = 8; constexpr int32_t MAX_TIME_OUT_MS = 1; -constexpr int64_t MS_ONE_S = 1000; -constexpr int64_t NS_ONE_S = 1000000; const std::string INPUT_BUFFER_QUEUE_NAME = "AVTransAudioInputBufferQueue"; - -int64_t GetCurrentTime() -{ - struct timespec time = { 0, 0 }; - clock_gettime(CLOCK_MONOTONIC, &time); - return time.tv_sec * MS_ONE_S + time.tv_nsec / NS_ONE_S; -} } static AutoRegisterFilter g_registerAudioEncoderFilter("builtin.avtrans.audio.input", @@ -264,7 +255,7 @@ Status AVTransAudioInputFilter::ProcessAndSendBuffer(const std::shared_ptrpts_ = GetCurrentTime(); + outBuffer->pts_ = buffer->pts_; meta->SetData(Media::Tag::USER_FRAME_PTS, outBuffer->pts_); outBuffer->memory_->Write(buffer->memory_->GetAddr(), buffer->memory_->GetSize(), 0); outputBufQueProducer_->PushBuffer(outBuffer, true); diff --git a/av_transport/av_trans_engine/filters/av_trans_input/av_trans_bus_input_filter.cpp b/av_transport/av_trans_engine/filters/av_trans_input/av_trans_bus_input_filter.cpp index 0dfcccd7..5265fedf 100644 --- a/av_transport/av_trans_engine/filters/av_trans_input/av_trans_bus_input_filter.cpp +++ b/av_transport/av_trans_engine/filters/av_trans_input/av_trans_bus_input_filter.cpp @@ -35,6 +35,8 @@ constexpr int32_t DEFAULT_BUFFER_NUM = 8; constexpr int32_t MAX_TIME_OUT_MS = 1; const std::string INPUT_BUFFER_QUEUE_NAME = "AVTransBusInputBufferQueue"; const std::string META_TIMESTAMP = "meta_timestamp"; +const std::string META_TIMESTAMP_STRING = "meta_timestamp_string"; +const std::string META_TIMESTAMP_SPECIAL = "meta_timestamp_special"; bool IsUInt32(const cJSON *jsonObj, const std::string &key) { @@ -417,7 +419,7 @@ void AVTransBusInputFilter::OnStreamReceived(const StreamData *data, const Strea cJSON_Delete(resMsg); } -bool AVTransBusInputFilter::UnmarshalAudioMeta(const std::string& jsonStr, int64_t& pts) +bool AVTransBusInputFilter::UnmarshalAudioMeta(const std::string& jsonStr, int64_t& pts, int64_t& ptsSpecial) { cJSON *metaJson = cJSON_Parse(jsonStr.c_str()); if (metaJson == nullptr) { @@ -429,6 +431,29 @@ bool AVTransBusInputFilter::UnmarshalAudioMeta(const std::string& jsonStr, int64 return false; } pts = static_cast(ptsObj->valueint); + cJSON *ptsObj = cJSON_GetObjectItemCaseSensitive(metaJson, META_TIMESTAMP_STRING.c_str()); + if (ptsObj == nullptr || !cJSON_IsString(ptsObj)) { + cJSON_Delete(metaJson); + return false; + } + auto ptsStr = std::string(ptsObj->valuestring); + if (ptsStr.empty()) { + cJSON_Delete(metaJson); + return false; + } + pts = std::stoll(ptsStr); + cJSON *ptsSpecialObj = cJSON_GetObjectItemCaseSensitive(metaJson, META_TIMESTAMP_SPECIAL.c_str()); + if (ptsSpecialObj == nullptr || !cJSON_IsString(ptsSpecialObj)) { + cJSON_Delete(metaJson); + return false; + } + auto ptsSpecialStr = std::string(ptsSpecialObj->valuestring); + if (ptsSpecialStr.empty()) { + cJSON_Delete(metaJson); + return false; + } + ptsSpecial = std::stoll(ptsSpecialStr); + AVTRANS_LOGD("pts: %{public}" PRId64", ptsSpecial: %{public}" PRId64, pts, ptsSpecial); cJSON_Delete(metaJson); return true; } @@ -458,9 +483,11 @@ void AVTransBusInputFilter::StreamDataEnqueue(const StreamData *data, const cJSO return; } int64_t ptsValue = 0; - UnmarshalAudioMeta(std::string(paramItem->valuestring), ptsValue); + int64_t ptsSpecialValue = 0; + UnmarshalAudioMeta(std::string(paramItem->valuestring), ptsValue, ptsSpecialValue); + AVTRANS_LOGI("buffer->GetPts(): %{public}" PRId64, ptsValue); outBuffer->pts_ = ptsValue; - meta->SetData(Media::Tag::USER_FRAME_PTS, ptsValue); + meta->SetData(Media::Tag::USER_FRAME_PTS, ptsSpecialValue); outBuffer->memory_->Write(reinterpret_cast(data->buf), data->bufLen, 0); outputBufQueProducer_->PushBuffer(outBuffer, true); } diff --git a/av_transport/av_trans_engine/filters/av_trans_input/av_trans_bus_input_filter.h b/av_transport/av_trans_engine/filters/av_trans_input/av_trans_bus_input_filter.h index bfeb26a4..4c0f80bf 100644 --- a/av_transport/av_trans_engine/filters/av_trans_input/av_trans_bus_input_filter.h +++ b/av_transport/av_trans_engine/filters/av_trans_input/av_trans_bus_input_filter.h @@ -79,7 +79,7 @@ private: Status ProcessAndSendBuffer(const std::shared_ptr buffer); void StreamDataEnqueue(const StreamData *data, const cJSON *extMsg); std::string TransName2PkgName(const std::string &ownerName); - bool UnmarshalAudioMeta(const std::string& jsonStr, int64_t& pts); + bool UnmarshalAudioMeta(const std::string& jsonStr, int64_t& pts, int64_t& ptsSpecial); private: std::shared_ptr configureParam_ {nullptr}; diff --git a/av_transport/av_trans_engine/filters/av_trans_output/dsoftbus_output_filter.cpp b/av_transport/av_trans_engine/filters/av_trans_output/dsoftbus_output_filter.cpp index 432a4909..1a83f919 100644 --- a/av_transport/av_trans_engine/filters/av_trans_output/dsoftbus_output_filter.cpp +++ b/av_transport/av_trans_engine/filters/av_trans_output/dsoftbus_output_filter.cpp @@ -33,6 +33,8 @@ namespace Pipeline { constexpr int32_t DEFAULT_BUFFER_NUM = 8; const std::string META_DATA_TYPE = "meta_data_type"; const std::string META_TIMESTAMP = "meta_timestamp"; +const std::string META_TIMESTAMP_STRING = "meta_timestamp_string"; +const std::string META_TIMESTAMP_SPECIAL = "meta_timestamp_special"; const std::string META_FRAME_NUMBER = "meta_frame_number"; const std::string META_EXT_TIMESTAMP = "meta_ext_timestamp"; const std::string META_EXT_FRAME_NUMBER = "meta_ext_frame_number"; @@ -260,7 +262,7 @@ Status DSoftbusOutputFilter::DoProcessOutputBuffer(int recvArg, bool dropFrame, return Status::OK; } -std::string DSoftbusOutputFilter::MarshalAudioMeta(BufferDataType dataType, int64_t pts, uint32_t frameNumber) +std::string DSoftbusOutputFilter::MarshalAudioMeta(BufferDataType dataType, int64_t pts, int64_t ptsSpecail, uint32_t frameNumber) { cJSON *metaJson = cJSON_CreateObject(); if (metaJson == nullptr) { @@ -269,6 +271,8 @@ std::string DSoftbusOutputFilter::MarshalAudioMeta(BufferDataType dataType, int6 cJSON_AddNumberToObject(metaJson, META_DATA_TYPE.c_str(), static_cast(dataType)); cJSON_AddNumberToObject(metaJson, META_TIMESTAMP.c_str(), pts); cJSON_AddNumberToObject(metaJson, META_FRAME_NUMBER.c_str(), frameNumber); + cJSON_AddStringToObject(metaJson, META_TIMESTAMP_STRING.c_str(), std::to_string(pts).c_str()); + cJSON_AddStringToObject(metaJson, META_TIMESTAMP_SPECIAL.c_str(), std::to_string(ptsSpecail).c_str()); char *data = cJSON_PrintUnformatted(metaJson); if (data == nullptr) { cJSON_Delete(metaJson); @@ -288,12 +292,15 @@ Status DSoftbusOutputFilter::ProcessAndSendBuffer(const std::shared_ptrmemory_; int64_t pts = 0; + int64_t ptsSpecail = 0; uint32_t frameNumber = 0; - buffer->meta_->GetData(Media::Tag::USER_FRAME_PTS, pts); + pts = buffer->pts_; + AVTRANS_LOGD("AVBuffer pts is %{public}" PRId64, pts); + buffer->meta_->GetData(Media::Tag::USER_FRAME_PTS, ptsSpecail); buffer->meta_->GetData(Media::Tag::AUDIO_OBJECT_NUMBER, frameNumber); BufferDataType dataType; meta_->GetData(Media::Tag::MEDIA_STREAM_TYPE, dataType); - auto dataParam = MarshalAudioMeta(dataType, pts, frameNumber); + auto dataParam = MarshalAudioMeta(dataType, pts, ptsSpecail, frameNumber); cJSON *jsonObj = cJSON_CreateObject(); if (jsonObj == nullptr) { return Status::ERROR_NULL_POINTER; diff --git a/av_transport/av_trans_engine/filters/av_trans_output/dsoftbus_output_filter.h b/av_transport/av_trans_engine/filters/av_trans_output/dsoftbus_output_filter.h index 7313d1fb..6740c18d 100644 --- a/av_transport/av_trans_engine/filters/av_trans_output/dsoftbus_output_filter.h +++ b/av_transport/av_trans_engine/filters/av_trans_output/dsoftbus_output_filter.h @@ -75,7 +75,7 @@ public: private: void PrepareInputBuffer(); Status ProcessAndSendBuffer(const std::shared_ptr buffer); - std::string MarshalAudioMeta(BufferDataType dataType, int64_t pts, uint32_t frameNumber); + std::string MarshalAudioMeta(BufferDataType dataType, int64_t pts, int64_t ptsSpecail, uint32_t frameNumber); std::shared_ptr meta_ {nullptr}; std::shared_ptr nextFilter_ {nullptr}; diff --git a/av_transport/common/include/av_trans_buffer.h b/av_transport/common/include/av_trans_buffer.h index e919415f..9af839dc 100644 --- a/av_transport/common/include/av_trans_buffer.h +++ b/av_transport/common/include/av_trans_buffer.h @@ -89,10 +89,16 @@ public: uint32_t GetDataCount(); void Reset(); bool IsEmpty(); + void SetPts(int64_t pts); + int64_t GetPts(); + void SetPtsSpecial(int64_t ptsSpecial); + int64_t GetPtsSpecial(); private: std::vector> data_ {}; std::shared_ptr meta_; + int64_t pts_ = 0; + int64_t ptsSpecial_ = 0; }; } // namespace DistributedHardware } // namespace OHOS diff --git a/av_transport/common/src/av_trans_buffer.cpp b/av_transport/common/src/av_trans_buffer.cpp index eed0f4e1..0a754093 100644 --- a/av_transport/common/src/av_trans_buffer.cpp +++ b/av_transport/common/src/av_trans_buffer.cpp @@ -80,6 +80,25 @@ bool AVTransBuffer::IsEmpty() return data_.empty(); } +void AVTransBuffer::SetPts(int64_t pts) +{ + pts_ = pts; +} + +int64_t AVTransBuffer::GetPts() +{ + return pts_; +} + +void AVTransBuffer::SetPtsSpecial(int64_t ptsSpecial) +{ + ptsSpecial_ = ptsSpecial; +} + +int64_t AVTransBuffer::GetPtsSpecial() +{ + return ptsSpecial_; +} void AVTransBuffer::Reset() { if (data_[0]) { -- Gitee