diff --git a/tf_adapter_2.x/npu_device/core/npu_cache_spec.h b/tf_adapter_2.x/npu_device/core/npu_cache_spec.h
index 893fa9558d6d6f072f1ad53a9802e336a624d134..3f68d9209c9d961648a28f2445a1ded8e098b055 100644
--- a/tf_adapter_2.x/npu_device/core/npu_cache_spec.h
+++ b/tf_adapter_2.x/npu_device/core/npu_cache_spec.h
@@ -181,6 +181,9 @@ class FuncSpec : public TaskSpec {
   void SetBuilt() const { built_.store(true); }
   bool Built() const { return built_; }
 
+  void SetNeedLoop(bool loop) const { need_loop_.store(loop); }
+  bool NeedLoop() const { return need_loop_; }
+
   void PruneInputs(int num_inputs, TFE_TensorHandle **inputs, std::vector<TFE_TensorHandle *> &pruned) const {
     prune_func_(num_inputs, inputs, pruned);
   }
@@ -204,6 +207,7 @@ class FuncSpec : public TaskSpec {
   PruneInputsFunc prune_func_;
   const std::map<int, std::shared_ptr<IteratorResourceProvider>> dependent_host_resources_;
   std::atomic_bool mutable built_{false};
+  std::atomic_bool mutable need_loop_{false};
 };
 }  // namespace npu
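A note on the npu_cache_spec.h change: FuncSpec objects are handed out by the cache as const, so the new need_loop_ flag follows the same `std::atomic_bool mutable` pattern as the existing built_ member. A minimal std-only sketch of that pattern (the Spec class and main are hypothetical stand-ins, not the adapter's types):

```cpp
#include <atomic>
#include <iostream>

// Cached specs are shared as const; `mutable` plus std::atomic lets callers
// flip per-spec flags after construction without casts or locks.
class Spec {
 public:
  void SetNeedLoop(bool loop) const { need_loop_.store(loop); }
  bool NeedLoop() const { return need_loop_.load(); }

 private:
  std::atomic_bool mutable need_loop_{false};
};

int main() {
  const Spec spec{};       // const view, as handed out by a cache
  spec.SetNeedLoop(true);  // legal: the atomic member is mutable
  std::cout << spec.NeedLoop() << std::endl;  // prints 1
}
```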
totally " << graph->num_op_nodes(); + return reserved_nums == graph->num_op_nodes(); +} } // namespace void NpuDevice::CreateIteratorProvider(TFE_Context *context, const tensorflow::Tensor *tensor, @@ -127,14 +153,6 @@ std::string NpuDevice::CreateDevice(const char *name, int device_index, return "Failed init graph engine: create new session failed"; } - std::shared_ptr parser = - domi::ModelParserFactory::Instance()->CreateModelParser(domi::FrameworkType::TENSORFLOW); - if (parser == nullptr) { - return "Failed init graph engine: create tensorflow model parser failed"; - } - - std::unique_ptr status(TF_NewStatus(), TF_DeleteStatus); - *device = new (std::nothrow) NpuDevice(); if (*device == nullptr) { return "Failed create new npu device instance"; @@ -148,14 +166,14 @@ std::string NpuDevice::CreateDevice(const char *name, int device_index, } void NpuDevice::ReleaseResource() { - DLOG() << "Start cancel all uncompleted async call"; - CancellationManager()->StartCancel(); - std::vector> thread_guarder; for (auto &iterator_provider : iterator_providers_) { auto provider = iterator_provider.second; thread_guarder.emplace_back(std::async([provider]() { provider->Destroy(); })); } + + DLOG() << "Start cancel all uncompleted async call"; + CancellationManager()->StartCancel(); } void NpuDevice::DeleteDevice(void *device) { @@ -878,7 +896,6 @@ void NpuDevice::GetOrCreateSpec(TFE_Context *context, const char *op_name, const tensorflow::Status compat_status = ValidateOutput(op_name, data_types); if (!compat_status.ok()) { if (is_function_op) { - const static uint64_t kInvalidGeGraphId = -1; *spec = CacheFuncSpec(op_name, op_reg_data, ndef, kInvalidGeGraphId, {}, {}, {}, compat_status.error_message()); return; } else { @@ -944,25 +961,39 @@ void NpuDevice::GetOrCreateSpec(TFE_Context *context, const char *op_name, const *gdef.mutable_library() = fdef_lib; WriteTextProto(tensorflow::Env::Default(), "step_3_after_mark_shape_" + file_name_suffix, gdef); } - // 因为parser当前约定的附加属性不是匿名属性(非下划线开头,所以这里当前需要拷贝一份新图用于标记parser所需属性) - uint64_t graph_id = 0; - if (kCustomKernelEnabled) { - graph_id = AddGeGraph(context, std::string("tf_function_") + op_name, optimize_graph->ToGraphDefDebug(), s); - if (TF_GetCode(s) != TF_OK) return; - } - DLOG() << std::string("tf_function_") + op_name << " remained input index (0-" << num_inputs - 1 << ") -> " - << VecToString(remain_indexes); + DLOG() << op_name << " remained input index (0-" << num_inputs - 1 << ") -> " << VecToString(remain_indexes); auto lambda = [remain_indexes](int num_inputs, TFE_TensorHandle **inputs, std::vector &pruned) { for (auto index : remain_indexes) { pruned.push_back(inputs[index]); } }; + // 对于function节点,可以将resource的输入NPU兼容性作为缓存项目,校验输入是否被NPU支持,如果类型不支持,或者是CPU的Resouce类型,则不支持 // 如果是单算子,则不能缓存,需要在每次dev->Run的时候,校验单算子资源输入的兼容性 - *spec = - CacheFuncSpec(op_name, op_reg_data, ndef, graph_id, std::move(optimize_graph), lambda, dependent_host_resources, - ValidateInput(op_name, pruned_inputs.size(), pruned_inputs.data()).error_message()); + auto status = ValidateInput(op_name, pruned_inputs.size(), pruned_inputs.data()); + if (!status.ok()) { + *spec = CacheFuncSpec(op_name, op_reg_data, ndef, kInvalidGeGraphId, {}, {}, {}, status.error_message()); + } else { + uint64_t graph_id = kInvalidGeGraphId; + bool loop = false; + if (kCustomKernelEnabled) { + tensorflow::GraphDef loop_graph; + NPU_CTX_REQUIRES_OK(s, GetAutoLoopGraph(context, optimize_graph.get(), pruned_inputs.size(), + pruned_inputs.data(), loop, &loop_graph)); + if (loop) { + LOG(INFO) << "Trans 
graph " << op_name << " to auto loop graph succeed"; + if (kDumpExecutionDetail || kDumpGraph) { + WriteTextProto(tensorflow::Env::Default(), "LOOP." + std::string(op_name) + ".pbtxt", loop_graph); + } + } + graph_id = AddGeGraphInner(context, NextUUID(), op_name, loop_graph, loop, s); + if (TF_GetCode(s) != TF_OK) return; + } + *spec = CacheFuncSpec(op_name, op_reg_data, ndef, graph_id, std::move(optimize_graph), lambda, + dependent_host_resources, ""); + reinterpret_cast(spec->get())->SetNeedLoop(loop); + } return; } else { // 进行inferShape,输出可能是unknown shape,所以使用partial shape @@ -1264,6 +1295,124 @@ void NpuDevice::RunOp(TFE_Context *context, const npu::OpSpec *spec, int num_inp } // 计数-2 } +namespace { +tensorflow::Node *AddVarInitToGraph(TFE_Context *context, std::string name, tensorflow::Tensor tensor, + tensorflow::Graph *graph, TF_Status *status) { + tensorflow::Node *variable; + tensorflow::Node *value; + tensorflow::Node *assign_variable; + + NPU_CTX_REQUIRES_OK_RETURN(status, + tensorflow::NodeBuilder(name, "VarHandleOp") + .Attr("container", "") + .Attr("shared_name", name) + .Attr("dtype", tensor.dtype()) + .Attr("shape", tensor.shape()) + .Finalize(graph, &variable), + assign_variable); + NPU_CTX_REQUIRES_OK_RETURN(status, + tensorflow::NodeBuilder(name + "_v", "Const") + .Attr("value", tensor) + .Attr("dtype", tensor.dtype()) + .Finalize(graph, &value), + assign_variable); + NPU_CTX_REQUIRES_OK_RETURN(status, + tensorflow::NodeBuilder(name + "_op", "AssignVariableOp") + .Input(variable, 0) + .Input(value, 0) + .Attr("dtype", tensor.dtype()) + .Finalize(graph, &assign_variable), + assign_variable); + + AssembleOpDef(variable); + AssembleOpDef(value); + AssembleOpDef(assign_variable); + + AssembleOutputDesc(TensorShapes({kScalarShape}), {tensorflow::DT_RESOURCE}, variable); + AssembleOutputDesc(TensorShapes({tensor.shape()}), {tensor.dtype()}, value); + AssembleInputDesc(TensorShapes({kScalarShape, tensor.shape()}), {tensorflow::DT_RESOURCE, tensor.dtype()}, + assign_variable); + return assign_variable; +} +} // namespace + +void NpuDevice::SetNpuLoopSize(TFE_Context *context, int64_t loop, TF_Status *status) { + static std::atomic_bool initialized{false}; + static std::atomic_int64_t current_loop_size{1}; + static tensorflow::Status init_status = tensorflow::Status::OK(); + static std::uint64_t loop_var_graph_id = 0; + const static std::string kLoopVarName = "npu_runconfig/iterations_per_loop"; + + if (current_loop_size == loop) return; + + LOG(INFO) << "Set npu loop size to " << loop; + + if (!initialized.exchange(true)) { + tensorflow::Graph graph(tensorflow::OpRegistry::Global()); + AddVarInitToGraph(context, "npu_runconfig/loop_cond", tensorflow::Tensor(tensorflow::int64(0)), &graph, status); + if (TF_GetCode(status) != TF_OK) return; + AddVarInitToGraph(context, "npu_runconfig/one", tensorflow::Tensor(tensorflow::int64(1)), &graph, status); + if (TF_GetCode(status) != TF_OK) return; + AddVarInitToGraph(context, "npu_runconfig/zero", tensorflow::Tensor(tensorflow::int64(0)), &graph, status); + if (TF_GetCode(status) != TF_OK) return; + + RunGeGraphPin2CpuAnonymous(context, "set_npu_loop_conditions", graph.ToGraphDefDebug(), 0, nullptr, 0, nullptr, + status); + if (TF_GetCode(status) != TF_OK) return; + + tensorflow::Node *variable; + tensorflow::Node *arg; + tensorflow::Node *assign_variable; + + tensorflow::Graph graph2(tensorflow::OpRegistry::Global()); + + NPU_CTX_REQUIRES_OK(status, tensorflow::NodeBuilder(kLoopVarName, "VarHandleOp") + .Attr("container", "") + 
.Attr("shared_name", kLoopVarName) + .Attr("dtype", tensorflow::DT_INT64) + .Attr("shape", kScalarShape) + .Finalize(&graph2, &variable)); + NPU_CTX_REQUIRES_OK(status, tensorflow::NodeBuilder(kLoopVarName + "_v", "_Arg") + .Attr("T", tensorflow::DT_INT64) + .Attr("index", 0) + .Finalize(&graph2, &arg)); + NPU_CTX_REQUIRES_OK(status, tensorflow::NodeBuilder(kLoopVarName + "_op", "AssignVariableOp") + .Input(variable, 0) + .Input(arg, 0) + .Attr("dtype", tensorflow::DT_INT64) + .Finalize(&graph2, &assign_variable)); + + AssembleOpDef(variable); + AssembleOpDef(arg); + AssembleOpDef(assign_variable); + + AssembleOutputDesc(TensorShapes({kScalarShape}), {tensorflow::DT_RESOURCE}, variable); + AssembleOutputDesc(TensorShapes({kScalarShape}), {tensorflow::DT_INT64}, arg); + AssembleInputDesc(TensorShapes({kScalarShape, kScalarShape}), {tensorflow::DT_RESOURCE, tensorflow::DT_INT64}, + assign_variable); + + loop_var_graph_id = AddGeGraph(context, "set_loop_var", graph2.ToGraphDefDebug(), status); + init_status = status->status; + if (TF_GetCode(status) != TF_OK) return; + } + + status->status = init_status; + if (TF_GetCode(status) != TF_OK) return; + + std::vector inputs(1); + inputs[0] = + tensorflow::wrap(tensorflow::TensorHandle::CreateLocalHandle(tensorflow::Tensor(tensorflow::int64(loop - 1)))); + + RunGeGraphPin2Cpu(context, loop_var_graph_id, inputs.size(), inputs.data(), {}, 0, nullptr, status); + + if (TF_GetCode(status) == TF_OK) { + current_loop_size = loop; + } + for (auto handle : inputs) { + TFE_DeleteTensorHandle(handle); + } +} + void NpuDevice::RunGraph(TFE_Context *context, const npu::FuncSpec *spec, int tf_num_inputs, TFE_TensorHandle **tf_inputs, int *num_outputs, TFE_TensorHandle **outputs, TF_Status *status) { @@ -1296,15 +1445,12 @@ void NpuDevice::RunGraph(TFE_Context *context, const npu::FuncSpec *spec, int tf if (kCustomKernelEnabled) { // TODO:这里根据小循环策略修改值 - int64_t iterations_per_loop = 1; - if (!spec->DependentHostResources().empty()) { - for (auto node : spec->Graph()->op_nodes()) { - if (node->IsWhileNode()) { - iterations_per_loop = kGlobalLoopSize; - break; - } - } + int64_t iterations_per_loop = spec->NeedLoop() ? 
@@ -1264,6 +1295,124 @@ void NpuDevice::RunOp(TFE_Context *context, const npu::OpSpec *spec, int num_inputs,
   }  // count -2
 }
 
+namespace {
+tensorflow::Node *AddVarInitToGraph(TFE_Context *context, std::string name, tensorflow::Tensor tensor,
+                                    tensorflow::Graph *graph, TF_Status *status) {
+  tensorflow::Node *variable;
+  tensorflow::Node *value;
+  tensorflow::Node *assign_variable;
+
+  NPU_CTX_REQUIRES_OK_RETURN(status,
+                             tensorflow::NodeBuilder(name, "VarHandleOp")
+                                 .Attr("container", "")
+                                 .Attr("shared_name", name)
+                                 .Attr("dtype", tensor.dtype())
+                                 .Attr("shape", tensor.shape())
+                                 .Finalize(graph, &variable),
+                             assign_variable);
+  NPU_CTX_REQUIRES_OK_RETURN(status,
+                             tensorflow::NodeBuilder(name + "_v", "Const")
+                                 .Attr("value", tensor)
+                                 .Attr("dtype", tensor.dtype())
+                                 .Finalize(graph, &value),
+                             assign_variable);
+  NPU_CTX_REQUIRES_OK_RETURN(status,
+                             tensorflow::NodeBuilder(name + "_op", "AssignVariableOp")
+                                 .Input(variable, 0)
+                                 .Input(value, 0)
+                                 .Attr("dtype", tensor.dtype())
+                                 .Finalize(graph, &assign_variable),
+                             assign_variable);
+
+  AssembleOpDef(variable);
+  AssembleOpDef(value);
+  AssembleOpDef(assign_variable);
+
+  AssembleOutputDesc(TensorShapes({kScalarShape}), {tensorflow::DT_RESOURCE}, variable);
+  AssembleOutputDesc(TensorShapes({tensor.shape()}), {tensor.dtype()}, value);
+  AssembleInputDesc(TensorShapes({kScalarShape, tensor.shape()}), {tensorflow::DT_RESOURCE, tensor.dtype()},
+                    assign_variable);
+  return assign_variable;
+}
+}  // namespace
+
+void NpuDevice::SetNpuLoopSize(TFE_Context *context, int64_t loop, TF_Status *status) {
+  static std::atomic_bool initialized{false};
+  static std::atomic_int64_t current_loop_size{1};
+  static tensorflow::Status init_status = tensorflow::Status::OK();
+  static std::uint64_t loop_var_graph_id = 0;
+  const static std::string kLoopVarName = "npu_runconfig/iterations_per_loop";
+
+  if (current_loop_size == loop) return;
+
+  LOG(INFO) << "Set npu loop size to " << loop;
+
+  if (!initialized.exchange(true)) {
+    tensorflow::Graph graph(tensorflow::OpRegistry::Global());
+    AddVarInitToGraph(context, "npu_runconfig/loop_cond", tensorflow::Tensor(tensorflow::int64(0)), &graph, status);
+    if (TF_GetCode(status) != TF_OK) return;
+    AddVarInitToGraph(context, "npu_runconfig/one", tensorflow::Tensor(tensorflow::int64(1)), &graph, status);
+    if (TF_GetCode(status) != TF_OK) return;
+    AddVarInitToGraph(context, "npu_runconfig/zero", tensorflow::Tensor(tensorflow::int64(0)), &graph, status);
+    if (TF_GetCode(status) != TF_OK) return;
+
+    RunGeGraphPin2CpuAnonymous(context, "set_npu_loop_conditions", graph.ToGraphDefDebug(), 0, nullptr, 0, nullptr,
+                               status);
+    if (TF_GetCode(status) != TF_OK) return;
+
+    tensorflow::Node *variable;
+    tensorflow::Node *arg;
+    tensorflow::Node *assign_variable;
+
+    tensorflow::Graph graph2(tensorflow::OpRegistry::Global());
+
+    NPU_CTX_REQUIRES_OK(status, tensorflow::NodeBuilder(kLoopVarName, "VarHandleOp")
+                                    .Attr("container", "")
+                                    .Attr("shared_name", kLoopVarName)
+                                    .Attr("dtype", tensorflow::DT_INT64)
+                                    .Attr("shape", kScalarShape)
+                                    .Finalize(&graph2, &variable));
+    NPU_CTX_REQUIRES_OK(status, tensorflow::NodeBuilder(kLoopVarName + "_v", "_Arg")
+                                    .Attr("T", tensorflow::DT_INT64)
+                                    .Attr("index", 0)
+                                    .Finalize(&graph2, &arg));
+    NPU_CTX_REQUIRES_OK(status, tensorflow::NodeBuilder(kLoopVarName + "_op", "AssignVariableOp")
+                                    .Input(variable, 0)
+                                    .Input(arg, 0)
+                                    .Attr("dtype", tensorflow::DT_INT64)
+                                    .Finalize(&graph2, &assign_variable));
+
+    AssembleOpDef(variable);
+    AssembleOpDef(arg);
+    AssembleOpDef(assign_variable);
+
+    AssembleOutputDesc(TensorShapes({kScalarShape}), {tensorflow::DT_RESOURCE}, variable);
+    AssembleOutputDesc(TensorShapes({kScalarShape}), {tensorflow::DT_INT64}, arg);
+    AssembleInputDesc(TensorShapes({kScalarShape, kScalarShape}), {tensorflow::DT_RESOURCE, tensorflow::DT_INT64},
+                      assign_variable);
+
+    loop_var_graph_id = AddGeGraph(context, "set_loop_var", graph2.ToGraphDefDebug(), status);
+    init_status = status->status;
+    if (TF_GetCode(status) != TF_OK) return;
+  }
+
+  status->status = init_status;
+  if (TF_GetCode(status) != TF_OK) return;
+
+  std::vector<TFE_TensorHandle *> inputs(1);
+  inputs[0] =
+      tensorflow::wrap(tensorflow::TensorHandle::CreateLocalHandle(tensorflow::Tensor(tensorflow::int64(loop - 1))));
+
+  RunGeGraphPin2Cpu(context, loop_var_graph_id, inputs.size(), inputs.data(), {}, 0, nullptr, status);
+
+  if (TF_GetCode(status) == TF_OK) {
+    current_loop_size = loop;
+  }
+  for (auto handle : inputs) {
+    TFE_DeleteTensorHandle(handle);
+  }
+}
+
 void NpuDevice::RunGraph(TFE_Context *context, const npu::FuncSpec *spec, int tf_num_inputs,
                          TFE_TensorHandle **tf_inputs, int *num_outputs, TFE_TensorHandle **outputs,
                          TF_Status *status) {
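SetNpuLoopSize above builds its helper graphs exactly once: the first caller wins `initialized.exchange(true)`, records the construction status, and every later call replays that status before updating the loop variable. A std-only sketch of the once-init-with-remembered-status pattern (a string stands in for tensorflow::Status; a single initializing caller is assumed, as in the original):

```cpp
#include <atomic>
#include <iostream>
#include <string>

std::string SetLoopOnce() {
  static std::atomic_bool initialized{false};
  static std::string init_status = "ok";

  if (!initialized.exchange(true)) {
    // The first caller performs the expensive setup; a failure here is
    // remembered so every subsequent caller observes the same error status.
    init_status = "ok";  // imagine graph construction filling this in
  }
  return init_status;  // replayed on every later call
}

int main() {
  std::cout << SetLoopOnce() << std::endl;
  std::cout << SetLoopOnce() << std::endl;  // no re-initialization
}
```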
@@ -1296,15 +1445,12 @@ void NpuDevice::RunGraph(TFE_Context *context, const npu::FuncSpec *spec, int tf_num_inputs,
 
   if (kCustomKernelEnabled) {
     // TODO: adjust the value here according to the small-loop policy
-    int64_t iterations_per_loop = 1;
-    if (!spec->DependentHostResources().empty()) {
-      for (auto node : spec->Graph()->op_nodes()) {
-        if (node->IsWhileNode()) {
-          iterations_per_loop = kGlobalLoopSize;
-          break;
-        }
-      }
+    int64_t iterations_per_loop = spec->NeedLoop() ? kGlobalLoopSize : 1;
+    if (iterations_per_loop > 1) {
+      SetNpuLoopSize(context, iterations_per_loop, status);
+      if (TF_GetCode(status) != TF_OK) return;
     }
+
     for (const auto &resource : spec->DependentHostResources()) {
       LOG(INFO) << "Start consume iterator resource " << resource.second->Name() << " " << iterations_per_loop
                 << " times";
@@ -1312,8 +1458,10 @@ void NpuDevice::RunGraph(TFE_Context *context, const npu::FuncSpec *spec, int tf_num_inputs,
       NPU_CTX_REQUIRES_OK(status, npu::UnwrapTensor(tf_inputs[resource.first], &tensor));
       // Note: this callback must not capture by reference, to avoid a coredump if consuming some resource fails midway
       auto done = [resource, iterations_per_loop](const tensorflow::Status &s) {
-        LOG(INFO) << "Iterator resource " << resource.second->Name() << " consume " << iterations_per_loop
-                  << " times done with status " << s.ToString();
+        if (iterations_per_loop > 1 || !s.ok() || kDumpExecutionDetail) {
+          LOG(INFO) << "Iterator resource " << resource.second->Name() << " consume " << iterations_per_loop
+                    << " times done with status " << s.ToString();
+        }
       };
       NPU_CTX_REQUIRES_OK(status, resource.second->ConsumeAsync(*tensor, iterations_per_loop, done));
     }
@@ -1321,7 +1469,9 @@ void NpuDevice::RunGraph(TFE_Context *context, const npu::FuncSpec *spec, int tf_num_inputs,
     MaybeRebuildFuncSpecGraph(context, spec, status);
     if (TF_GetCode(status) != TF_OK) return;
 
-    LOG(INFO) << "Start run ge graph " << spec->GeGraphId() << " pin to cpu, loop size " << iterations_per_loop;
+    if (iterations_per_loop > 1 || kDumpExecutionDetail) {
+      LOG(INFO) << "Start run ge graph " << spec->GeGraphId() << " pin to cpu, loop size " << iterations_per_loop;
+    }
     npu::Timer timer("Graph engine run ", iterations_per_loop, " times for graph ", spec->GeGraphId());
     timer.Start();
     spec->SetBuilt();
@@ -1530,8 +1680,8 @@ void NpuDevice::RunGeGraphAsync(TFE_Context *context, uint64_t graph_id, int num_inputs,
                       ge_session_->RunGraphAsync(graph_id, ge_inputs, ge_callback));
 }
 
-uint64_t NpuDevice::AddGeGraph(TFE_Context *context, uint64_t graph_id, const std::string &name,
-                               const tensorflow::GraphDef &def, TF_Status *status) {
+uint64_t NpuDevice::AddGeGraphInner(TFE_Context *context, uint64_t graph_id, const std::string &name,
+                                    const tensorflow::GraphDef &def, bool loop, TF_Status *status) {
   auto ge_compute_graph = std::make_shared<ge::ComputeGraph>(name);
   std::shared_ptr<domi::ModelParser> parser =
       domi::ModelParserFactory::Instance()->CreateModelParser(domi::FrameworkType::TENSORFLOW);
@@ -1580,16 +1730,78 @@ uint64_t NpuDevice::AddGeGraph(TFE_Context *context, uint64_t graph_id, const std::string &name,
                                 parser->ParseProtoWithSubgraph(&def, request_subgraph, ge_compute_graph), graph_id);
 
   ge::Graph ge_graph = ge::GraphUtils::CreateGraphFromComputeGraph(ge_compute_graph);
+
+  ge_graph.SetNeedIteration(loop);
+
   NPU_CTX_REQUIRES_GE_OK_RETURN(status, "Graph engine Add graph", GeSession()->AddGraph(graph_id, ge_graph), graph_id);
   return graph_id;
 }
 
+uint64_t NpuDevice::AddGeGraph(TFE_Context *context, uint64_t graph_id, const std::string &name,
+                               const tensorflow::GraphDef &def, TF_Status *status) {
+  return AddGeGraphInner(context, graph_id, name, def, false, status);
+}
+
 uint64_t NpuDevice::AddGeGraph(TFE_Context *context, const std::string &name, const tensorflow::GraphDef &def,
                                TF_Status *status) {
   uint64_t graph_id = NextUUID();
   return AddGeGraph(context, graph_id, name, def, status);
 }
 
+tensorflow::Status NpuDevice::GetAutoLoopGraph(TFE_Context *context, tensorflow::Graph *origin_graph, int num_inputs,
+                                               TFE_TensorHandle **inputs, bool &loop, tensorflow::GraphDef *def) {
+  tensorflow::FunctionLibraryDefinition *lib_def = npu::UnwrapCtx(context)->FuncLibDef();
+  std::unique_ptr<tensorflow::Graph> graph = std::make_unique<tensorflow::Graph>(lib_def);
+  CopyGraph(*origin_graph, graph.get());
+
+  tensorflow::Node *key;
+  if (!IsGraphNeedLoop(graph.get(), &key)) {
+    loop = false;
+    graph->ToGraphDef(def);
+    return tensorflow::Status::OK();
+  }
+
+  loop = true;
+
+  const auto fn_name = key->attrs().Find("body")->func().name();
+  DLOG() << "Inline while body func " << fn_name << " for node " << key->name();
+  auto builder = tensorflow::NodeBuilder(fn_name, fn_name, lib_def);
+  for (int i = 0; i < key->num_inputs(); i++) {
+    const tensorflow::Edge *edge;
+    NPU_REQUIRES_OK(key->input_edge(i, &edge));
+    builder.Input(edge->src(), edge->src_output());
+  }
+  for (auto edge : key->in_edges()) {
+    if (edge->IsControlEdge()) {
+      builder.ControlInput(edge->src());
+    }
+  }
+
+  tensorflow::Node *fn_node;
+  NPU_REQUIRES_OK(builder.Finalize(graph.get(), &fn_node));
+
+  graph->RemoveNode(key);
+  tensorflow::FixupSourceAndSinkEdges(graph.get());
+
+  tensorflow::ProcessFunctionLibraryRuntime *pflr = npu::UnwrapCtx(context)->pflr();
+  tensorflow::FunctionLibraryRuntime *flr = pflr->GetFLR("/job:localhost/replica:0/task:0/device:CPU:0");
+
+  tensorflow::OptimizeGraph(flr, &graph);
+
+  for (auto node : graph->op_nodes()) {
+    if (tensorflow::grappler::IsVariable(node->def())) {
+      if (node->attrs().Find("shared_name") != nullptr) {
+        DLOG() << "Change node " << node->name() << " name to " << node->attrs().Find("shared_name")->s();
+        node->set_name(node->attrs().Find("shared_name")->s());
+      }
+    }
+  }
+
+  MarkGraphNodeInOutDesc(context, graph.get(), num_inputs, inputs);
+
+  graph->ToGraphDef(def);
+  return tensorflow::Status::OK();
+}
+
 void NpuDevice::RemoveGeGraph(TFE_Context *context, uint64_t graph_id, TF_Status *status) {
   NPU_CTX_REQUIRES_GE_OK(status, "Graph engine Remove graph", GeSession()->RemoveGraph(graph_id));
 }
@@ -1678,8 +1890,7 @@ void NpuDevice::MaybeRebuildFuncSpecGraph(TFE_Context *context, const npu::FuncSpec *spec,
     LOG(INFO) << "Start rebuild ge graph " << spec->GeGraphId();
     RemoveGeGraph(context, spec->GeGraphId(), status);
     if (TF_GetCode(status) != TF_OK) return;
-    AddGeGraph(context, spec->GeGraphId(), std::string("tf_function_") + spec->Op(), spec->Graph()->ToGraphDefDebug(),
-               status);
+    AddGeGraph(context, spec->GeGraphId(), spec->Op(), spec->Graph()->ToGraphDefDebug(), status);
   }
 }
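With NeedLoop set, a single RunGraph launch makes the device side iterate iterations_per_loop times (GE re-runs the graph via SetNeedIteration) while the host stages the iterator resource the same number of times through ConsumeAsync. A toy, std-only model of that one-launch/N-iterations contract; the queue and counts are illustrative, not the adapter's actual scheduling:

```cpp
#include <deque>
#include <iostream>

int main() {
  const int iterations_per_loop = 4;  // stand-in for kGlobalLoopSize

  // Host side: stage one element per device iteration before the launch.
  std::deque<int> host_queue;
  for (int i = 0; i < iterations_per_loop; i++) host_queue.push_back(i);

  // Device side: one launch consumes iterations_per_loop staged elements.
  int iterated = 0;
  while (iterated < iterations_per_loop && !host_queue.empty()) {
    host_queue.pop_front();
    iterated++;
  }
  std::cout << "one launch, " << iterated << " device iterations" << std::endl;
}
```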
diff --git a/tf_adapter_2.x/npu_device/core/npu_device.h b/tf_adapter_2.x/npu_device/core/npu_device.h
index ab6d317ab62e57439482c8b648ba9e922fed3cd3..0ea6431a48b89e65fb287994b40eeed1e6c7d7bd 100644
--- a/tf_adapter_2.x/npu_device/core/npu_device.h
+++ b/tf_adapter_2.x/npu_device/core/npu_device.h
@@ -89,6 +89,8 @@ class NpuDevice {
   void RunOp(TFE_Context *context, const npu::OpSpec *spec, int num_inputs, TFE_TensorHandle **inputs,
              int *num_outputs, TFE_TensorHandle **outputs, TF_Status *status);
 
+  void SetNpuLoopSize(TFE_Context *context, int64_t loop, TF_Status *status);
+
   void RunGraph(TFE_Context *context, const npu::FuncSpec *spec, int num_inputs, TFE_TensorHandle **inputs,
                 int *num_outputs, TFE_TensorHandle **outputs, TF_Status *status);
 
@@ -110,6 +112,12 @@ class NpuDevice {
   uint64_t AddGeGraph(TFE_Context *context, uint64_t graph_id, const std::string &name,
                       const tensorflow::GraphDef &def, TF_Status *status);
 
+  tensorflow::Status GetAutoLoopGraph(TFE_Context *context, tensorflow::Graph *graph, int num_inputs,
+                                      TFE_TensorHandle **inputs, bool &loop, tensorflow::GraphDef *def);
+
+  uint64_t AddGeGraphInner(TFE_Context *context, uint64_t graph_id, const std::string &name,
+                           const tensorflow::GraphDef &def, bool loop, TF_Status *status);
+
   void RemoveGeGraph(TFE_Context *context, uint64_t graph_id, TF_Status *status);
 
   void RunGeGraph(TFE_Context *context, uint64_t graph_id, int num_inputs, TFE_TensorHandle **inputs, bool pin_to_npu,
diff --git a/tf_adapter_2.x/npu_device/core/npu_env.h b/tf_adapter_2.x/npu_device/core/npu_env.h
index 5435faef9e21c861da20e5bfc5f97f7cf55f9fbd..9f3f8bfc862b605dd86f1092f62a529fdea92bc5 100644
--- a/tf_adapter_2.x/npu_device/core/npu_env.h
+++ b/tf_adapter_2.x/npu_device/core/npu_env.h
@@ -44,4 +44,10 @@ const static bool kPerfEnabled = []() -> bool {
   return perf_enabled;
 }();
 
+const static bool kAutoLoopEnabled = []() -> bool {
+  bool loop_enabled = false;
+  tensorflow::ReadBoolFromEnvVar("NPU_EXPERIMENTAL_AUTO_LOOP", false, &loop_enabled);
+  return loop_enabled;
+}();
+
 #endif  // TENSORFLOW_NPU_ENV_H
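The new kAutoLoopEnabled switch follows the file's existing pattern: a const static bool initialized once by an immediately-invoked lambda that reads the environment. A std-only sketch of the same shape (std::getenv stands in for tensorflow::ReadBoolFromEnvVar, and the truthiness rule here is simplified to the literal "true"):

```cpp
#include <cstdlib>
#include <iostream>
#include <string>

// Evaluated once at static-init time, so the flag is stable for process life.
const static bool kAutoLoopEnabledSketch = []() -> bool {
  const char *v = std::getenv("NPU_EXPERIMENTAL_AUTO_LOOP");
  return v != nullptr && std::string(v) == "true";
}();

int main() {
  std::cout << "auto loop enabled: " << kAutoLoopEnabledSketch << std::endl;
}
```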