diff --git a/tf_adapter/kernels/aicpu/host_queue_dataset_op.cc b/tf_adapter/kernels/aicpu/host_queue_dataset_op.cc index 7990d48d72d50ed22ec214cbf54ea3e539b49bed..e91a8f79c66dcf11a7d9c79e95c8c4d30fae4360 100644 --- a/tf_adapter/kernels/aicpu/host_queue_dataset_op.cc +++ b/tf_adapter/kernels/aicpu/host_queue_dataset_op.cc @@ -62,6 +62,10 @@ const static int64_t kStringTypeDepth = 64LL; const int64_t kUnknownShapeDepth = 3LL; const uint64_t PARALLEL_MEMORY_TRHESHOLD = 10 * 1024 * 1024ULL; const uint32_t MAX_THREAD_NUM = 4U; +const uint32_t kDataPerRecord = 1024U; +const uint64_t kMicroSecondConvert = 1000000ULL; +const uint64_t kMilliSecondConvert = 1000ULL; +const uint32_t kFormatTimeLen = 64U; std::atomic tdt_release(false); std::atomic is_hold_type(false); @@ -70,6 +74,7 @@ const uint64_t kTotalBytes = 8 * 1024 * 1024 * 1024LL; const int64_t kMaxBytes = 2 * 1024 * 1024 * 1024LL; enum class ChannelType { TDT = 0, ACL_QUEUE = 1, HOST_QUEUE = 2 }; /* ACL_QUEUE indicates mbuf */ enum class ThreadType : size_t { RECV = 0, SEND = 1, BUTT }; +using TimePoint = std::chrono::system_clock::time_point; class HostQueueDatasetOp : public DatasetOpKernel { public: @@ -283,13 +288,7 @@ class HostQueueDatasetOp : public DatasetOpKernel { } ~Iterator() override { - ADP_LOG(EVENT) << "DataThreadPerf[" << dataset()->device_id_ << - "]::channel_name:" << dataset()->channel_name_ << - "[" << buffer_.size() << "," << IsHoldDataTrans() << "], recv [" << - data_thread_perf_stat_[static_cast(ThreadType::RECV)].elapsed_time << "us, " << - data_thread_perf_stat_[static_cast(ThreadType::RECV)].total_bytes << "], send [" << - data_thread_perf_stat_[static_cast(ThreadType::SEND)].elapsed_time << "us, " << - data_thread_perf_stat_[static_cast(ThreadType::SEND)].total_bytes << "]."; + ShowDataThreadPerfRecord(); std::vector stop_message; data_deliver_->ParallelSendDataVec(stop_message); { @@ -321,6 +320,58 @@ class HostQueueDatasetOp : public DatasetOpKernel { ADP_LOG(INFO) << "HostQueueDatasetOp's iterator is released."; } + std::string FormatTimestampToDate(uint64_t timestamp) { + uint64_t time_second = timestamp / kMicroSecondConvert; + uint64_t time_millisecond = (timestamp / kMilliSecondConvert) % kMilliSecondConvert; + uint64_t time_microsecond = timestamp % kMilliSecondConvert; + + time_t time = time_second; + std::tm *localtime = std::localtime(&time); + char time_str[kFormatTimeLen] = { 0 }; + std::strftime(time_str, sizeof(time_str), "%Y-%m-%d %H:%M:%S", localtime); + std::string format_time(time_str); + int ret = sprintf_s(time_str, sizeof(time_str), ".%03llu.%03llu", + time_millisecond, time_microsecond); + if (ret < 0) { + ADP_LOG(WARNING) << "sprintf_s failed."; + return format_time; + } + return format_time + std::string(time_str); + } + + void ShowDataThreadPerfRecord() { + for (uint32_t i = 0; i < static_cast(ThreadType::BUTT); i++) { + uint32_t start_index = 0; + uint32_t record_num = data_thread_perf_stat_[i].data_record_index; + if (data_thread_perf_stat_[i].data_record_index >= kDataPerRecord) { + start_index = data_thread_perf_stat_[i].data_record_index - kDataPerRecord; + record_num = kDataPerRecord; + } + + for (uint32_t j = 0; j < record_num; j++) { + uint32_t index = (start_index + j + 1) % kDataPerRecord; + auto record = &data_thread_perf_stat_[i].data_perf_record[index]; + ADP_LOG(EVENT) << "DataThreadPerf: " << i + << "(0:revc, 1:send), data_index: " << record->data_index + << ", start_time: " << FormatTimestampToDate(record->start_time) + << ", end_time: " << FormatTimestampToDate(record->end_time) + << ", elapsed_time: " << record->elapsed_time + << " us, buffer_size: " << record->buffer_size + << ", total_bytes: " << record->total_bytes << " bytes"; + } + auto record_max = &data_thread_perf_stat_[i].data_per_record_max; + ADP_LOG(EVENT) << "DataThreadPerf: " << i + << "(0:revc, 1:send), device_id: " << dataset()->device_id_ + << ", channel_name: " << dataset()->channel_name_ + << ", longest time data_index: " << record_max->data_index + << ", start_time: " << FormatTimestampToDate(record_max->start_time) + << ", end_time: " << FormatTimestampToDate(record_max->end_time) + << ", elapsed_time: " << record_max->elapsed_time + << " us, buffer_size: " << record_max->buffer_size + << ", total_bytes: " << record_max->total_bytes << " bytes"; + } + } + void DestroyQueue() { ADP_LOG(INFO) << "Start to destroy queue."; if (dataset()->channel_type_ == ChannelType::ACL_QUEUE) { @@ -466,12 +517,29 @@ class HostQueueDatasetOp : public DatasetOpKernel { if (!finish_send_) { showLog(); } } - void RefreshDataThreadPerf(const ThreadType type, const double elapsed_time, - const uint64_t args_total_bytes) { - if (elapsed_time > data_thread_perf_stat_[static_cast(type)].elapsed_time) { - data_thread_perf_stat_[static_cast(type)].total_bytes = args_total_bytes; - data_thread_perf_stat_[static_cast(type)].elapsed_time = elapsed_time; + void RefreshDataThreadPerf(const ThreadType type, const TimePoint &start, + const TimePoint &end, const uint64_t args_total_bytes) { + auto *perf_stat = &data_thread_perf_stat_[static_cast(type)]; + perf_stat->data_record_index++; + uint32_t index = perf_stat->data_record_index % kDataPerRecord; + perf_stat->data_perf_record[index].data_index = perf_stat->data_record_index; + auto elapsed_time = std::chrono::duration(end - start).count(); + perf_stat->data_perf_record[index].elapsed_time = elapsed_time; + perf_stat->data_perf_record[index].start_time = + std::chrono::duration_cast(start.time_since_epoch()).count(); + perf_stat->data_perf_record[index].end_time = + std::chrono::duration_cast(end.time_since_epoch()).count(); + perf_stat->data_perf_record[index].total_bytes = args_total_bytes; + perf_stat->data_perf_record[index].buffer_size = buffer_.size(); + if (elapsed_time >= perf_stat->data_per_record_max.elapsed_time) { + perf_stat->data_per_record_max = perf_stat->data_perf_record[index]; } + + ADP_LOG(INFO) << "DataThreadPerf: " << static_cast(type) + << "(0:revc, 1:send), data_index: " << perf_stat->data_record_index + << ", elapsed_time: " << elapsed_time + << " us, buffer_size: " << buffer_.size() + << ", total_bytes: " << args_total_bytes << " bytes"; } void GetDataThread(const std::shared_ptr &ctx) { @@ -519,9 +587,9 @@ class HostQueueDatasetOp : public DatasetOpKernel { return; } - auto start = std::chrono::steady_clock::now(); + auto start = std::chrono::system_clock::now(); buffer_element.status = input_impls_[1]->GetNext(ctx.get(), &args, &end_of_sequence); - auto end = std::chrono::steady_clock::now(); + auto end = std::chrono::system_clock::now(); if ((!buffer_element.status.ok()) || (buffer_element.status.ok() && end_of_sequence)) { HandleGetNextStatus(buffer_element.status, end_of_sequence); mutex_lock lck(mu_); @@ -558,8 +626,7 @@ class HostQueueDatasetOp : public DatasetOpKernel { args_tensor_size += tensor.TotalBytes(); total_bytes_ += tensor.TotalBytes(); } - auto elapsed_time = std::chrono::duration(end - start).count(); - RefreshDataThreadPerf(ThreadType::RECV, elapsed_time, args_tensor_size); + RefreshDataThreadPerf(ThreadType::RECV, start, end, args_tensor_size); } if ((!is_string) && (from_npu_dataset != NPU_ALLOCATOR_NPU) && (dataset()->channel_type_ == ChannelType::ACL_QUEUE)) { @@ -649,13 +716,12 @@ class HostQueueDatasetOp : public DatasetOpKernel { } while (std::chrono::high_resolution_clock::now() < end); continue; } - auto start = std::chrono::steady_clock::now(); + auto start = std::chrono::system_clock::now(); status = SendTensorsByAcl(acl_handle_, data_type, args, is_need_resend); if (!status.ok()) { break; } if (!is_need_resend) { - auto end = std::chrono::steady_clock::now(); - auto elapsed_time = std::chrono::duration(end - start).count(); - RefreshDataThreadPerf(ThreadType::SEND, elapsed_time, args_total_bytes); + auto end = std::chrono::system_clock::now(); + RefreshDataThreadPerf(ThreadType::SEND, start, end, args_total_bytes); RecordMbufQueueBytes(is_hold_type, args_total_bytes); break; } @@ -1048,10 +1114,20 @@ class HostQueueDatasetOp : public DatasetOpKernel { acltdtChannelHandle *acl_handle_; uint32_t queue_id_; int active_thread_num_ = 0; + + using DataPerfRecord = struct { + uint64_t data_index; + uint64_t start_time; + uint64_t end_time; + double elapsed_time; + uint64_t total_bytes; + uint32_t buffer_size; + }; struct DataThreadPerf { - double elapsed_time = 0; - uint64_t total_bytes = 0; - } data_thread_perf_stat_[static_cast(ThreadType::BUTT)]; + DataPerfRecord data_per_record_max; + DataPerfRecord data_perf_record[kDataPerRecord]; + uint64_t data_record_index; + } data_thread_perf_stat_[static_cast(ThreadType::BUTT)] = {}; uint64_t mbuf_queue_bytes_[kStringTypeDepth] = { 0 }; uint64_t mbuf_queue_total_bytes_ = 0; size_t mbuf_queue_rear_ = 0;