diff --git a/tf_adapter/kernels/aicpu/host_queue_dataset_op.cc b/tf_adapter/kernels/aicpu/host_queue_dataset_op.cc
index 3398a3c0fc9ee171b6eee837eb85387eebedcd9f..f299b2a8c8bbd5292f54fe276381303b901e0010 100644
--- a/tf_adapter/kernels/aicpu/host_queue_dataset_op.cc
+++ b/tf_adapter/kernels/aicpu/host_queue_dataset_op.cc
@@ -56,11 +56,14 @@ constexpr int64 kSleepUs = 10;
 const uint32_t kMaxValue = 128U;
 const size_t kMaxDepth = 128UL;
 const int32_t kSleepTime = 1;
+const uint32_t kSleepDuration = 5000;
 const static int64_t kStringTypeDepth = 64LL;
 const int64_t kUnknownShapeDepth = 3LL;
 const uint64_t PARALLEL_MEMORY_TRHESHOLD = 10 * 1024 * 1024ULL;
 const uint32_t MAX_THREAD_NUM = 4U;
 std::atomic<bool> tdt_release(false);
+std::atomic<bool> is_hold_type(false);
+
 // total memory usage controlled below 2G
 const uint64_t kTotalBytes = 8 * 1024 * 1024 * 1024LL;
 const int64_t kMaxBytes = 2 * 1024 * 1024 * 1024LL;
@@ -192,6 +195,7 @@ class HostQueueDatasetOp : public DatasetOpKernel {
     for (size_t i = 0UL; i < output_shape_size; i++) {
       DataType tensor_data_type = output_types_.at(i);
       if (tensor_data_type == DT_STRING) {
+        is_hold_type.store(true);
         ADP_LOG(INFO) << "Current tensor type is DT_STRING.";
         return kStringTypeDepth;
       }
@@ -280,7 +284,7 @@ class HostQueueDatasetOp : public DatasetOpKernel {
     ~Iterator() override {
       ADP_LOG(EVENT) << "DataThreadPerf[" << dataset()->device_id_ << "]::channel_name:" << dataset()->channel_name_ <<
-        "[" << buffer_.size() << "], recv [" <<
+        "[" << buffer_.size() << "," << IsHoldDataTrans() << "], recv [" <<
         data_thread_perf_stat_[static_cast<size_t>(ThreadType::RECV)].elapsed_time << "us, " <<
         data_thread_perf_stat_[static_cast<size_t>(ThreadType::RECV)].total_bytes << "], send [" <<
         data_thread_perf_stat_[static_cast<size_t>(ThreadType::SEND)].elapsed_time << "us, " <<
@@ -305,7 +309,7 @@ class HostQueueDatasetOp : public DatasetOpKernel {
       }
       cond_var_.notify_all();
-      while (active_thread_num) {
+      while (active_thread_num_) {
         cond_var_.notify_all();
         (void)usleep(kSleepUs);
       }
@@ -317,7 +321,7 @@ class HostQueueDatasetOp : public DatasetOpKernel {
     }

     void DestroyQueue() {
-      ADP_LOG(INFO) << "Start to destroy queue";
+      ADP_LOG(INFO) << "Start to destroy queue.";
       if (dataset()->channel_type_ == ChannelType::ACL_QUEUE) {
         aclError acl_status = acltdtDestroyChannel(acl_handle_);
         if (acl_status != ACL_ERROR_NONE) {
@@ -442,13 +446,9 @@ class HostQueueDatasetOp : public DatasetOpKernel {
     }

     void RefreshDataThreadPerf(const ThreadType type, const double elapsed_time,
-                               const std::vector<Tensor> &args) {
+                               const uint64_t args_total_bytes) {
       if (elapsed_time > data_thread_perf_stat_[static_cast<size_t>(type)].elapsed_time) {
-        uint64_t total_bytes = 0;
-        for (auto &tensor : args) {
-          total_bytes += tensor.TotalBytes();
-        }
-        data_thread_perf_stat_[static_cast<size_t>(type)].total_bytes = total_bytes;
+        data_thread_perf_stat_[static_cast<size_t>(type)].total_bytes = args_total_bytes;
         data_thread_perf_stat_[static_cast<size_t>(type)].elapsed_time = elapsed_time;
       }
     }
@@ -456,7 +456,7 @@ class HostQueueDatasetOp : public DatasetOpKernel {
     void GetDataThread(const std::shared_ptr<IteratorContext> &ctx) {
       {
         mutex_lock lck(mu_);
-        active_thread_num++;
+        active_thread_num_++;
       }

       RecordStart(ctx.get());
@@ -464,7 +464,7 @@ class HostQueueDatasetOp : public DatasetOpKernel {
         RecordStop(ctx.get());
         {
           mutex_lock lck(mu_);
-          active_thread_num--;
+          active_thread_num_--;
         }
       });
       enum NPU_ALLOCATOR_CHECK {
@@ -495,7 +495,6 @@ class HostQueueDatasetOp : public DatasetOpKernel {
       BufferElement buffer_element;
       if (dataset()->local_rank_id_ > 0) {
         ADP_LOG(INFO) << "Do not need to GetNext.";
-        get_thread_exception_.store(true);
         return;
       }

@@ -515,11 +514,8 @@ class HostQueueDatasetOp : public DatasetOpKernel {
           buffer_element.host_thread_finished = true;
           buffer_.push_back(std::move(buffer_element));
           cond_var_.notify_all();
-          get_thread_exception_.store(true);
           return;
         }
-        auto elapsed_time = std::chrono::duration<double, std::micro>(end - start).count();
-        RefreshDataThreadPerf(ThreadType::RECV, elapsed_time, args);
         uint64_t args_tensor_size = 0ULL;
         if (from_npu_dataset == NPU_ALLOCATOR_UNKNOW) {
           if (args.empty()) {
@@ -543,17 +539,17 @@ class HostQueueDatasetOp : public DatasetOpKernel {
             buffer_element.host_thread_finished = true;
             buffer_.push_back(std::move(buffer_element));
             cond_var_.notify_all();
-            get_thread_exception_.store(true);
             return;
           }
           args_tensor_size += tensor.TotalBytes();
           total_bytes_ += tensor.TotalBytes();
         }
+        auto elapsed_time = std::chrono::duration<double, std::micro>(end - start).count();
+        RefreshDataThreadPerf(ThreadType::RECV, elapsed_time, args_tensor_size);
       }
       if ((!is_string) && (from_npu_dataset != NPU_ALLOCATOR_NPU) &&
           (dataset()->channel_type_ == ChannelType::ACL_QUEUE)) {
         if (!HandleMemory(args, buffer_element.value, args_tensor_size)) {
-          get_thread_exception_.store(true);
           return;
         }
       } else {
@@ -570,13 +566,13 @@ class HostQueueDatasetOp : public DatasetOpKernel {
     void SendDataThread() {
       {
         mutex_lock lck(mu_);
-        active_thread_num++;
+        active_thread_num_++;
       }

       auto cleanup = gtl::MakeCleanup([this] {
         {
           mutex_lock lck(mu_);
-          active_thread_num--;
+          active_thread_num_--;
         }
       });
       std::vector<tdt::DataItem> items;
@@ -597,22 +593,59 @@ class HostQueueDatasetOp : public DatasetOpKernel {
       ADP_LOG(INFO) << "Slave SendDataThread exit.";
     }

-    Status SendDataByAclQueue(const vector<Tensor> &args, const acltdtTensorType &data_type) {
-      Status status;
+    void RecordMbufQueueBytes(const bool is_hold, const uint64_t args_total_bytes) {
+      if (!is_hold) { return; }
+      mbuf_queue_rear_ = (mbuf_queue_rear_ + 1) % kStringTypeDepth;
+      mbuf_queue_total_bytes_ = mbuf_queue_total_bytes_ - mbuf_queue_bytes_[mbuf_queue_rear_] + args_total_bytes;
+      mbuf_queue_bytes_[mbuf_queue_rear_] = args_total_bytes;
+    }
+
+    bool IsHoldDataTrans() {
+      if (mbuf_queue_total_bytes_ < static_cast<uint64_t>(kMaxBytes)) { return false; }
+      size_t mbuf_size;
+      aclError status = acltdtQueryChannelSize(acl_handle_, &mbuf_size);
+      if (status != ACL_SUCCESS) {
+        ADP_LOG(ERROR) << "Failed to get the mbuf size, status = " << status;
+        return false;
+      }
+      if (mbuf_size > kStringTypeDepth) {
+        ADP_LOG(ERROR) << "An exception occurs::size[" << mbuf_size << "vs" << kStringTypeDepth << "].";
+        return true;
+      }
+      if (mbuf_size <= 1) { return false; }
+      uint64_t mbuf_total_bytes = 0;
+      for (size_t i = 0; i < mbuf_size; i++) {
+        size_t index = (mbuf_queue_rear_ + kStringTypeDepth - i) % kStringTypeDepth;
+        mbuf_total_bytes += mbuf_queue_bytes_[index];
+      }
+      return (mbuf_total_bytes >= static_cast<uint64_t>(kMaxBytes));
+    }
+
+    Status SendDataByAclQueue(const vector<Tensor> &args, const acltdtTensorType &data_type,
+                              const uint64_t args_total_bytes) {
+      Status status = Status::OK();
       bool is_need_resend = false;
-      do {
-        {
-          mutex_lock lck(mu_);
-          if (finish_send_) { break; }
+
+      while (!finish_send_) {
+        if (IsHoldDataTrans()) {
+          auto start = std::chrono::high_resolution_clock::now();
+          auto end = start + std::chrono::microseconds(kSleepDuration);
+          do {
+            std::this_thread::yield();
+          } while (std::chrono::high_resolution_clock::now() < end);
+          continue;
         }
         auto start = std::chrono::steady_clock::now();
         status = SendTensorsByAcl(acl_handle_, data_type, args, is_need_resend);
-        auto end = std::chrono::steady_clock::now();
-        if (status.ok() && !is_need_resend) {
+        if (!status.ok()) { break; }
+        if (!is_need_resend) {
+          auto end = std::chrono::steady_clock::now();
           auto elapsed_time = std::chrono::duration<double, std::micro>(end - start).count();
-          RefreshDataThreadPerf(ThreadType::SEND, elapsed_time, args);
+          RefreshDataThreadPerf(ThreadType::SEND, elapsed_time, args_total_bytes);
+          RecordMbufQueueBytes(is_hold_type, args_total_bytes);
+          break;
         }
-      } while (status.ok() && is_need_resend);
+      }
       return status;
     }
@@ -668,18 +701,20 @@ class HostQueueDatasetOp : public DatasetOpKernel {
       ADP_LOG(INFO) << "Begin to send data to the NPU. ";
       rtError_t rt = rtSetDevice(dataset()->device_id_);
       if (rt != ACL_RT_SUCCESS) {
+        StopNotify();
         ADP_LOG(ERROR) << "Thread rtSetDevice failed: device_id_ = " << dataset()->device_id_ << " rt=" << rt;
+        return;
       }

       {
         mutex_lock lck(mu_);
-        active_thread_num++;
+        active_thread_num_++;
       }

       auto cleanup = gtl::MakeCleanup([this] {
         {
           mutex_lock lck(mu_);
-          active_thread_num--;
+          active_thread_num_--;
         }
         rtError_t rt = rtDeviceReset(this->dataset()->device_id_);
         if (rt != RT_ERROR_NONE) {
@@ -690,6 +725,7 @@ class HostQueueDatasetOp : public DatasetOpKernel {
       while (true) {
         std::vector<Tensor> args;
         acltdtTensorType data_type = ACL_TENSOR_DATA_TENSOR;
+        uint64_t args_total_bytes = 0ULL;
         {
           mutex_lock lck(mu_);
           while ((!finish_send_) && buffer_.empty()) {
@@ -710,15 +746,16 @@ class HostQueueDatasetOp : public DatasetOpKernel {
           args = buffer_.front().value;
           buffer_.pop_front();
           for (auto &tensor : args) {
-            total_bytes_ -= tensor.TotalBytes();
+            args_total_bytes += tensor.TotalBytes();
           }
+          total_bytes_ -= args_total_bytes;
         }
         ADP_LOG(INFO) << "Host queue " << dataset()->channel_name_
-                      << ", buffer_size: " << buffer_.size() << ", data_type:" << data_type;
+                      << ", buffer_size: " << buffer_.size() << ", data_type: " << data_type;
       }
       Status status;
       if (dataset()->channel_type_ == ChannelType::ACL_QUEUE) {
-        status = SendDataByAclQueue(args, data_type);
+        status = SendDataByAclQueue(args, data_type, args_total_bytes);
       } else {
         status = SendDataByHostQueue(args, data_type);
       }
@@ -742,13 +779,13 @@ class HostQueueDatasetOp : public DatasetOpKernel {
     void SendDataThread(const std::shared_ptr<IteratorContext> &ctx) {
       {
         mutex_lock lck(mu_);
-        active_thread_num++;
+        active_thread_num_++;
       }

       auto cleanup = gtl::MakeCleanup([this] {
         {
           mutex_lock lck(mu_);
-          active_thread_num--;
+          active_thread_num_--;
         }
       });
       vector<Tensor> args;
@@ -984,7 +1021,6 @@ class HostQueueDatasetOp : public DatasetOpKernel {
     condition_variable event_finish_var_;
     bool cancelled_ GUARDED_BY(mu_) = true;
     bool finish_send_ GUARDED_BY(mu_) = false;
-    std::atomic<bool> get_thread_exception_ { false };
     uint64_t total_bytes_ GUARDED_BY(mu_) = 0;
     // The following two thread must be the first member to be destructed,
     // because tensorflow::Thread does not provide an explicit join function.
@@ -996,11 +1032,14 @@ class HostQueueDatasetOp : public DatasetOpKernel {
     DataItemDeliver *data_deliver_;
     acltdtChannelHandle *acl_handle_;
     uint32_t queue_id_;
-    int active_thread_num = 0;
+    int active_thread_num_ = 0;
     struct DataThreadPerf {
       double elapsed_time = 0;
      uint64_t total_bytes = 0;
     } data_thread_perf_stat_[static_cast<size_t>(ThreadType::BUTT)];
+    uint64_t mbuf_queue_bytes_[kStringTypeDepth] = { 0 };
+    uint64_t mbuf_queue_total_bytes_ = 0;
+    size_t mbuf_queue_rear_ = 0;
   };
   const std::vector<DatasetBase *> inputs_;
   std::string channel_name_;
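When IsHoldDataTrans() reports pressure, the rewritten send loop above parks for kSleepDuration microseconds by spinning on std::this_thread::yield() rather than sleeping, trading some CPU for a prompt re-check of the hold condition. The wait pattern in isolation (a sketch; busy_wait_slice is an illustrative name, and the 5000 us slice mirrors kSleepDuration):

    #include <chrono>
    #include <thread>

    // Spin until the deadline, yielding the CPU on every iteration. Unlike
    // usleep, the thread stays runnable and resumes with minimal latency.
    void busy_wait_slice(std::chrono::microseconds slice) {
      auto deadline = std::chrono::high_resolution_clock::now() + slice;
      do {
        std::this_thread::yield();
      } while (std::chrono::high_resolution_clock::now() < deadline);
    }

    int main() {
      // As in SendDataByAclQueue: pause ~5 ms, then re-test the hold condition.
      busy_wait_slice(std::chrono::microseconds(5000));
    }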
diff --git a/tf_adapter/tests/depends/ascendcl/src/ascendcl_stub.cc b/tf_adapter/tests/depends/ascendcl/src/ascendcl_stub.cc
index 52109405e46bad63ac6784e42f195a288f2ca69d..927197ccd526bafa83ea54dcea7f13cddfd29018 100644
--- a/tf_adapter/tests/depends/ascendcl/src/ascendcl_stub.cc
+++ b/tf_adapter/tests/depends/ascendcl/src/ascendcl_stub.cc
@@ -342,6 +342,7 @@ aclError aclrtDestroyEvent(aclrtEvent event) {
 }

 std::string g_SocVersionStub = "Ascend910B";
+uint64_t g_MbufSize = 0;

 const char *aclrtGetSocName() {
   return g_SocVersionStub.c_str();
@@ -354,6 +355,20 @@ void aclrtSetSocNameStub(std::string socVersion) {
 void aclrtSetDefaultSocNameStub() {
   g_SocVersionStub = "Ascend910B";
 }
+
+void setMbufSize(uint64_t value) {
+  g_MbufSize = value;
+}
+
+void setDefaultMbufSize() {
+  g_MbufSize = 0;
+}
+
+aclError acltdtQueryChannelSize(const acltdtChannelHandle *handle, size_t *size) {
+  *size = g_MbufSize;
+  return ACL_SUCCESS;
+}
+
 // for GE RunGraph api
 #if 0
 aclError aclrtSynchronizeStream(aclrtStream stream) {
diff --git a/tf_adapter/tests/depends/ascendcl/src/ascendcl_stub.h b/tf_adapter/tests/depends/ascendcl/src/ascendcl_stub.h
index 6b60a118f38b74cbaa1c9101f9fee01cae28cd22..7e41334ad845ee63adb5265da8ac65e606ade3e5 100644
--- a/tf_adapter/tests/depends/ascendcl/src/ascendcl_stub.h
+++ b/tf_adapter/tests/depends/ascendcl/src/ascendcl_stub.h
@@ -27,6 +27,8 @@ void aclrtSetSocNameStub(std::string socVersion);

 void aclrtSetDefaultSocNameStub();

+void setMbufSize(uint64_t value);
+void setDefaultMbufSize();
 extern uint32_t g_tensor_desc_size;
 void SetTensorDescSize(uint32_t val);
diff --git a/tf_adapter/tests/st/kernels/testcase/dataset/host_queue_dats_set_st.cc b/tf_adapter/tests/st/kernels/testcase/dataset/host_queue_dats_set_st.cc
index ec94fffab01593deb14fa3f27458328828b66d0f..29beb180bb8d18fe7f265f98c7c8fd5e70cb548d 100644
--- a/tf_adapter/tests/st/kernels/testcase/dataset/host_queue_dats_set_st.cc
+++ b/tf_adapter/tests/st/kernels/testcase/dataset/host_queue_dats_set_st.cc
@@ -5,6 +5,7 @@
 #include "tensorflow/core/graph/graph_def_builder.h"
 #include "tensorflow/core/kernels/data/dataset_test_base.h"
 #include "tf_adapter/util/npu_attrs.h"
+#include "ascendcl_stub.h"

 class HostQueueDatasetOp;
 namespace tensorflow {
@@ -64,6 +65,17 @@ struct TestCase {
   std::vector<PartialTensorShape> expected_output_shapes;
 };

+TestCase NormalizeTestStringCase() {
+  return {
+      // input_tensors expected_outputs expected_output_dtypes
+      // expected_output_shapes
+      {CreateTensor<tstring>(TensorShape{10, 1}, {"0", "1", "2", "3", "4", "5", "6", "7", "8", "9"})},
+      {CreateTensor<tstring>(TensorShape{1}, {"0"})},
+      {DT_STRING},
+      {PartialTensorShape({1})},
+  };
+}
+
 TestCase NormalizeTestCase() {
   return {
       // input_tensors expected_outputs expected_output_dtypes
@@ -487,6 +499,181 @@ TEST_F(HostQueueDatasetOpTest, iterator_getnext05_host_queue) {
                                  &end_of_sequence));
   *const_cast<bool *>(&kIsHeterogeneous) = false;
 }
+
+TEST_F(HostQueueDatasetOpTest, isholddatatrans1) {
+  NpuAttrs::SetNewDataTransferFlag(true);
+  int thread_num = 2, cpu_num = 2;
+  TF_ASSERT_OK(InitThreadPool(thread_num));
+  TF_ASSERT_OK(InitFunctionLibraryRuntime({}, cpu_num));
+
+  const TestCase &test_case = NormalizeTestStringCase();
+  Tensor tensor_slice_dataset_tensor(DT_VARIANT, TensorShape({}));
+  std::vector<Tensor> inputs_for_tensor_slice_dataset = test_case.input_tensors;
+  TF_ASSERT_OK(CreateTensorSliceDatasetTensorForQueue(&inputs_for_tensor_slice_dataset,
+                                                      &tensor_slice_dataset_tensor));
+
+  gtl::InlinedVector<TensorValue, 4> inputs_for_host_queue_dataset(
+      {TensorValue(&tensor_slice_dataset_tensor),
+       TensorValue(&tensor_slice_dataset_tensor)});
+
+  std::unique_ptr<OpKernel> host_queue_dataset_kernel;
+  TF_ASSERT_OK(CreateHostQueueDatasetKernel(test_case.expected_output_dtypes,
+                                            test_case.expected_output_shapes,
+                                            &host_queue_dataset_kernel, "-1"));
+  std::unique_ptr<OpKernelContext> host_queue_dataset_context;
+  TF_ASSERT_OK(CreateHostQueueDatasetContext(host_queue_dataset_kernel.get(),
+                                             &inputs_for_host_queue_dataset,
+                                             &host_queue_dataset_context));
+  DatasetBase *host_queue_dataset;
+  TF_ASSERT_OK(CreateDataset(host_queue_dataset_kernel.get(),
+                             host_queue_dataset_context.get(),
+                             &host_queue_dataset));
+  core::ScopedUnref scoped_unref(host_queue_dataset);
+
+  EXPECT_EQ(host_queue_dataset->node_name(), kNodeName);
+
+  host_queue_dataset->output_dtypes();
+  host_queue_dataset->output_shapes();
+  host_queue_dataset->DebugString();
+
+  SerializationContext context(SerializationContext::Params{});
+  GraphDefBuilder b;
+  DatasetBase::DatasetGraphDefBuilder db(&b);
+  Node *output;
+  host_queue_dataset->AsGraphDefInternal(&context, &db, &output);
+
+  std::unique_ptr<IteratorContext> iterator_context;
+  TF_ASSERT_OK(CreateIteratorContext(host_queue_dataset_context.get(),
+                                     &iterator_context));
+  std::unique_ptr<IteratorBase> iterator;
+  TF_ASSERT_OK(host_queue_dataset->MakeIterator(iterator_context.get(),
+                                                "Iterator", &iterator));
+
+  bool end_of_sequence = false;
+  std::vector<Tensor> out_tensors;
+  sleep(5);
+  setMbufSize(2);
+  TF_EXPECT_OK(iterator->GetNext(iterator_context.get(), &out_tensors,
+                                 &end_of_sequence));
+  setDefaultMbufSize();
+}
+
+TEST_F(HostQueueDatasetOpTest, isholddatatrans2) {
+  NpuAttrs::SetNewDataTransferFlag(true);
+  int thread_num = 2, cpu_num = 2;
+  TF_ASSERT_OK(InitThreadPool(thread_num));
+  TF_ASSERT_OK(InitFunctionLibraryRuntime({}, cpu_num));
+
+  const TestCase &test_case = NormalizeTestStringCase();
+  Tensor tensor_slice_dataset_tensor(DT_VARIANT, TensorShape({}));
+  std::vector<Tensor> inputs_for_tensor_slice_dataset = test_case.input_tensors;
+  TF_ASSERT_OK(CreateTensorSliceDatasetTensorForQueue(&inputs_for_tensor_slice_dataset,
+                                                      &tensor_slice_dataset_tensor));
+
+  gtl::InlinedVector<TensorValue, 4> inputs_for_host_queue_dataset(
+      {TensorValue(&tensor_slice_dataset_tensor),
+       TensorValue(&tensor_slice_dataset_tensor)});
+
+  std::unique_ptr<OpKernel> host_queue_dataset_kernel;
+  TF_ASSERT_OK(CreateHostQueueDatasetKernel(test_case.expected_output_dtypes,
+                                            test_case.expected_output_shapes,
+                                            &host_queue_dataset_kernel, "-1"));
+  std::unique_ptr<OpKernelContext> host_queue_dataset_context;
+  TF_ASSERT_OK(CreateHostQueueDatasetContext(host_queue_dataset_kernel.get(),
+                                             &inputs_for_host_queue_dataset,
+                                             &host_queue_dataset_context));
+  DatasetBase *host_queue_dataset;
+  TF_ASSERT_OK(CreateDataset(host_queue_dataset_kernel.get(),
+                             host_queue_dataset_context.get(),
+                             &host_queue_dataset));
+  core::ScopedUnref scoped_unref(host_queue_dataset);
+
+  EXPECT_EQ(host_queue_dataset->node_name(), kNodeName);
+
+  host_queue_dataset->output_dtypes();
+  host_queue_dataset->output_shapes();
+  host_queue_dataset->DebugString();
+
+  SerializationContext context(SerializationContext::Params{});
+  GraphDefBuilder b;
+  DatasetBase::DatasetGraphDefBuilder db(&b);
+  Node *output;
+  host_queue_dataset->AsGraphDefInternal(&context, &db, &output);
+
+  std::unique_ptr<IteratorContext> iterator_context;
+  TF_ASSERT_OK(CreateIteratorContext(host_queue_dataset_context.get(),
+                                     &iterator_context));
+  std::unique_ptr<IteratorBase> iterator;
+  TF_ASSERT_OK(host_queue_dataset->MakeIterator(iterator_context.get(),
+                                                "Iterator", &iterator));
+
+  bool end_of_sequence = false;
+  std::vector<Tensor> out_tensors;
+  sleep(5);
+  setMbufSize(3);
+  TF_EXPECT_OK(iterator->GetNext(iterator_context.get(), &out_tensors,
+                                 &end_of_sequence));
+  setDefaultMbufSize();
+}
+
+TEST_F(HostQueueDatasetOpTest, isholddatatrans3) {
+  NpuAttrs::SetNewDataTransferFlag(true);
+  int thread_num = 2, cpu_num = 2;
+  TF_ASSERT_OK(InitThreadPool(thread_num));
+  TF_ASSERT_OK(InitFunctionLibraryRuntime({}, cpu_num));
+
+  const TestCase &test_case = NormalizeTestStringCase();
+  Tensor tensor_slice_dataset_tensor(DT_VARIANT, TensorShape({}));
+  std::vector<Tensor> inputs_for_tensor_slice_dataset = test_case.input_tensors;
+  TF_ASSERT_OK(CreateTensorSliceDatasetTensorForQueue(&inputs_for_tensor_slice_dataset,
+                                                      &tensor_slice_dataset_tensor));
+
+  gtl::InlinedVector<TensorValue, 4> inputs_for_host_queue_dataset(
+      {TensorValue(&tensor_slice_dataset_tensor),
+       TensorValue(&tensor_slice_dataset_tensor)});
+
+  std::unique_ptr<OpKernel> host_queue_dataset_kernel;
+  TF_ASSERT_OK(CreateHostQueueDatasetKernel(test_case.expected_output_dtypes,
+                                            test_case.expected_output_shapes,
+                                            &host_queue_dataset_kernel, "-1"));
+  std::unique_ptr<OpKernelContext> host_queue_dataset_context;
+  TF_ASSERT_OK(CreateHostQueueDatasetContext(host_queue_dataset_kernel.get(),
+                                             &inputs_for_host_queue_dataset,
+                                             &host_queue_dataset_context));
+  DatasetBase *host_queue_dataset;
+  TF_ASSERT_OK(CreateDataset(host_queue_dataset_kernel.get(),
+                             host_queue_dataset_context.get(),
+                             &host_queue_dataset));
+  core::ScopedUnref scoped_unref(host_queue_dataset);
+
+  EXPECT_EQ(host_queue_dataset->node_name(), kNodeName);
+
+  host_queue_dataset->output_dtypes();
+  host_queue_dataset->output_shapes();
+  host_queue_dataset->DebugString();
+
+  SerializationContext context(SerializationContext::Params{});
+  GraphDefBuilder b;
+  DatasetBase::DatasetGraphDefBuilder db(&b);
+  Node *output;
+  host_queue_dataset->AsGraphDefInternal(&context, &db, &output);
+
+  std::unique_ptr<IteratorContext> iterator_context;
+  TF_ASSERT_OK(CreateIteratorContext(host_queue_dataset_context.get(),
+                                     &iterator_context));
+  std::unique_ptr<IteratorBase> iterator;
+  TF_ASSERT_OK(host_queue_dataset->MakeIterator(iterator_context.get(),
+                                                "Iterator", &iterator));
+
+  bool end_of_sequence = false;
+  std::vector<Tensor> out_tensors;
+  sleep(5);
+  setMbufSize(65);
+  TF_EXPECT_OK(iterator->GetNext(iterator_context.get(), &out_tensors,
+                                 &end_of_sequence));
+  setDefaultMbufSize();
+}
+
 } // namespace
 } // namespace data
 } // namespace tensorflow
diff --git a/tf_adapter/tests/ut/kernels/testcase/dataset/host_queue_dats_set_ut.cc b/tf_adapter/tests/ut/kernels/testcase/dataset/host_queue_dats_set_ut.cc
index 027ee442c7f4f88db85b269fef833c9e9249afdf..b37e5cbb0e90c72e2ed538b008216437c4e1d8a0 100644
--- a/tf_adapter/tests/ut/kernels/testcase/dataset/host_queue_dats_set_ut.cc
+++ b/tf_adapter/tests/ut/kernels/testcase/dataset/host_queue_dats_set_ut.cc
@@ -7,6 +7,8 @@
 #include "tensorflow/core/kernels/data/dataset_test_base.h"
 #include "tf_adapter/util/acl_channel.h"
 #include "tf_adapter/util/npu_attrs.h"
+#include "ascendcl_stub.h"
+
 class HostQueueDatasetOp;
 namespace tensorflow {
 namespace data {
@@ -65,6 +67,17 @@ struct TestCase {
   std::vector<PartialTensorShape> expected_output_shapes;
 };

+TestCase NormalizeTestStringCase() {
+  return {
+      // input_tensors expected_outputs expected_output_dtypes
+      // expected_output_shapes
+      {CreateTensor<tstring>(TensorShape{10, 1}, {"0", "1", "2", "3", "4", "5", "6", "7", "8", "9"})},
+      {CreateTensor<tstring>(TensorShape{1}, {"0"})},
+      {DT_STRING},
+      {PartialTensorShape({1})},
+  };
+}
+
 TestCase NormalizeTestCase() {
   return {
       // input_tensors expected_outputs expected_output_dtypes
@@ -629,6 +642,181 @@ TEST_F(HostQueueDatasetOpTest, iterator_getnext10) {
   EXPECT_TRUE(StopRecvTensorByAcl(&acl_handle_, "test").ok());
 }
+
+TEST_F(HostQueueDatasetOpTest, isholddatatrans1) {
+  NpuAttrs::SetNewDataTransferFlag(true);
+  int thread_num = 2, cpu_num = 2;
+  TF_ASSERT_OK(InitThreadPool(thread_num));
+  TF_ASSERT_OK(InitFunctionLibraryRuntime({}, cpu_num));
+
+  const TestCase &test_case = NormalizeTestStringCase();
+  Tensor tensor_slice_dataset_tensor(DT_VARIANT, TensorShape({}));
+  std::vector<Tensor> inputs_for_tensor_slice_dataset = test_case.input_tensors;
+  TF_ASSERT_OK(CreateTensorSliceDatasetTensorForQueue(&inputs_for_tensor_slice_dataset,
+                                                      &tensor_slice_dataset_tensor));
+
+  gtl::InlinedVector<TensorValue, 4> inputs_for_host_queue_dataset(
+      {TensorValue(&tensor_slice_dataset_tensor),
+       TensorValue(&tensor_slice_dataset_tensor)});
+
+  std::unique_ptr<OpKernel> host_queue_dataset_kernel;
+  TF_ASSERT_OK(CreateHostQueueDatasetKernel(test_case.expected_output_dtypes,
+                                            test_case.expected_output_shapes,
+                                            &host_queue_dataset_kernel, "-1"));
+  std::unique_ptr<OpKernelContext> host_queue_dataset_context;
+  TF_ASSERT_OK(CreateHostQueueDatasetContext(host_queue_dataset_kernel.get(),
+                                             &inputs_for_host_queue_dataset,
+                                             &host_queue_dataset_context));
+  DatasetBase *host_queue_dataset;
+  TF_ASSERT_OK(CreateDataset(host_queue_dataset_kernel.get(),
+                             host_queue_dataset_context.get(),
+                             &host_queue_dataset));
+  core::ScopedUnref scoped_unref(host_queue_dataset);
+
+  EXPECT_EQ(host_queue_dataset->node_name(), kNodeName);
+
+  host_queue_dataset->output_dtypes();
+  host_queue_dataset->output_shapes();
+  host_queue_dataset->DebugString();
+
+  SerializationContext context(SerializationContext::Params{});
+  GraphDefBuilder b;
+  DatasetBase::DatasetGraphDefBuilder db(&b);
+  Node *output;
+  host_queue_dataset->AsGraphDefInternal(&context, &db, &output);
+
+  std::unique_ptr<IteratorContext> iterator_context;
+  TF_ASSERT_OK(CreateIteratorContext(host_queue_dataset_context.get(),
+                                     &iterator_context));
+  std::unique_ptr<IteratorBase> iterator;
+  TF_ASSERT_OK(host_queue_dataset->MakeIterator(iterator_context.get(),
+                                                "Iterator", &iterator));
+
+  bool end_of_sequence = false;
+  std::vector<Tensor> out_tensors;
+  sleep(5);
+  setMbufSize(2);
+  TF_EXPECT_OK(iterator->GetNext(iterator_context.get(), &out_tensors,
+                                 &end_of_sequence));
+  setDefaultMbufSize();
+}
+
+TEST_F(HostQueueDatasetOpTest, isholddatatrans2) {
+  NpuAttrs::SetNewDataTransferFlag(true);
+  int thread_num = 2, cpu_num = 2;
+  TF_ASSERT_OK(InitThreadPool(thread_num));
+  TF_ASSERT_OK(InitFunctionLibraryRuntime({}, cpu_num));
+
+  const TestCase &test_case = NormalizeTestStringCase();
+  Tensor tensor_slice_dataset_tensor(DT_VARIANT, TensorShape({}));
+  std::vector<Tensor> inputs_for_tensor_slice_dataset = test_case.input_tensors;
+  TF_ASSERT_OK(CreateTensorSliceDatasetTensorForQueue(&inputs_for_tensor_slice_dataset,
+                                                      &tensor_slice_dataset_tensor));
+
+  gtl::InlinedVector<TensorValue, 4> inputs_for_host_queue_dataset(
+      {TensorValue(&tensor_slice_dataset_tensor),
+       TensorValue(&tensor_slice_dataset_tensor)});
+
+  std::unique_ptr<OpKernel> host_queue_dataset_kernel;
+  TF_ASSERT_OK(CreateHostQueueDatasetKernel(test_case.expected_output_dtypes,
+                                            test_case.expected_output_shapes,
+                                            &host_queue_dataset_kernel, "-1"));
+  std::unique_ptr<OpKernelContext> host_queue_dataset_context;
+  TF_ASSERT_OK(CreateHostQueueDatasetContext(host_queue_dataset_kernel.get(),
+                                             &inputs_for_host_queue_dataset,
+                                             &host_queue_dataset_context));
+  DatasetBase *host_queue_dataset;
+  TF_ASSERT_OK(CreateDataset(host_queue_dataset_kernel.get(),
+                             host_queue_dataset_context.get(),
+                             &host_queue_dataset));
+  core::ScopedUnref scoped_unref(host_queue_dataset);
+
+  EXPECT_EQ(host_queue_dataset->node_name(), kNodeName);
+
+  host_queue_dataset->output_dtypes();
+  host_queue_dataset->output_shapes();
+  host_queue_dataset->DebugString();
+
+  SerializationContext context(SerializationContext::Params{});
+  GraphDefBuilder b;
+  DatasetBase::DatasetGraphDefBuilder db(&b);
+  Node *output;
+  host_queue_dataset->AsGraphDefInternal(&context, &db, &output);
+
+  std::unique_ptr<IteratorContext> iterator_context;
+  TF_ASSERT_OK(CreateIteratorContext(host_queue_dataset_context.get(),
+                                     &iterator_context));
+  setMbufSize(3);
+  std::unique_ptr<IteratorBase> iterator;
+  TF_ASSERT_OK(host_queue_dataset->MakeIterator(iterator_context.get(),
+                                                "Iterator", &iterator));
+
+  bool end_of_sequence = false;
+  std::vector<Tensor> out_tensors;
+  sleep(5);
+  TF_EXPECT_OK(iterator->GetNext(iterator_context.get(), &out_tensors,
+                                 &end_of_sequence));
+  setDefaultMbufSize();
+}
+
+TEST_F(HostQueueDatasetOpTest, isholddatatrans3) {
+  NpuAttrs::SetNewDataTransferFlag(true);
+  int thread_num = 2, cpu_num = 2;
+  TF_ASSERT_OK(InitThreadPool(thread_num));
+  TF_ASSERT_OK(InitFunctionLibraryRuntime({}, cpu_num));
+
+  const TestCase &test_case = NormalizeTestStringCase();
+  Tensor tensor_slice_dataset_tensor(DT_VARIANT, TensorShape({}));
+  std::vector<Tensor> inputs_for_tensor_slice_dataset = test_case.input_tensors;
+  TF_ASSERT_OK(CreateTensorSliceDatasetTensorForQueue(&inputs_for_tensor_slice_dataset,
+                                                      &tensor_slice_dataset_tensor));
+
+  gtl::InlinedVector<TensorValue, 4> inputs_for_host_queue_dataset(
+      {TensorValue(&tensor_slice_dataset_tensor),
+       TensorValue(&tensor_slice_dataset_tensor)});
+
+  std::unique_ptr<OpKernel> host_queue_dataset_kernel;
+  TF_ASSERT_OK(CreateHostQueueDatasetKernel(test_case.expected_output_dtypes,
+                                            test_case.expected_output_shapes,
+                                            &host_queue_dataset_kernel, "-1"));
+  std::unique_ptr<OpKernelContext> host_queue_dataset_context;
+  TF_ASSERT_OK(CreateHostQueueDatasetContext(host_queue_dataset_kernel.get(),
+                                             &inputs_for_host_queue_dataset,
+                                             &host_queue_dataset_context));
+  DatasetBase *host_queue_dataset;
+  TF_ASSERT_OK(CreateDataset(host_queue_dataset_kernel.get(),
+                             host_queue_dataset_context.get(),
+                             &host_queue_dataset));
+  core::ScopedUnref scoped_unref(host_queue_dataset);
+
+  EXPECT_EQ(host_queue_dataset->node_name(), kNodeName);
+
+  host_queue_dataset->output_dtypes();
+  host_queue_dataset->output_shapes();
+  host_queue_dataset->DebugString();
+
+  SerializationContext context(SerializationContext::Params{});
+  GraphDefBuilder b;
+  DatasetBase::DatasetGraphDefBuilder db(&b);
+  Node *output;
+  host_queue_dataset->AsGraphDefInternal(&context, &db, &output);
+
+  std::unique_ptr<IteratorContext> iterator_context;
+  TF_ASSERT_OK(CreateIteratorContext(host_queue_dataset_context.get(),
+                                     &iterator_context));
+
+  setMbufSize(65);
+  std::unique_ptr<IteratorBase> iterator;
+  TF_ASSERT_OK(host_queue_dataset->MakeIterator(iterator_context.get(),
+                                                "Iterator", &iterator));
+
+  bool end_of_sequence = false;
+  std::vector<Tensor> out_tensors;
+  sleep(5);
+  TF_EXPECT_OK(iterator->GetNext(iterator_context.get(), &out_tensors,
+                                 &end_of_sequence));
+  setDefaultMbufSize();
+}
+
 } // namespace
 } // namespace data
 } // namespace tensorflow
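The test changes hinge on one seam: the ascendcl stub exports a globally settable depth, and its fake acltdtQueryChannelSize reports that value, so each test can choose the queue depth that IsHoldDataTrans observes (2, 3, and 65 bracket the kStringTypeDepth = 64 limit). The seam condensed to its essentials (a sketch, not the full stub; the acl types are reduced to the minimum the seam needs):

    #include <cstddef>
    #include <cstdint>

    using aclError = int;
    constexpr aclError ACL_SUCCESS = 0;
    struct acltdtChannelHandle;      // opaque; the stub never dereferences it

    static uint64_t g_MbufSize = 0;  // test-controlled queue depth

    void setMbufSize(uint64_t value) { g_MbufSize = value; }
    void setDefaultMbufSize() { g_MbufSize = 0; }

    // Stubbed ACL call: report whatever depth the test configured.
    aclError acltdtQueryChannelSize(const acltdtChannelHandle * /*handle*/, size_t *size) {
      *size = g_MbufSize;
      return ACL_SUCCESS;
    }

    int main() {
      setMbufSize(65);                          // above kStringTypeDepth = 64
      size_t depth = 0;
      acltdtQueryChannelSize(nullptr, &depth);  // depth == 65
      setDefaultMbufSize();                     // restore for subsequent tests
    }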