diff --git a/tf_adapter/kernels/aicpu/host_queue_dataset_op.cc b/tf_adapter/kernels/aicpu/host_queue_dataset_op.cc
index ba92127b880c2d22e38d8b3c7ad4d305585a5560..505e5c557e522bb94e0caa5c3849ddf3c4baa288 100644
--- a/tf_adapter/kernels/aicpu/host_queue_dataset_op.cc
+++ b/tf_adapter/kernels/aicpu/host_queue_dataset_op.cc
@@ -13,6 +13,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+#include <sched.h>
 #include
 #include
 #include
@@ -61,6 +62,8 @@ const int64_t kUnknownShapeDepth = 3LL;
 const uint64_t PARALLEL_MEMORY_TRHESHOLD = 10 * 1024 * 1024ULL;
 const uint32_t MAX_THREAD_NUM = 4U;
 std::atomic<bool> tdt_release(false);
+std::atomic<bool> is_hold_type(false);
+
 // total memory usage controlled below 2G
 const uint64_t kTotalBytes = 8 * 1024 * 1024 * 1024LL;
 const int64_t kMaxBytes = 2 * 1024 * 1024 * 1024LL;
@@ -192,6 +195,7 @@ class HostQueueDatasetOp : public DatasetOpKernel {
     for (size_t i = 0UL; i < output_shape_size; i++) {
       DataType tensor_data_type = output_types_.at(i);
       if (tensor_data_type == DT_STRING) {
+        is_hold_type.store(true);
         ADP_LOG(INFO) << "Current tensor type is DT_STRING.";
         return kStringTypeDepth;
       }
@@ -441,13 +445,9 @@ class HostQueueDatasetOp : public DatasetOpKernel {
     }
 
     void RefreshDataThreadPerf(const ThreadType type, const double elapsed_time,
-                               const std::vector<Tensor> &args) {
+                               const uint64_t args_total_bytes) {
       if (elapsed_time > data_thread_perf_stat_[static_cast<size_t>(type)].elapsed_time) {
-        uint64_t total_bytes = 0;
-        for (auto &tensor : args) {
-          total_bytes += tensor.TotalBytes();
-        }
-        data_thread_perf_stat_[static_cast<size_t>(type)].total_bytes = total_bytes;
+        data_thread_perf_stat_[static_cast<size_t>(type)].total_bytes = args_total_bytes;
         data_thread_perf_stat_[static_cast<size_t>(type)].elapsed_time = elapsed_time;
       }
     }
@@ -517,8 +517,6 @@ class HostQueueDatasetOp : public DatasetOpKernel {
         get_thread_exception_.store(true);
         return;
       }
-      auto elapsed_time = std::chrono::duration<double>(end - start).count();
-      RefreshDataThreadPerf(ThreadType::RECV, elapsed_time, args);
       uint64_t args_tensor_size = 0ULL;
       if (from_npu_dataset == NPU_ALLOCATOR_UNKNOW) {
         if (args.empty()) {
@@ -548,6 +546,8 @@ class HostQueueDatasetOp : public DatasetOpKernel {
         args_tensor_size += tensor.TotalBytes();
         total_bytes_ += tensor.TotalBytes();
       }
+      auto elapsed_time = std::chrono::duration<double>(end - start).count();
+      RefreshDataThreadPerf(ThreadType::RECV, elapsed_time, args_tensor_size);
     }
     if ((!is_string) && (from_npu_dataset != NPU_ALLOCATOR_NPU) && (dataset()->channel_type_ == ChannelType::ACL_QUEUE)) {
@@ -596,22 +596,50 @@ class HostQueueDatasetOp : public DatasetOpKernel {
       ADP_LOG(INFO) << "Slave SendDataThread exit.";
     }
 
-    Status SendDataByAclQueue(const vector<Tensor> &args, const acltdtTensorType &data_type) {
-      Status status;
+    void RecordMbufQueueBytes(const bool is_hold_type, const uint64_t args_total_bytes) {
+      if (!is_hold_type) { return; }
+      mbuf_queue_rear_ = (mbuf_queue_rear_ + 1) % kStringTypeDepth;
+      mbuf_queue_bytes_[mbuf_queue_rear_] = args_total_bytes;
+    }
+
+    bool IsHoldDataTrans() {
+      if (!is_hold_type) { return false; }
+      size_t mbuf_size = 0;
+      aclError status = acltdtQueryChannelSize(acl_handle_, &mbuf_size);
+      if ((status != ACL_SUCCESS) || (mbuf_size > kStringTypeDepth)) {
+        ADP_LOG(ERROR) << "Failed to get the mbuf size, status = " << status << ", mbuf_size = " << mbuf_size;
+        return false;
+      }
+      if (mbuf_size <= 1) { return false; }
+      uint64_t mbuf_total_bytes = 0;
+      for (size_t i = 0; i < mbuf_size; i++) {
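+        // Ring-buffer walk: sum the byte sizes recorded for the newest mbuf_size sends,
+        // i.e. the data presumed still parked in the channel.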
+        size_t index = (mbuf_queue_rear_ + kStringTypeDepth - i) % kStringTypeDepth;
+        mbuf_total_bytes += mbuf_queue_bytes_[index];
+      }
+      return (mbuf_total_bytes >= static_cast<uint64_t>(kMaxBytes));
+    }
+
+    Status SendDataByAclQueue(const vector<Tensor> &args, const acltdtTensorType &data_type,
+                              const uint64_t args_total_bytes) {
+      Status status = Status::OK();
       bool is_need_resend = false;
-      do {
-        {
-          mutex_lock lck(mu_);
-          if (finish_send_) { break; }
+
+      while (!finish_send_) {
+        if (IsHoldDataTrans()) {
+          sched_yield();
+          continue;
         }
         auto start = std::chrono::steady_clock::now();
         status = SendTensorsByAcl(acl_handle_, data_type, args, is_need_resend);
-        auto end = std::chrono::steady_clock::now();
-        if (status.ok() && !is_need_resend) {
+        if (!status.ok()) { break; }
+        if (!is_need_resend) {
+          auto end = std::chrono::steady_clock::now();
           auto elapsed_time = std::chrono::duration<double>(end - start).count();
-          RefreshDataThreadPerf(ThreadType::SEND, elapsed_time, args);
+          RefreshDataThreadPerf(ThreadType::SEND, elapsed_time, args_total_bytes);
+          RecordMbufQueueBytes(is_hold_type, args_total_bytes);
+          break;
         }
-      } while (status.ok() && is_need_resend);
+      }
       return status;
     }
 
@@ -689,6 +717,7 @@ class HostQueueDatasetOp : public DatasetOpKernel {
       while (true) {
         std::vector<Tensor> args;
         acltdtTensorType data_type = ACL_TENSOR_DATA_TENSOR;
+        uint64_t args_total_bytes = 0ULL;
        {
          mutex_lock lck(mu_);
          while ((!finish_send_) && buffer_.empty()) {
@@ -709,15 +738,16 @@ class HostQueueDatasetOp : public DatasetOpKernel {
            args = buffer_.front().value;
            buffer_.pop_front();
            for (auto &tensor : args) {
-              total_bytes_ -= tensor.TotalBytes();
+              args_total_bytes += tensor.TotalBytes();
            }
+            total_bytes_ -= args_total_bytes;
          }
          ADP_LOG(INFO) << "Host queue " << dataset()->channel_name_ << ", buffer_size: " << buffer_.size()
                        << ", data_type: " << data_type;
        }
        Status status;
        if (dataset()->channel_type_ == ChannelType::ACL_QUEUE) {
-          status = SendDataByAclQueue(args, data_type);
+          status = SendDataByAclQueue(args, data_type, args_total_bytes);
        } else {
          status = SendDataByHostQueue(args, data_type);
        }
@@ -1000,6 +1030,8 @@ class HostQueueDatasetOp : public DatasetOpKernel {
      double elapsed_time = 0;
      uint64_t total_bytes = 0;
    } data_thread_perf_stat_[static_cast<size_t>(ThreadType::BUTT)];
+    uint64_t mbuf_queue_bytes_[kStringTypeDepth] = {0};
+    size_t mbuf_queue_rear_ = 0;
  };
  const std::vector<DatasetBase *> inputs_;
  std::string channel_name_;
diff --git a/tf_adapter/tests/depends/ascendcl/src/ascendcl_stub.cc b/tf_adapter/tests/depends/ascendcl/src/ascendcl_stub.cc
index d162842494e7b3e913714030068ddc8d02f88603..a56b38bbcce85b4e8e4e7e562bdf4fb5e898f710 100644
--- a/tf_adapter/tests/depends/ascendcl/src/ascendcl_stub.cc
+++ b/tf_adapter/tests/depends/ascendcl/src/ascendcl_stub.cc
@@ -342,6 +342,7 @@ aclError aclrtDestroyEvent(aclrtEvent event) {
 }
 
 std::string g_SocVersionStub = "Ascend910B";
+uint64_t g_MbufSize = 0;
 
 const char *aclrtGetSocName() {
   return g_SocVersionStub.c_str();
@@ -354,6 +355,20 @@ void aclrtSetSocNameStub(std::string socVersion) {
 void aclrtSetDefaultSocNameStub() {
   g_SocVersionStub = "Ascend910B";
 }
+
+void setMbufSize(uint64_t value) {
+  g_MbufSize = value;
+}
+
+void setDefaultMbufSize() {
+  g_MbufSize = 0;
+}
+
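+// Test double for the ACL API: reports whatever channel depth the test injected via setMbufSize().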
+aclError acltdtQueryChannelSize(const acltdtChannelHandle *handle, size_t *size) {
+  *size = g_MbufSize;
+  return ACL_SUCCESS;
+}
+
 // for GE RunGraph api
 #if 0
 aclError aclrtSynchronizeStream(aclrtStream stream) {
diff --git a/tf_adapter/tests/depends/ascendcl/src/ascendcl_stub.h b/tf_adapter/tests/depends/ascendcl/src/ascendcl_stub.h
index efb16421604d358b5148c1bd7e1fabd961948190..3ae697ab2a10e6e5c714ce5d426da3f75195a64b 100644
--- a/tf_adapter/tests/depends/ascendcl/src/ascendcl_stub.h
+++ b/tf_adapter/tests/depends/ascendcl/src/ascendcl_stub.h
@@ -27,6 +27,8 @@ void aclrtSetSocNameStub(std::string socVersion);
 
 void aclrtSetDefaultSocNameStub();
 
+void setMbufSize(uint64_t value);
+void setDefaultMbufSize();
 extern uint32_t g_tensor_desc_size;
 void SetTensorDescSize(uint32_t val);
 
diff --git a/tf_adapter/tests/st/kernels/testcase/dataset/host_queue_dats_set_st.cc b/tf_adapter/tests/st/kernels/testcase/dataset/host_queue_dats_set_st.cc
index ec94fffab01593deb14fa3f27458328828b66d0f..29beb180bb8d18fe7f265f98c7c8fd5e70cb548d 100644
--- a/tf_adapter/tests/st/kernels/testcase/dataset/host_queue_dats_set_st.cc
+++ b/tf_adapter/tests/st/kernels/testcase/dataset/host_queue_dats_set_st.cc
@@ -5,6 +5,7 @@
 #include "tensorflow/core/graph/graph_def_builder.h"
 #include "tensorflow/core/kernels/data/dataset_test_base.h"
 #include "tf_adapter/util/npu_attrs.h"
+#include "ascendcl_stub.h"
 
 class HostQueueDatasetOp;
 namespace tensorflow {
@@ -64,6 +65,17 @@ struct TestCase {
   std::vector<PartialTensorShape> expected_output_shapes;
 };
 
+TestCase NormalizeTestStringCase() {
+  return {
+      // input_tensors        expected_outputs        expected_output_dtypes
+      // expected_output_shapes
+      {CreateTensor<string>(TensorShape{10, 1}, {"0", "1", "2", "3", "4", "5", "6", "7", "8", "9"})},
+      {CreateTensor<string>(TensorShape{1}, {"0"})},
+      {DT_STRING},
+      {PartialTensorShape({1})},
+  };
+}
+
 TestCase NormalizeTestCase() {
   return {
       // input_tensors        expected_outputs        expected_output_dtypes
@@ -487,6 +499,181 @@ TEST_F(HostQueueDatasetOpTest, iterator_getnext05_host_queue) {
                                  &end_of_sequence));
   *const_cast<bool *>(&kIsHeterogeneous) = false;
 }
+
+TEST_F(HostQueueDatasetOpTest, isholddatatrans1) {
+  NpuAttrs::SetNewDataTransferFlag(true);
+  int thread_num = 2, cpu_num = 2;
+  TF_ASSERT_OK(InitThreadPool(thread_num));
+  TF_ASSERT_OK(InitFunctionLibraryRuntime({}, cpu_num));
+
+  const TestCase &test_case = NormalizeTestStringCase();
+  Tensor tensor_slice_dataset_tensor(DT_VARIANT, TensorShape({}));
+  std::vector<Tensor> inputs_for_tensor_slice_dataset = test_case.input_tensors;
+  TF_ASSERT_OK(CreateTensorSliceDatasetTensorForQueue(&inputs_for_tensor_slice_dataset,
+                                                      &tensor_slice_dataset_tensor));
+
+  gtl::InlinedVector<TensorValue, 4> inputs_for_host_queue_dataset(
+      {TensorValue(&tensor_slice_dataset_tensor),
+       TensorValue(&tensor_slice_dataset_tensor)});
+
+  std::unique_ptr<OpKernel> host_queue_dataset_kernel;
+  TF_ASSERT_OK(CreateHostQueueDatasetKernel(test_case.expected_output_dtypes,
+                                            test_case.expected_output_shapes,
+                                            &host_queue_dataset_kernel, "-1"));
+  std::unique_ptr<OpKernelContext> host_queue_dataset_context;
+  TF_ASSERT_OK(CreateHostQueueDatasetContext(host_queue_dataset_kernel.get(),
+                                             &inputs_for_host_queue_dataset,
+                                             &host_queue_dataset_context));
+  DatasetBase *host_queue_dataset;
+  TF_ASSERT_OK(CreateDataset(host_queue_dataset_kernel.get(),
+                             host_queue_dataset_context.get(),
+                             &host_queue_dataset));
+  core::ScopedUnref scoped_unref(host_queue_dataset);
+
+  EXPECT_EQ(host_queue_dataset->node_name(), kNodeName);
+
+  host_queue_dataset->output_dtypes();
+  host_queue_dataset->output_shapes();
+  host_queue_dataset->DebugString();
+
+  SerializationContext context(SerializationContext::Params{});
+  GraphDefBuilder b;
+  DatasetBase::DatasetGraphDefBuilder db(&b);
+  Node *output;
+  host_queue_dataset->AsGraphDefInternal(&context, &db, &output);
+
+  std::unique_ptr<IteratorContext> iterator_context;
+  TF_ASSERT_OK(CreateIteratorContext(host_queue_dataset_context.get(),
+                                     &iterator_context));
+  std::unique_ptr<IteratorBase> iterator;
+  TF_ASSERT_OK(host_queue_dataset->MakeIterator(iterator_context.get(),
+                                                "Iterator", &iterator));
+
+  bool end_of_sequence = false;
+  std::vector<Tensor> out_tensors;
+  sleep(5);
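+  // A stubbed depth of 2 drives the summing branch of IsHoldDataTrans; the recorded
+  // string tensors total far less than kMaxBytes, so sending is not held.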
+  setMbufSize(2);
+  TF_EXPECT_OK(iterator->GetNext(iterator_context.get(), &out_tensors,
+                                 &end_of_sequence));
+  setDefaultMbufSize();
+}
+
+TEST_F(HostQueueDatasetOpTest, isholddatatrans2) {
+  NpuAttrs::SetNewDataTransferFlag(true);
+  int thread_num = 2, cpu_num = 2;
+  TF_ASSERT_OK(InitThreadPool(thread_num));
+  TF_ASSERT_OK(InitFunctionLibraryRuntime({}, cpu_num));
+
+  const TestCase &test_case = NormalizeTestStringCase();
+  Tensor tensor_slice_dataset_tensor(DT_VARIANT, TensorShape({}));
+  std::vector<Tensor> inputs_for_tensor_slice_dataset = test_case.input_tensors;
+  TF_ASSERT_OK(CreateTensorSliceDatasetTensorForQueue(&inputs_for_tensor_slice_dataset,
+                                                      &tensor_slice_dataset_tensor));
+
+  gtl::InlinedVector<TensorValue, 4> inputs_for_host_queue_dataset(
+      {TensorValue(&tensor_slice_dataset_tensor),
+       TensorValue(&tensor_slice_dataset_tensor)});
+
+  std::unique_ptr<OpKernel> host_queue_dataset_kernel;
+  TF_ASSERT_OK(CreateHostQueueDatasetKernel(test_case.expected_output_dtypes,
+                                            test_case.expected_output_shapes,
+                                            &host_queue_dataset_kernel, "-1"));
+  std::unique_ptr<OpKernelContext> host_queue_dataset_context;
+  TF_ASSERT_OK(CreateHostQueueDatasetContext(host_queue_dataset_kernel.get(),
+                                             &inputs_for_host_queue_dataset,
+                                             &host_queue_dataset_context));
+  DatasetBase *host_queue_dataset;
+  TF_ASSERT_OK(CreateDataset(host_queue_dataset_kernel.get(),
+                             host_queue_dataset_context.get(),
+                             &host_queue_dataset));
+  core::ScopedUnref scoped_unref(host_queue_dataset);
+
+  EXPECT_EQ(host_queue_dataset->node_name(), kNodeName);
+
+  host_queue_dataset->output_dtypes();
+  host_queue_dataset->output_shapes();
+  host_queue_dataset->DebugString();
+
+  SerializationContext context(SerializationContext::Params{});
+  GraphDefBuilder b;
+  DatasetBase::DatasetGraphDefBuilder db(&b);
+  Node *output;
+  host_queue_dataset->AsGraphDefInternal(&context, &db, &output);
+
+  std::unique_ptr<IteratorContext> iterator_context;
+  TF_ASSERT_OK(CreateIteratorContext(host_queue_dataset_context.get(),
+                                     &iterator_context));
+  std::unique_ptr<IteratorBase> iterator;
+  TF_ASSERT_OK(host_queue_dataset->MakeIterator(iterator_context.get(),
+                                                "Iterator", &iterator));
+
+  bool end_of_sequence = false;
+  std::vector<Tensor> out_tensors;
+  sleep(5);
+  setMbufSize(3);
+  TF_EXPECT_OK(iterator->GetNext(iterator_context.get(), &out_tensors,
+                                 &end_of_sequence));
+  setDefaultMbufSize();
+}
+
+TEST_F(HostQueueDatasetOpTest, isholddatatrans3) {
+  NpuAttrs::SetNewDataTransferFlag(true);
+  int thread_num = 2, cpu_num = 2;
+  TF_ASSERT_OK(InitThreadPool(thread_num));
+  TF_ASSERT_OK(InitFunctionLibraryRuntime({}, cpu_num));
+
+  const TestCase &test_case = NormalizeTestStringCase();
+  Tensor tensor_slice_dataset_tensor(DT_VARIANT, TensorShape({}));
+  std::vector<Tensor> inputs_for_tensor_slice_dataset = test_case.input_tensors;
+  TF_ASSERT_OK(CreateTensorSliceDatasetTensorForQueue(&inputs_for_tensor_slice_dataset,
+                                                      &tensor_slice_dataset_tensor));
+
+  gtl::InlinedVector<TensorValue, 4> inputs_for_host_queue_dataset(
+      {TensorValue(&tensor_slice_dataset_tensor),
+       TensorValue(&tensor_slice_dataset_tensor)});
+
+  std::unique_ptr<OpKernel> host_queue_dataset_kernel;
+  TF_ASSERT_OK(CreateHostQueueDatasetKernel(test_case.expected_output_dtypes,
+                                            test_case.expected_output_shapes,
+                                            &host_queue_dataset_kernel, "-1"));
+  std::unique_ptr<OpKernelContext> host_queue_dataset_context;
+  TF_ASSERT_OK(CreateHostQueueDatasetContext(host_queue_dataset_kernel.get(),
+                                             &inputs_for_host_queue_dataset,
+                                             &host_queue_dataset_context));
+  DatasetBase *host_queue_dataset;
+  TF_ASSERT_OK(CreateDataset(host_queue_dataset_kernel.get(),
+                             host_queue_dataset_context.get(),
+                             &host_queue_dataset));
+  core::ScopedUnref scoped_unref(host_queue_dataset);
+
+  EXPECT_EQ(host_queue_dataset->node_name(), kNodeName);
+
+  host_queue_dataset->output_dtypes();
+  host_queue_dataset->output_shapes();
+  host_queue_dataset->DebugString();
+
+  SerializationContext context(SerializationContext::Params{});
+  GraphDefBuilder b;
+  DatasetBase::DatasetGraphDefBuilder db(&b);
+  Node *output;
+  host_queue_dataset->AsGraphDefInternal(&context, &db, &output);
+
+  std::unique_ptr<IteratorContext> iterator_context;
+  TF_ASSERT_OK(CreateIteratorContext(host_queue_dataset_context.get(),
+                                     &iterator_context));
+  std::unique_ptr<IteratorBase> iterator;
+  TF_ASSERT_OK(host_queue_dataset->MakeIterator(iterator_context.get(),
+                                                "Iterator", &iterator));
+
+  bool end_of_sequence = false;
+  std::vector<Tensor> out_tensors;
+  sleep(5);
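+  // 65 is intended to exceed kStringTypeDepth, so the queried size is rejected and
+  // IsHoldDataTrans fails open rather than throttling.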
+  setMbufSize(65);
+  TF_EXPECT_OK(iterator->GetNext(iterator_context.get(), &out_tensors,
+                                 &end_of_sequence));
+  setDefaultMbufSize();
+}
+
 } // namespace
 } // namespace data
 } // namespace tensorflow
diff --git a/tf_adapter/tests/ut/kernels/testcase/dataset/host_queue_dats_set_ut.cc b/tf_adapter/tests/ut/kernels/testcase/dataset/host_queue_dats_set_ut.cc
index 027ee442c7f4f88db85b269fef833c9e9249afdf..b37e5cbb0e90c72e2ed538b008216437c4e1d8a0 100644
--- a/tf_adapter/tests/ut/kernels/testcase/dataset/host_queue_dats_set_ut.cc
+++ b/tf_adapter/tests/ut/kernels/testcase/dataset/host_queue_dats_set_ut.cc
@@ -7,6 +7,8 @@
 #include "tensorflow/core/kernels/data/dataset_test_base.h"
 #include "tf_adapter/util/acl_channel.h"
 #include "tf_adapter/util/npu_attrs.h"
+#include "ascendcl_stub.h"
+
 class HostQueueDatasetOp;
 namespace tensorflow {
 namespace data {
@@ -65,6 +67,17 @@ struct TestCase {
   std::vector<PartialTensorShape> expected_output_shapes;
 };
 
+TestCase NormalizeTestStringCase() {
+  return {
+      // input_tensors        expected_outputs        expected_output_dtypes
+      // expected_output_shapes
+      {CreateTensor<string>(TensorShape{10, 1}, {"0", "1", "2", "3", "4", "5", "6", "7", "8", "9"})},
+      {CreateTensor<string>(TensorShape{1}, {"0"})},
+      {DT_STRING},
+      {PartialTensorShape({1})},
+  };
+}
+
 TestCase NormalizeTestCase() {
   return {
       // input_tensors        expected_outputs        expected_output_dtypes
@@ -629,6 +642,181 @@ TEST_F(HostQueueDatasetOpTest, iterator_getnext10) {
   EXPECT_TRUE(StopRecvTensorByAcl(&acl_handle_, "test").ok());
 }
 
+TEST_F(HostQueueDatasetOpTest, isholddatatrans1) {
+  NpuAttrs::SetNewDataTransferFlag(true);
+  int thread_num = 2, cpu_num = 2;
+  TF_ASSERT_OK(InitThreadPool(thread_num));
+  TF_ASSERT_OK(InitFunctionLibraryRuntime({}, cpu_num));
+
+  const TestCase &test_case = NormalizeTestStringCase();
+  Tensor tensor_slice_dataset_tensor(DT_VARIANT, TensorShape({}));
+  std::vector<Tensor> inputs_for_tensor_slice_dataset = test_case.input_tensors;
+  TF_ASSERT_OK(CreateTensorSliceDatasetTensorForQueue(&inputs_for_tensor_slice_dataset,
+                                                      &tensor_slice_dataset_tensor));
+
+  gtl::InlinedVector<TensorValue, 4> inputs_for_host_queue_dataset(
+      {TensorValue(&tensor_slice_dataset_tensor),
+       TensorValue(&tensor_slice_dataset_tensor)});
+
+  std::unique_ptr<OpKernel> host_queue_dataset_kernel;
+  TF_ASSERT_OK(CreateHostQueueDatasetKernel(test_case.expected_output_dtypes,
+                                            test_case.expected_output_shapes,
+                                            &host_queue_dataset_kernel, "-1"));
+  std::unique_ptr<OpKernelContext> host_queue_dataset_context;
+  TF_ASSERT_OK(CreateHostQueueDatasetContext(host_queue_dataset_kernel.get(),
+                                             &inputs_for_host_queue_dataset,
+                                             &host_queue_dataset_context));
+  DatasetBase *host_queue_dataset;
+  TF_ASSERT_OK(CreateDataset(host_queue_dataset_kernel.get(),
+                             host_queue_dataset_context.get(),
+                             &host_queue_dataset));
+  core::ScopedUnref scoped_unref(host_queue_dataset);
+
+  EXPECT_EQ(host_queue_dataset->node_name(), kNodeName);
+
+  host_queue_dataset->output_dtypes();
+  host_queue_dataset->output_shapes();
+  host_queue_dataset->DebugString();
+
+  SerializationContext context(SerializationContext::Params{});
+  GraphDefBuilder b;
+  DatasetBase::DatasetGraphDefBuilder db(&b);
+  Node *output;
+  host_queue_dataset->AsGraphDefInternal(&context, &db, &output);
+
+  std::unique_ptr<IteratorContext> iterator_context;
+  TF_ASSERT_OK(CreateIteratorContext(host_queue_dataset_context.get(),
+                                     &iterator_context));
+  std::unique_ptr<IteratorBase> iterator;
+  TF_ASSERT_OK(host_queue_dataset->MakeIterator(iterator_context.get(),
+                                                "Iterator", &iterator));
+
+  bool end_of_sequence = false;
+  std::vector<Tensor> out_tensors;
+  sleep(5);
+  setMbufSize(2);
+  TF_EXPECT_OK(iterator->GetNext(iterator_context.get(), &out_tensors,
+                                 &end_of_sequence));
+  setDefaultMbufSize();
+}
+
+TEST_F(HostQueueDatasetOpTest, isholddatatrans2) {
+  NpuAttrs::SetNewDataTransferFlag(true);
+  int thread_num = 2, cpu_num = 2;
+  TF_ASSERT_OK(InitThreadPool(thread_num));
+  TF_ASSERT_OK(InitFunctionLibraryRuntime({}, cpu_num));
+
+  const TestCase &test_case = NormalizeTestStringCase();
+  Tensor tensor_slice_dataset_tensor(DT_VARIANT, TensorShape({}));
+  std::vector<Tensor> inputs_for_tensor_slice_dataset = test_case.input_tensors;
+  TF_ASSERT_OK(CreateTensorSliceDatasetTensorForQueue(&inputs_for_tensor_slice_dataset,
+                                                      &tensor_slice_dataset_tensor));
+
+  gtl::InlinedVector<TensorValue, 4> inputs_for_host_queue_dataset(
+      {TensorValue(&tensor_slice_dataset_tensor),
+       TensorValue(&tensor_slice_dataset_tensor)});
+
+  std::unique_ptr<OpKernel> host_queue_dataset_kernel;
+  TF_ASSERT_OK(CreateHostQueueDatasetKernel(test_case.expected_output_dtypes,
+                                            test_case.expected_output_shapes,
+                                            &host_queue_dataset_kernel, "-1"));
+  std::unique_ptr<OpKernelContext> host_queue_dataset_context;
+  TF_ASSERT_OK(CreateHostQueueDatasetContext(host_queue_dataset_kernel.get(),
+                                             &inputs_for_host_queue_dataset,
+                                             &host_queue_dataset_context));
+  DatasetBase *host_queue_dataset;
+  TF_ASSERT_OK(CreateDataset(host_queue_dataset_kernel.get(),
+                             host_queue_dataset_context.get(),
+                             &host_queue_dataset));
+  core::ScopedUnref scoped_unref(host_queue_dataset);
+
+  EXPECT_EQ(host_queue_dataset->node_name(), kNodeName);
+
+  host_queue_dataset->output_dtypes();
+  host_queue_dataset->output_shapes();
+  host_queue_dataset->DebugString();
+
+  SerializationContext context(SerializationContext::Params{});
+  GraphDefBuilder b;
+  DatasetBase::DatasetGraphDefBuilder db(&b);
+  Node *output;
+  host_queue_dataset->AsGraphDefInternal(&context, &db, &output);
+
+  std::unique_ptr<IteratorContext> iterator_context;
+  TF_ASSERT_OK(CreateIteratorContext(host_queue_dataset_context.get(),
+                                     &iterator_context));
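+  // Unlike the previous case, the stub is armed before the iterator starts, so the
+  // very first sends already observe a three-deep channel.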
+  setMbufSize(3);
+  std::unique_ptr<IteratorBase> iterator;
+  TF_ASSERT_OK(host_queue_dataset->MakeIterator(iterator_context.get(),
+                                                "Iterator", &iterator));
+
+  bool end_of_sequence = false;
+  std::vector<Tensor> out_tensors;
+  sleep(5);
+  TF_EXPECT_OK(iterator->GetNext(iterator_context.get(), &out_tensors,
+                                 &end_of_sequence));
+  setDefaultMbufSize();
+}
+
+TEST_F(HostQueueDatasetOpTest, isholddatatrans3) {
+  NpuAttrs::SetNewDataTransferFlag(true);
+  int thread_num = 2, cpu_num = 2;
+  TF_ASSERT_OK(InitThreadPool(thread_num));
+  TF_ASSERT_OK(InitFunctionLibraryRuntime({}, cpu_num));
+
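+  // Oversized depth again, this time set before MakeIterator: the query branch should
+  // fail open and GetNext must still succeed.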
+  const TestCase &test_case = NormalizeTestStringCase();
+  Tensor tensor_slice_dataset_tensor(DT_VARIANT, TensorShape({}));
+  std::vector<Tensor> inputs_for_tensor_slice_dataset = test_case.input_tensors;
+  TF_ASSERT_OK(CreateTensorSliceDatasetTensorForQueue(&inputs_for_tensor_slice_dataset,
+                                                      &tensor_slice_dataset_tensor));
+
+  gtl::InlinedVector<TensorValue, 4> inputs_for_host_queue_dataset(
+      {TensorValue(&tensor_slice_dataset_tensor),
+       TensorValue(&tensor_slice_dataset_tensor)});
+
+  std::unique_ptr<OpKernel> host_queue_dataset_kernel;
+  TF_ASSERT_OK(CreateHostQueueDatasetKernel(test_case.expected_output_dtypes,
+                                            test_case.expected_output_shapes,
+                                            &host_queue_dataset_kernel, "-1"));
+  std::unique_ptr<OpKernelContext> host_queue_dataset_context;
+  TF_ASSERT_OK(CreateHostQueueDatasetContext(host_queue_dataset_kernel.get(),
+                                             &inputs_for_host_queue_dataset,
+                                             &host_queue_dataset_context));
+  DatasetBase *host_queue_dataset;
+  TF_ASSERT_OK(CreateDataset(host_queue_dataset_kernel.get(),
+                             host_queue_dataset_context.get(),
+                             &host_queue_dataset));
+  core::ScopedUnref scoped_unref(host_queue_dataset);
+
+  EXPECT_EQ(host_queue_dataset->node_name(), kNodeName);
+
+  host_queue_dataset->output_dtypes();
+  host_queue_dataset->output_shapes();
+  host_queue_dataset->DebugString();
+
+  SerializationContext context(SerializationContext::Params{});
+  GraphDefBuilder b;
+  DatasetBase::DatasetGraphDefBuilder db(&b);
+  Node *output;
+  host_queue_dataset->AsGraphDefInternal(&context, &db, &output);
+
+  std::unique_ptr<IteratorContext> iterator_context;
+  TF_ASSERT_OK(CreateIteratorContext(host_queue_dataset_context.get(),
+                                     &iterator_context));
+
+  setMbufSize(65);
+  std::unique_ptr<IteratorBase> iterator;
+  TF_ASSERT_OK(host_queue_dataset->MakeIterator(iterator_context.get(),
+                                                "Iterator", &iterator));
+
+  bool end_of_sequence = false;
+  std::vector<Tensor> out_tensors;
+  sleep(5);
+  TF_EXPECT_OK(iterator->GetNext(iterator_context.get(), &out_tensors,
+                                 &end_of_sequence));
+  setDefaultMbufSize();
+}
+
 } // namespace
 } // namespace data
 } // namespace tensorflow