From e7a9f0e7086f5ac4c349dc321dadb6d09a13d627 Mon Sep 17 00:00:00 2001 From: medivh-x Date: Sat, 20 Mar 2021 15:47:18 +0800 Subject: [PATCH 1/6] clean code --- tf_adapter_2.x/npu_device/core/npu_device.cpp | 8 -------- 1 file changed, 8 deletions(-) diff --git a/tf_adapter_2.x/npu_device/core/npu_device.cpp b/tf_adapter_2.x/npu_device/core/npu_device.cpp index 0f89ed76a..9d5787b6b 100644 --- a/tf_adapter_2.x/npu_device/core/npu_device.cpp +++ b/tf_adapter_2.x/npu_device/core/npu_device.cpp @@ -127,14 +127,6 @@ std::string NpuDevice::CreateDevice(const char *name, int device_index, return "Failed init graph engine: create new session failed"; } - std::shared_ptr parser = - domi::ModelParserFactory::Instance()->CreateModelParser(domi::FrameworkType::TENSORFLOW); - if (parser == nullptr) { - return "Failed init graph engine: create tensorflow model parser failed"; - } - - std::unique_ptr status(TF_NewStatus(), TF_DeleteStatus); - *device = new (std::nothrow) NpuDevice(); if (*device == nullptr) { return "Failed create new npu device instance"; -- Gitee From 4f714e2a0933806beff619e698458d1c1f34abe5 Mon Sep 17 00:00:00 2001 From: medivh-x Date: Sat, 20 Mar 2021 18:18:26 +0800 Subject: [PATCH 2/6] support loop on npu --- tf_adapter_2.x/npu_device/core/npu_device.cpp | 111 ++++++++++++++++++ tf_adapter_2.x/npu_device/core/npu_device.h | 2 + 2 files changed, 113 insertions(+) diff --git a/tf_adapter_2.x/npu_device/core/npu_device.cpp b/tf_adapter_2.x/npu_device/core/npu_device.cpp index 9d5787b6b..3c34716ab 100644 --- a/tf_adapter_2.x/npu_device/core/npu_device.cpp +++ b/tf_adapter_2.x/npu_device/core/npu_device.cpp @@ -1256,6 +1256,117 @@ void NpuDevice::RunOp(TFE_Context *context, const npu::OpSpec *spec, int num_inp } // 计数-2 } +namespace { +tensorflow::Node *AddVarInitToGraph(TFE_Context *context, std::string name, tensorflow::Tensor tensor, + tensorflow::Graph *graph, TF_Status *status) { + tensorflow::Node *variable; + tensorflow::Node *value; + tensorflow::Node *assign_variable; + + NPU_CTX_REQUIRES_OK_RETURN(status, + tensorflow::NodeBuilder(name, "VarHandleOp") + .Attr("container", "") + .Attr("shared_name", name) + .Attr("dtype", tensor.dtype()) + .Attr("shape", tensor.shape()) + .Finalize(graph, &variable), + assign_variable); + NPU_CTX_REQUIRES_OK_RETURN(status, + tensorflow::NodeBuilder(name + "_v", "Const") + .Attr("value", tensor) + .Attr("dtype", tensor.dtype()) + .Finalize(graph, &value), + assign_variable); + NPU_CTX_REQUIRES_OK_RETURN(status, + tensorflow::NodeBuilder(name + "_op", "AssignVariableOp") + .Input(variable, 0) + .Input(value, 0) + .Attr("dtype", tensor.dtype()) + .Finalize(graph, &assign_variable), + assign_variable); + + AssembleOpDef(variable); + AssembleOpDef(value); + AssembleOpDef(assign_variable); + + AssembleOutputDesc(TensorShapes({kScalarShape}), {tensorflow::DT_RESOURCE}, variable); + AssembleOutputDesc(TensorShapes({tensor.shape()}), {tensor.dtype()}, value); + AssembleInputDesc(TensorShapes({kScalarShape, tensor.shape()}), {tensorflow::DT_RESOURCE, tensor.dtype()}, + assign_variable); + return assign_variable; +} +} // namespace + +void NpuDevice::SetNpuLoopSize(TFE_Context *context, int64_t loop, TF_Status *status) { + static std::atomic_bool initialized{false}; + static std::atomic_int64_t current_loop_size{1}; + static tensorflow::Status init_status = tensorflow::Status::OK(); + static std::uint64_t loop_var_graph_id = 0; + const static std::string kLoopVarName = "npu_runconfig/iterations_per_loop"; + + if (!initialized.exchange(true)) { + tensorflow::Graph graph(tensorflow::OpRegistry::Global()); + AddVarInitToGraph(context, "npu_runconfig/iterations_per_loop", tensorflow::Tensor(int64_t(1)), &graph, status); + if (TF_GetCode(status) != TF_OK) return; + AddVarInitToGraph(context, "npu_runconfig/loop_cond", tensorflow::Tensor(int64_t(1)), &graph, status); + if (TF_GetCode(status) != TF_OK) return; + AddVarInitToGraph(context, "npu_runconfig/one", tensorflow::Tensor(int64_t(1)), &graph, status); + if (TF_GetCode(status) != TF_OK) return; + AddVarInitToGraph(context, "npu_runconfig/zero", tensorflow::Tensor(int64_t(0)), &graph, status); + if (TF_GetCode(status) != TF_OK) return; + + RunGeGraphPin2CpuAnonymous(context, "set_npu_loop_size", graph.ToGraphDefDebug(), 0, nullptr, 0, nullptr, status); + if (TF_GetCode(status) != TF_OK) return; + + tensorflow::Node *variable; + tensorflow::Node *arg; + tensorflow::Node *assign_variable; + + tensorflow::Graph graph2(tensorflow::OpRegistry::Global()); + + NPU_CTX_REQUIRES_OK(status, tensorflow::NodeBuilder(kLoopVarName, "VarHandleOp") + .Attr("container", "") + .Attr("shared_name", kLoopVarName) + .Attr("dtype", tensorflow::DT_INT64) + .Attr("shape", kScalarShape) + .Finalize(&graph2, &variable)); + NPU_CTX_REQUIRES_OK(status, tensorflow::NodeBuilder(kLoopVarName + "_v", "Const") + .Attr("index", 0) + .Attr("dtype", tensorflow::DT_INT64) + .Finalize(&graph2, &arg)); + NPU_CTX_REQUIRES_OK(status, tensorflow::NodeBuilder(kLoopVarName + "_op", "AssignVariableOp") + .Input(variable, 0) + .Input(arg, 0) + .Attr("dtype", tensorflow::DT_INT64) + .Finalize(&graph2, &assign_variable)); + + AssembleOpDef(variable); + AssembleOpDef(arg); + AssembleOpDef(assign_variable); + + AssembleOutputDesc(TensorShapes({kScalarShape}), {tensorflow::DT_RESOURCE}, variable); + AssembleOutputDesc(TensorShapes({kScalarShape}), {tensorflow::DT_INT64}, arg); + AssembleInputDesc(TensorShapes({kScalarShape, kScalarShape}), {tensorflow::DT_RESOURCE, tensorflow::DT_INT64}, + assign_variable); + + loop_var_graph_id = AddGeGraph(context, "set_loop_var", graph2.ToGraphDefDebug(), status); + init_status = status->status; + if (TF_GetCode(status) != TF_OK) return; + } + + status->status = init_status; + if (TF_GetCode(status) != TF_OK) return; + + std::vector inputs(1); + inputs[0] = tensorflow::wrap(tensorflow::TensorHandle::CreateLocalHandle(tensorflow::Tensor(loop))); + + RunGeGraphPin2Cpu(context, loop_var_graph_id, inputs.size(), inputs.data(), {}, 0, nullptr, status); + + for (auto handle : inputs) { + TFE_DeleteTensorHandle(handle); + } +} + void NpuDevice::RunGraph(TFE_Context *context, const npu::FuncSpec *spec, int tf_num_inputs, TFE_TensorHandle **tf_inputs, int *num_outputs, TFE_TensorHandle **outputs, TF_Status *status) { diff --git a/tf_adapter_2.x/npu_device/core/npu_device.h b/tf_adapter_2.x/npu_device/core/npu_device.h index ab6d317ab..a8413d0cf 100644 --- a/tf_adapter_2.x/npu_device/core/npu_device.h +++ b/tf_adapter_2.x/npu_device/core/npu_device.h @@ -89,6 +89,8 @@ class NpuDevice { void RunOp(TFE_Context *context, const npu::OpSpec *spec, int num_inputs, TFE_TensorHandle **inputs, int *num_outputs, TFE_TensorHandle **outputs, TF_Status *status); + void SetNpuLoopSize(TFE_Context *context, int64_t loop, TF_Status *status); + void RunGraph(TFE_Context *context, const npu::FuncSpec *spec, int num_inputs, TFE_TensorHandle **inputs, int *num_outputs, TFE_TensorHandle **outputs, TF_Status *status); -- Gitee From e62e7cb93645a350b48b8fab938be626c3735ea3 Mon Sep 17 00:00:00 2001 From: medivh-x Date: Sat, 20 Mar 2021 18:18:26 +0800 Subject: [PATCH 3/6] support loop on npu --- tf_adapter_2.x/npu_device/core/npu_device.cpp | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/tf_adapter_2.x/npu_device/core/npu_device.cpp b/tf_adapter_2.x/npu_device/core/npu_device.cpp index 3c34716ab..34060725b 100644 --- a/tf_adapter_2.x/npu_device/core/npu_device.cpp +++ b/tf_adapter_2.x/npu_device/core/npu_device.cpp @@ -1304,6 +1304,8 @@ void NpuDevice::SetNpuLoopSize(TFE_Context *context, int64_t loop, TF_Status *st static std::uint64_t loop_var_graph_id = 0; const static std::string kLoopVarName = "npu_runconfig/iterations_per_loop"; + if (current_loop_size == loop) return; + if (!initialized.exchange(true)) { tensorflow::Graph graph(tensorflow::OpRegistry::Global()); AddVarInitToGraph(context, "npu_runconfig/iterations_per_loop", tensorflow::Tensor(int64_t(1)), &graph, status); @@ -1362,6 +1364,9 @@ void NpuDevice::SetNpuLoopSize(TFE_Context *context, int64_t loop, TF_Status *st RunGeGraphPin2Cpu(context, loop_var_graph_id, inputs.size(), inputs.data(), {}, 0, nullptr, status); + if (TF_GetCode(status) == TF_OK) { + current_loop_size = loop; + } for (auto handle : inputs) { TFE_DeleteTensorHandle(handle); } @@ -1404,6 +1409,8 @@ void NpuDevice::RunGraph(TFE_Context *context, const npu::FuncSpec *spec, int tf for (auto node : spec->Graph()->op_nodes()) { if (node->IsWhileNode()) { iterations_per_loop = kGlobalLoopSize; + SetNpuLoopSize(context, iterations_per_loop, status); + if (TF_GetCode(status) != TF_OK) return; break; } } -- Gitee From 0936ae8565c76883962c1485c55935033493c65e Mon Sep 17 00:00:00 2001 From: medivh-x Date: Mon, 22 Mar 2021 11:31:00 +0800 Subject: [PATCH 4/6] npu auto loop for while --- .../npu_device/core/npu_cache_spec.h | 4 ++ tf_adapter_2.x/npu_device/core/npu_device.cpp | 47 +++++++++++++------ tf_adapter_2.x/npu_device/core/npu_env.h | 6 +++ 3 files changed, 42 insertions(+), 15 deletions(-) diff --git a/tf_adapter_2.x/npu_device/core/npu_cache_spec.h b/tf_adapter_2.x/npu_device/core/npu_cache_spec.h index 893fa9558..9259e4e6c 100644 --- a/tf_adapter_2.x/npu_device/core/npu_cache_spec.h +++ b/tf_adapter_2.x/npu_device/core/npu_cache_spec.h @@ -181,6 +181,9 @@ class FuncSpec : public TaskSpec { void SetBuilt() const { built_.store(true); } bool Built() const { return built_; } + void SetNeedLoop() const { need_loop_.store(true); } + bool NeedLoop() const { return need_loop_; } + void PruneInputs(int num_inputs, TFE_TensorHandle **inputs, std::vector &pruned) const { prune_func_(num_inputs, inputs, pruned); } @@ -204,6 +207,7 @@ class FuncSpec : public TaskSpec { PruneInputsFunc prune_func_; const std::map> dependent_host_resources_; std::atomic_bool mutable built_{false}; + std::atomic_bool mutable need_loop_{false}; }; } // namespace npu diff --git a/tf_adapter_2.x/npu_device/core/npu_device.cpp b/tf_adapter_2.x/npu_device/core/npu_device.cpp index 34060725b..ecfa3455e 100644 --- a/tf_adapter_2.x/npu_device/core/npu_device.cpp +++ b/tf_adapter_2.x/npu_device/core/npu_device.cpp @@ -1308,13 +1308,14 @@ void NpuDevice::SetNpuLoopSize(TFE_Context *context, int64_t loop, TF_Status *st if (!initialized.exchange(true)) { tensorflow::Graph graph(tensorflow::OpRegistry::Global()); - AddVarInitToGraph(context, "npu_runconfig/iterations_per_loop", tensorflow::Tensor(int64_t(1)), &graph, status); + AddVarInitToGraph(context, "npu_runconfig/iterations_per_loop", tensorflow::Tensor(tensorflow::int64(1)), &graph, + status); if (TF_GetCode(status) != TF_OK) return; - AddVarInitToGraph(context, "npu_runconfig/loop_cond", tensorflow::Tensor(int64_t(1)), &graph, status); + AddVarInitToGraph(context, "npu_runconfig/loop_cond", tensorflow::Tensor(tensorflow::int64(1)), &graph, status); if (TF_GetCode(status) != TF_OK) return; - AddVarInitToGraph(context, "npu_runconfig/one", tensorflow::Tensor(int64_t(1)), &graph, status); + AddVarInitToGraph(context, "npu_runconfig/one", tensorflow::Tensor(tensorflow::int64(1)), &graph, status); if (TF_GetCode(status) != TF_OK) return; - AddVarInitToGraph(context, "npu_runconfig/zero", tensorflow::Tensor(int64_t(0)), &graph, status); + AddVarInitToGraph(context, "npu_runconfig/zero", tensorflow::Tensor(tensorflow::int64(0)), &graph, status); if (TF_GetCode(status) != TF_OK) return; RunGeGraphPin2CpuAnonymous(context, "set_npu_loop_size", graph.ToGraphDefDebug(), 0, nullptr, 0, nullptr, status); @@ -1360,7 +1361,8 @@ void NpuDevice::SetNpuLoopSize(TFE_Context *context, int64_t loop, TF_Status *st if (TF_GetCode(status) != TF_OK) return; std::vector inputs(1); - inputs[0] = tensorflow::wrap(tensorflow::TensorHandle::CreateLocalHandle(tensorflow::Tensor(loop))); + inputs[0] = + tensorflow::wrap(tensorflow::TensorHandle::CreateLocalHandle(tensorflow::Tensor(tensorflow::int64(loop)))); RunGeGraphPin2Cpu(context, loop_var_graph_id, inputs.size(), inputs.data(), {}, 0, nullptr, status); @@ -1404,17 +1406,12 @@ void NpuDevice::RunGraph(TFE_Context *context, const npu::FuncSpec *spec, int tf if (kCustomKernelEnabled) { // TODO:这里根据小循环策略修改值 - int64_t iterations_per_loop = 1; - if (!spec->DependentHostResources().empty()) { - for (auto node : spec->Graph()->op_nodes()) { - if (node->IsWhileNode()) { - iterations_per_loop = kGlobalLoopSize; - SetNpuLoopSize(context, iterations_per_loop, status); - if (TF_GetCode(status) != TF_OK) return; - break; - } - } + int64_t iterations_per_loop = spec->NeedLoop() ? kGlobalLoopSize : 1; + if (iterations_per_loop > 1) { + SetNpuLoopSize(context, iterations_per_loop, status); + if (TF_GetCode(status) != TF_OK) return; } + for (const auto &resource : spec->DependentHostResources()) { LOG(INFO) << "Start consume iterator resource " << resource.second->Name() << " " << iterations_per_loop << " times"; @@ -1686,6 +1683,25 @@ uint64_t NpuDevice::AddGeGraph(TFE_Context *context, uint64_t graph_id, const st return subgraph; }; + if (kAutoLoopEnabled) { + tensorflow::Graph graph(npu::UnwrapCtx(context)->FuncLibDef()); + tensorflow::ConvertGraphDefToGraph(tensorflow::GraphConstructorOptions{}, def, &graph); + for (auto node : graph.op_nodes()) { + if (node->IsWhileNode()) { + auto loop_graph = request_subgraph(nullptr, node->attrs().Find("body")->func().name()); + NPU_CTX_REQUIRES_GE_OK_RETURN( + status, "NPU Parse tensorflow model", + parser->ParseProtoWithSubgraph(loop_graph.get(), request_subgraph, ge_compute_graph), graph_id); + + ge::Graph ge_graph = ge::GraphUtils::CreateGraphFromComputeGraph(ge_compute_graph); + ge_graph.SetNeedIteration(true); + NPU_CTX_REQUIRES_GE_OK_RETURN(status, "Graph engine Add graph", GeSession()->AddGraph(graph_id, ge_graph), + graph_id); + return graph_id; + } + } + } + NPU_CTX_REQUIRES_GE_OK_RETURN(status, "NPU Parse tensorflow model", parser->ParseProtoWithSubgraph(&def, request_subgraph, ge_compute_graph), graph_id); @@ -1831,6 +1847,7 @@ std::shared_ptr NpuDevice::CacheFuncSpec( const std::map> &dependent_host_resources, const std::string &reason) { auto spec = std::make_shared(op_spec, ndef, ge_graph_id, std::move(graph), prune_func, dependent_host_resources, reason); + spec->SetNeedLoop(); cached_func_specs_[op] = spec; DLOG() << "Cache function op spec " << spec->DebugString(); return spec; diff --git a/tf_adapter_2.x/npu_device/core/npu_env.h b/tf_adapter_2.x/npu_device/core/npu_env.h index 5435faef9..9f3f8bfc8 100644 --- a/tf_adapter_2.x/npu_device/core/npu_env.h +++ b/tf_adapter_2.x/npu_device/core/npu_env.h @@ -44,4 +44,10 @@ const static bool kPerfEnabled = []() -> bool { return perf_enabled; }(); +const static bool kAutoLoopEnabled = []() -> bool { + bool loop_enabled = false; + tensorflow::ReadBoolFromEnvVar("NPU_EXPERIMENTAL_AUTO_LOOP", false, &loop_enabled); + return loop_enabled; +}(); + #endif // TENSORFLOW_NPU_ENV_H -- Gitee From e588836ec70df8f4dd1589378a43ab8696253751 Mon Sep 17 00:00:00 2001 From: medivh-x Date: Sun, 28 Mar 2021 12:54:59 +0800 Subject: [PATCH 5/6] support ge loop on npu --- .../npu_device/core/npu_cache_spec.h | 2 +- tf_adapter_2.x/npu_device/core/npu_device.cpp | 129 +++++++++++------- tf_adapter_2.x/npu_device/core/npu_device.h | 6 + 3 files changed, 89 insertions(+), 48 deletions(-) diff --git a/tf_adapter_2.x/npu_device/core/npu_cache_spec.h b/tf_adapter_2.x/npu_device/core/npu_cache_spec.h index 9259e4e6c..3f68d9209 100644 --- a/tf_adapter_2.x/npu_device/core/npu_cache_spec.h +++ b/tf_adapter_2.x/npu_device/core/npu_cache_spec.h @@ -181,7 +181,7 @@ class FuncSpec : public TaskSpec { void SetBuilt() const { built_.store(true); } bool Built() const { return built_; } - void SetNeedLoop() const { need_loop_.store(true); } + void SetNeedLoop(bool loop) const { need_loop_.store(loop); } bool NeedLoop() const { return need_loop_; } void PruneInputs(int num_inputs, TFE_TensorHandle **inputs, std::vector &pruned) const { diff --git a/tf_adapter_2.x/npu_device/core/npu_device.cpp b/tf_adapter_2.x/npu_device/core/npu_device.cpp index ecfa3455e..bc5ffc82b 100644 --- a/tf_adapter_2.x/npu_device/core/npu_device.cpp +++ b/tf_adapter_2.x/npu_device/core/npu_device.cpp @@ -36,6 +36,7 @@ #include "framework/omg/parser/parser_factory.h" using Format = ge::Format; +const static uint64_t kInvalidGeGraphId = -1; namespace { template @@ -72,6 +73,34 @@ size_t RemoveRedundantHcomControlEdges(tensorflow::Graph *graph) { return edges_to_remove.size(); } +bool IsGraphNeedLoop(const std::string &name, const tensorflow::GraphDef &def) { + const static std::unordered_set kNecessaryOps{"IteratorV2"}; + const static std::unordered_set kTrainKeyOps{"ResourceApplyKerasMomentum"}; + + bool contain_necessary_op = false; + bool contain_train_key_op = false; + + for (const auto &ndef : def.node()) { + if (!contain_train_key_op && kTrainKeyOps.count(ndef.op())) { + if (contain_necessary_op) { + return true; + } + contain_train_key_op = true; + } + if (!contain_necessary_op && kNecessaryOps.count(ndef.op())) { + if (contain_train_key_op) { + return true; + } + contain_necessary_op = true; + } + } + return false; +} + +bool IsGraphNeedLoop(const std::string &name, const tensorflow::Graph *graph) { + return IsGraphNeedLoop(name, graph->ToGraphDefDebug()); +} + } // namespace void NpuDevice::CreateIteratorProvider(TFE_Context *context, const tensorflow::Tensor *tensor, @@ -140,14 +169,14 @@ std::string NpuDevice::CreateDevice(const char *name, int device_index, } void NpuDevice::ReleaseResource() { - DLOG() << "Start cancel all uncompleted async call"; - CancellationManager()->StartCancel(); - std::vector> thread_guarder; for (auto &iterator_provider : iterator_providers_) { auto provider = iterator_provider.second; thread_guarder.emplace_back(std::async([provider]() { provider->Destroy(); })); } + + DLOG() << "Start cancel all uncompleted async call"; + CancellationManager()->StartCancel(); } void NpuDevice::DeleteDevice(void *device) { @@ -870,7 +899,6 @@ void NpuDevice::GetOrCreateSpec(TFE_Context *context, const char *op_name, const tensorflow::Status compat_status = ValidateOutput(op_name, data_types); if (!compat_status.ok()) { if (is_function_op) { - const static uint64_t kInvalidGeGraphId = -1; *spec = CacheFuncSpec(op_name, op_reg_data, ndef, kInvalidGeGraphId, {}, {}, {}, compat_status.error_message()); return; } else { @@ -936,25 +964,34 @@ void NpuDevice::GetOrCreateSpec(TFE_Context *context, const char *op_name, const *gdef.mutable_library() = fdef_lib; WriteTextProto(tensorflow::Env::Default(), "step_3_after_mark_shape_" + file_name_suffix, gdef); } - // 因为parser当前约定的附加属性不是匿名属性(非下划线开头,所以这里当前需要拷贝一份新图用于标记parser所需属性) - uint64_t graph_id = 0; - if (kCustomKernelEnabled) { - graph_id = AddGeGraph(context, std::string("tf_function_") + op_name, optimize_graph->ToGraphDefDebug(), s); - if (TF_GetCode(s) != TF_OK) return; - } - DLOG() << std::string("tf_function_") + op_name << " remained input index (0-" << num_inputs - 1 << ") -> " - << VecToString(remain_indexes); + DLOG() << op_name << " remained input index (0-" << num_inputs - 1 << ") -> " << VecToString(remain_indexes); auto lambda = [remain_indexes](int num_inputs, TFE_TensorHandle **inputs, std::vector &pruned) { for (auto index : remain_indexes) { pruned.push_back(inputs[index]); } }; + // 对于function节点,可以将resource的输入NPU兼容性作为缓存项目,校验输入是否被NPU支持,如果类型不支持,或者是CPU的Resouce类型,则不支持 // 如果是单算子,则不能缓存,需要在每次dev->Run的时候,校验单算子资源输入的兼容性 - *spec = - CacheFuncSpec(op_name, op_reg_data, ndef, graph_id, std::move(optimize_graph), lambda, dependent_host_resources, - ValidateInput(op_name, pruned_inputs.size(), pruned_inputs.data()).error_message()); + auto status = ValidateInput(op_name, pruned_inputs.size(), pruned_inputs.data()); + if (!status.ok()) { + *spec = CacheFuncSpec(op_name, op_reg_data, ndef, kInvalidGeGraphId, {}, {}, {}, status.error_message()); + } else { + uint64_t graph_id = kInvalidGeGraphId; + bool need_loop = false; + if (kCustomKernelEnabled) { + if (!dependent_host_resources.empty()) { + graph_id = AddMaybeAutoLoopGeGraph(context, op_name, optimize_graph->ToGraphDefDebug(), need_loop, s); + } else { + graph_id = AddGeGraph(context, op_name, optimize_graph->ToGraphDefDebug(), s); + } + if (TF_GetCode(s) != TF_OK) return; + } + *spec = CacheFuncSpec(op_name, op_reg_data, ndef, graph_id, std::move(optimize_graph), lambda, + dependent_host_resources, ""); + reinterpret_cast(spec->get())->SetNeedLoop(need_loop); + } return; } else { // 进行inferShape,输出可能是unknown shape,所以使用partial shape @@ -1306,19 +1343,19 @@ void NpuDevice::SetNpuLoopSize(TFE_Context *context, int64_t loop, TF_Status *st if (current_loop_size == loop) return; + LOG(INFO) << "Set npu loop size to " << loop; + if (!initialized.exchange(true)) { tensorflow::Graph graph(tensorflow::OpRegistry::Global()); - AddVarInitToGraph(context, "npu_runconfig/iterations_per_loop", tensorflow::Tensor(tensorflow::int64(1)), &graph, - status); - if (TF_GetCode(status) != TF_OK) return; - AddVarInitToGraph(context, "npu_runconfig/loop_cond", tensorflow::Tensor(tensorflow::int64(1)), &graph, status); + AddVarInitToGraph(context, "npu_runconfig/loop_cond", tensorflow::Tensor(tensorflow::int64(0)), &graph, status); if (TF_GetCode(status) != TF_OK) return; AddVarInitToGraph(context, "npu_runconfig/one", tensorflow::Tensor(tensorflow::int64(1)), &graph, status); if (TF_GetCode(status) != TF_OK) return; AddVarInitToGraph(context, "npu_runconfig/zero", tensorflow::Tensor(tensorflow::int64(0)), &graph, status); if (TF_GetCode(status) != TF_OK) return; - RunGeGraphPin2CpuAnonymous(context, "set_npu_loop_size", graph.ToGraphDefDebug(), 0, nullptr, 0, nullptr, status); + RunGeGraphPin2CpuAnonymous(context, "set_npu_loop_conditions", graph.ToGraphDefDebug(), 0, nullptr, 0, nullptr, + status); if (TF_GetCode(status) != TF_OK) return; tensorflow::Node *variable; @@ -1333,9 +1370,9 @@ void NpuDevice::SetNpuLoopSize(TFE_Context *context, int64_t loop, TF_Status *st .Attr("dtype", tensorflow::DT_INT64) .Attr("shape", kScalarShape) .Finalize(&graph2, &variable)); - NPU_CTX_REQUIRES_OK(status, tensorflow::NodeBuilder(kLoopVarName + "_v", "Const") + NPU_CTX_REQUIRES_OK(status, tensorflow::NodeBuilder(kLoopVarName + "_v", "_Arg") + .Attr("T", tensorflow::DT_INT64) .Attr("index", 0) - .Attr("dtype", tensorflow::DT_INT64) .Finalize(&graph2, &arg)); NPU_CTX_REQUIRES_OK(status, tensorflow::NodeBuilder(kLoopVarName + "_op", "AssignVariableOp") .Input(variable, 0) @@ -1362,7 +1399,7 @@ void NpuDevice::SetNpuLoopSize(TFE_Context *context, int64_t loop, TF_Status *st std::vector inputs(1); inputs[0] = - tensorflow::wrap(tensorflow::TensorHandle::CreateLocalHandle(tensorflow::Tensor(tensorflow::int64(loop)))); + tensorflow::wrap(tensorflow::TensorHandle::CreateLocalHandle(tensorflow::Tensor(tensorflow::int64(loop - 1)))); RunGeGraphPin2Cpu(context, loop_var_graph_id, inputs.size(), inputs.data(), {}, 0, nullptr, status); @@ -1637,8 +1674,8 @@ void NpuDevice::RunGeGraphAsync(TFE_Context *context, uint64_t graph_id, int num ge_session_->RunGraphAsync(graph_id, ge_inputs, ge_callback)); } -uint64_t NpuDevice::AddGeGraph(TFE_Context *context, uint64_t graph_id, const std::string &name, - const tensorflow::GraphDef &def, TF_Status *status) { +uint64_t NpuDevice::AddGeGraphInner(TFE_Context *context, uint64_t graph_id, const std::string &name, + const tensorflow::GraphDef &def, bool loop, TF_Status *status) { auto ge_compute_graph = std::make_shared(name); std::shared_ptr parser = domi::ModelParserFactory::Instance()->CreateModelParser(domi::FrameworkType::TENSORFLOW); @@ -1683,39 +1720,39 @@ uint64_t NpuDevice::AddGeGraph(TFE_Context *context, uint64_t graph_id, const st return subgraph; }; - if (kAutoLoopEnabled) { - tensorflow::Graph graph(npu::UnwrapCtx(context)->FuncLibDef()); - tensorflow::ConvertGraphDefToGraph(tensorflow::GraphConstructorOptions{}, def, &graph); - for (auto node : graph.op_nodes()) { - if (node->IsWhileNode()) { - auto loop_graph = request_subgraph(nullptr, node->attrs().Find("body")->func().name()); - NPU_CTX_REQUIRES_GE_OK_RETURN( - status, "NPU Parse tensorflow model", - parser->ParseProtoWithSubgraph(loop_graph.get(), request_subgraph, ge_compute_graph), graph_id); - - ge::Graph ge_graph = ge::GraphUtils::CreateGraphFromComputeGraph(ge_compute_graph); - ge_graph.SetNeedIteration(true); - NPU_CTX_REQUIRES_GE_OK_RETURN(status, "Graph engine Add graph", GeSession()->AddGraph(graph_id, ge_graph), - graph_id); - return graph_id; - } - } - } - NPU_CTX_REQUIRES_GE_OK_RETURN(status, "NPU Parse tensorflow model", parser->ParseProtoWithSubgraph(&def, request_subgraph, ge_compute_graph), graph_id); ge::Graph ge_graph = ge::GraphUtils::CreateGraphFromComputeGraph(ge_compute_graph); + + ge_graph.SetNeedIteration(loop); + NPU_CTX_REQUIRES_GE_OK_RETURN(status, "Graph engine Add graph", GeSession()->AddGraph(graph_id, ge_graph), graph_id); return graph_id; } +uint64_t NpuDevice::AddGeGraph(TFE_Context *context, uint64_t graph_id, const std::string &name, + const tensorflow::GraphDef &def, TF_Status *status) { + return AddGeGraphInner(context, graph_id, name, def, false, status); +} + uint64_t NpuDevice::AddGeGraph(TFE_Context *context, const std::string &name, const tensorflow::GraphDef &def, TF_Status *status) { uint64_t graph_id = NextUUID(); return AddGeGraph(context, graph_id, name, def, status); } +uint64_t NpuDevice::AddMaybeAutoLoopGeGraph(TFE_Context *context, const std::string &name, + const tensorflow::GraphDef &def, bool &loop, TF_Status *status) { + if (kAutoLoopEnabled && kGlobalLoopSize > 1) { + loop = false; + } else { + loop = false; + } + uint64_t graph_id = NextUUID(); + return AddGeGraphInner(context, graph_id, name, def, loop, status); +} + void NpuDevice::RemoveGeGraph(TFE_Context *context, uint64_t graph_id, TF_Status *status) { NPU_CTX_REQUIRES_GE_OK(status, "Graph engine Remove graph", GeSession()->RemoveGraph(graph_id)); } @@ -1804,8 +1841,7 @@ void NpuDevice::MaybeRebuildFuncSpecGraph(TFE_Context *context, const npu::FuncS LOG(INFO) << "Start rebuild ge graph " << spec->GeGraphId(); RemoveGeGraph(context, spec->GeGraphId(), status); if (TF_GetCode(status) != TF_OK) return; - AddGeGraph(context, spec->GeGraphId(), std::string("tf_function_") + spec->Op(), spec->Graph()->ToGraphDefDebug(), - status); + AddGeGraph(context, spec->GeGraphId(), spec->Op(), spec->Graph()->ToGraphDefDebug(), status); } } @@ -1847,7 +1883,6 @@ std::shared_ptr NpuDevice::CacheFuncSpec( const std::map> &dependent_host_resources, const std::string &reason) { auto spec = std::make_shared(op_spec, ndef, ge_graph_id, std::move(graph), prune_func, dependent_host_resources, reason); - spec->SetNeedLoop(); cached_func_specs_[op] = spec; DLOG() << "Cache function op spec " << spec->DebugString(); return spec; diff --git a/tf_adapter_2.x/npu_device/core/npu_device.h b/tf_adapter_2.x/npu_device/core/npu_device.h index a8413d0cf..e203ea31f 100644 --- a/tf_adapter_2.x/npu_device/core/npu_device.h +++ b/tf_adapter_2.x/npu_device/core/npu_device.h @@ -112,6 +112,12 @@ class NpuDevice { uint64_t AddGeGraph(TFE_Context *context, uint64_t graph_id, const std::string &name, const tensorflow::GraphDef &def, TF_Status *status); + uint64_t AddMaybeAutoLoopGeGraph(TFE_Context *context, const std::string &name, const tensorflow::GraphDef &def, + bool &loop, TF_Status *status); + + uint64_t AddGeGraphInner(TFE_Context *context, uint64_t graph_id, const std::string &name, + const tensorflow::GraphDef &def, bool loop, TF_Status *status); + void RemoveGeGraph(TFE_Context *context, uint64_t graph_id, TF_Status *status); void RunGeGraph(TFE_Context *context, uint64_t graph_id, int num_inputs, TFE_TensorHandle **inputs, bool pin_to_npu, -- Gitee From 2d6ca819eda7ca22071cc9d271f6edad38e8c455 Mon Sep 17 00:00:00 2001 From: medivh-x Date: Thu, 1 Apr 2021 16:49:19 +0800 Subject: [PATCH 6/6] auto loop for while node --- tf_adapter_2.x/npu_device/core/npu_device.cpp | 129 ++++++++++++------ tf_adapter_2.x/npu_device/core/npu_device.h | 4 +- 2 files changed, 91 insertions(+), 42 deletions(-) diff --git a/tf_adapter_2.x/npu_device/core/npu_device.cpp b/tf_adapter_2.x/npu_device/core/npu_device.cpp index bc5ffc82b..0edd6092c 100644 --- a/tf_adapter_2.x/npu_device/core/npu_device.cpp +++ b/tf_adapter_2.x/npu_device/core/npu_device.cpp @@ -20,6 +20,7 @@ #include "tensorflow/c/eager/tfe_context_internal.h" #include "tensorflow/c/eager/tfe_op_internal.h" #include "tensorflow/c/eager/tfe_tensorhandle_internal.h" +#include "tensorflow/core/grappler/op_types.h" #include "npu_custom_kernel.h" #include "npu_device.h" @@ -73,34 +74,30 @@ size_t RemoveRedundantHcomControlEdges(tensorflow::Graph *graph) { return edges_to_remove.size(); } -bool IsGraphNeedLoop(const std::string &name, const tensorflow::GraphDef &def) { - const static std::unordered_set kNecessaryOps{"IteratorV2"}; - const static std::unordered_set kTrainKeyOps{"ResourceApplyKerasMomentum"}; - - bool contain_necessary_op = false; - bool contain_train_key_op = false; - - for (const auto &ndef : def.node()) { - if (!contain_train_key_op && kTrainKeyOps.count(ndef.op())) { - if (contain_necessary_op) { - return true; - } - contain_train_key_op = true; - } - if (!contain_necessary_op && kNecessaryOps.count(ndef.op())) { - if (contain_train_key_op) { - return true; +bool IsGraphNeedLoop(const tensorflow::Graph *graph, tensorflow::Node **key) { + *key = nullptr; + for (auto node : graph->op_nodes()) { + if (node->IsWhileNode()) { + if (*key != nullptr) { + return false; } - contain_necessary_op = true; + *key = node; } } - return false; -} - -bool IsGraphNeedLoop(const std::string &name, const tensorflow::Graph *graph) { - return IsGraphNeedLoop(name, graph->ToGraphDefDebug()); + if (*key == nullptr) { + DLOG() << "Skip check as no while node in graph"; + return false; + } + size_t reserved_nums = 0; + const std::function &enter = [&reserved_nums](tensorflow::Node *node) { + if (node->IsOp()) { + reserved_nums++; + } + }; + tensorflow::ReverseDFSFrom(*graph, {*key}, enter, {}, {}, {}); + DLOG() << "Reserved nodes " << reserved_nums << " vs. totally " << graph->num_op_nodes(); + return reserved_nums == graph->num_op_nodes(); } - } // namespace void NpuDevice::CreateIteratorProvider(TFE_Context *context, const tensorflow::Tensor *tensor, @@ -979,18 +976,23 @@ void NpuDevice::GetOrCreateSpec(TFE_Context *context, const char *op_name, const *spec = CacheFuncSpec(op_name, op_reg_data, ndef, kInvalidGeGraphId, {}, {}, {}, status.error_message()); } else { uint64_t graph_id = kInvalidGeGraphId; - bool need_loop = false; + bool loop = false; if (kCustomKernelEnabled) { - if (!dependent_host_resources.empty()) { - graph_id = AddMaybeAutoLoopGeGraph(context, op_name, optimize_graph->ToGraphDefDebug(), need_loop, s); - } else { - graph_id = AddGeGraph(context, op_name, optimize_graph->ToGraphDefDebug(), s); + tensorflow::GraphDef loop_graph; + NPU_CTX_REQUIRES_OK(s, GetAutoLoopGraph(context, optimize_graph.get(), pruned_inputs.size(), + pruned_inputs.data(), loop, &loop_graph)); + if (loop) { + LOG(INFO) << "Trans graph " << op_name << " to auto loop graph succeed"; + if (kDumpExecutionDetail || kDumpGraph) { + WriteTextProto(tensorflow::Env::Default(), "LOOP." + std::string(op_name) + ".pbtxt", loop_graph); + } } + graph_id = AddGeGraphInner(context, NextUUID(), op_name, loop_graph, loop, s); if (TF_GetCode(s) != TF_OK) return; } *spec = CacheFuncSpec(op_name, op_reg_data, ndef, graph_id, std::move(optimize_graph), lambda, dependent_host_resources, ""); - reinterpret_cast(spec->get())->SetNeedLoop(need_loop); + reinterpret_cast(spec->get())->SetNeedLoop(loop); } return; } else { @@ -1456,8 +1458,10 @@ void NpuDevice::RunGraph(TFE_Context *context, const npu::FuncSpec *spec, int tf NPU_CTX_REQUIRES_OK(status, npu::UnwrapTensor(tf_inputs[resource.first], &tensor)); // 注意,这个callback不能引用捕获,防止中途因为消费某个资源失败而导致coredump auto done = [resource, iterations_per_loop](const tensorflow::Status &s) { - LOG(INFO) << "Iterator resource " << resource.second->Name() << " consume " << iterations_per_loop - << " times done with status " << s.ToString(); + if (iterations_per_loop > 1 || !s.ok() || kDumpExecutionDetail) { + LOG(INFO) << "Iterator resource " << resource.second->Name() << " consume " << iterations_per_loop + << " times done with status " << s.ToString(); + } }; NPU_CTX_REQUIRES_OK(status, resource.second->ConsumeAsync(*tensor, iterations_per_loop, done)); } @@ -1465,7 +1469,9 @@ void NpuDevice::RunGraph(TFE_Context *context, const npu::FuncSpec *spec, int tf MaybeRebuildFuncSpecGraph(context, spec, status); if (TF_GetCode(status) != TF_OK) return; - LOG(INFO) << "Start run ge graph " << spec->GeGraphId() << " pin to cpu, loop size " << iterations_per_loop; + if (iterations_per_loop > 1 || kDumpExecutionDetail) { + LOG(INFO) << "Start run ge graph " << spec->GeGraphId() << " pin to cpu, loop size " << iterations_per_loop; + } npu::Timer timer("Graph engine run ", iterations_per_loop, " times for graph ", spec->GeGraphId()); timer.Start(); spec->SetBuilt(); @@ -1742,15 +1748,58 @@ uint64_t NpuDevice::AddGeGraph(TFE_Context *context, const std::string &name, co return AddGeGraph(context, graph_id, name, def, status); } -uint64_t NpuDevice::AddMaybeAutoLoopGeGraph(TFE_Context *context, const std::string &name, - const tensorflow::GraphDef &def, bool &loop, TF_Status *status) { - if (kAutoLoopEnabled && kGlobalLoopSize > 1) { - loop = false; - } else { +tensorflow::Status NpuDevice::GetAutoLoopGraph(TFE_Context *context, tensorflow::Graph *origin_graph, int num_inputs, + TFE_TensorHandle **inputs, bool &loop, tensorflow::GraphDef *def) { + tensorflow::FunctionLibraryDefinition *lib_def = npu::UnwrapCtx(context)->FuncLibDef(); + std::unique_ptr graph = std::make_unique(lib_def); + CopyGraph(*origin_graph, graph.get()); + + tensorflow::Node *key; + if (!IsGraphNeedLoop(graph.get(), &key)) { loop = false; + graph->ToGraphDef(def); + return tensorflow::Status::OK(); } - uint64_t graph_id = NextUUID(); - return AddGeGraphInner(context, graph_id, name, def, loop, status); + + loop = true; + + const auto fn_name = key->attrs().Find("body")->func().name(); + DLOG() << "Inline while body func " << fn_name << " for node " << key->name(); + auto builder = tensorflow::NodeBuilder(fn_name, fn_name, lib_def); + for (int i = 0; i < key->num_inputs(); i++) { + const tensorflow::Edge *edge; + NPU_REQUIRES_OK(key->input_edge(i, &edge)); + builder.Input(edge->src(), edge->src_output()); + } + for (auto edge : key->in_edges()) { + if (edge->IsControlEdge()) { + builder.ControlInput(edge->src()); + } + } + + tensorflow::Node *fn_node; + NPU_REQUIRES_OK(builder.Finalize(graph.get(), &fn_node)); + + graph->RemoveNode(key); + tensorflow::FixupSourceAndSinkEdges(graph.get()); + + tensorflow::ProcessFunctionLibraryRuntime *pflr = npu::UnwrapCtx(context)->pflr(); + tensorflow::FunctionLibraryRuntime *flr = pflr->GetFLR("/job:localhost/replica:0/task:0/device:CPU:0"); + + tensorflow::OptimizeGraph(flr, &graph); + + for (auto node : graph->op_nodes()) { + if (tensorflow::grappler::IsVariable(node->def())) { + if (node->attrs().Find("shared_name") != nullptr) { + DLOG() << "Change node " << node->name() << " name to " << node->attrs().Find("shared_name")->s(); + node->set_name(node->attrs().Find("shared_name")->s()); + } + } + } + + MarkGraphNodeInOutDesc(context, graph.get(), num_inputs, inputs); + graph->ToGraphDef(def); + return tensorflow::Status::OK(); } void NpuDevice::RemoveGeGraph(TFE_Context *context, uint64_t graph_id, TF_Status *status) { diff --git a/tf_adapter_2.x/npu_device/core/npu_device.h b/tf_adapter_2.x/npu_device/core/npu_device.h index e203ea31f..0ea6431a4 100644 --- a/tf_adapter_2.x/npu_device/core/npu_device.h +++ b/tf_adapter_2.x/npu_device/core/npu_device.h @@ -112,8 +112,8 @@ class NpuDevice { uint64_t AddGeGraph(TFE_Context *context, uint64_t graph_id, const std::string &name, const tensorflow::GraphDef &def, TF_Status *status); - uint64_t AddMaybeAutoLoopGeGraph(TFE_Context *context, const std::string &name, const tensorflow::GraphDef &def, - bool &loop, TF_Status *status); + tensorflow::Status GetAutoLoopGraph(TFE_Context *context, tensorflow::Graph *graph, int num_inputs, + TFE_TensorHandle **inputs, bool &loop, tensorflow::GraphDef *def); uint64_t AddGeGraphInner(TFE_Context *context, uint64_t graph_id, const std::string &name, const tensorflow::GraphDef &def, bool loop, TF_Status *status); -- Gitee