From dfd3efc3c44120c77404cf842618ec040dfc70d4 Mon Sep 17 00:00:00 2001 From: guopeian Date: Wed, 22 May 2024 10:51:22 +0800 Subject: [PATCH] geop --- tf_adapter/interface_spec/api_npu_config.pyh | 2 +- tf_adapter/kernels/geop_npu.cc | 730 +++++++++--------- tf_adapter/kernels/geop_npu.h | 50 +- .../npu_bridge/estimator/npu/npu_config.py | 5 +- .../npu_bridge/estimator/npu/npu_estimator.py | 2 + .../ge_runner/src/callback_executor.cc | 86 +++ .../depends/ge_runner/src/callback_executor.h | 54 ++ .../depends/ge_runner/src/ge_runner_stub.cc | 26 +- .../tests/depends/ge_runner/src/ge_stub.h | 1 + .../st/kernels/testcase/geop_npu_test.cc | 27 +- .../ut/kernels/testcase/geop_npu_test.cc | 27 +- tf_adapter/util/npu_attrs.cc | 23 +- tf_adapter_2.x/npu_device/core/npu_device.cpp | 1 + 13 files changed, 637 insertions(+), 397 deletions(-) create mode 100644 tf_adapter/tests/depends/ge_runner/src/callback_executor.cc create mode 100644 tf_adapter/tests/depends/ge_runner/src/callback_executor.h diff --git a/tf_adapter/interface_spec/api_npu_config.pyh b/tf_adapter/interface_spec/api_npu_config.pyh index e696158aa..eab27fc3e 100644 --- a/tf_adapter/interface_spec/api_npu_config.pyh +++ b/tf_adapter/interface_spec/api_npu_config.pyh @@ -22,7 +22,7 @@ class NPURunConfig(run_config_lib.RunConfig): event_sync_timeout=-1, external_weight=False, es_cluster_config=None, deterministic=0, frozen_variable=False, variable_placement="Device", jit_compile="auto", precision_mode_v2=None, ac_parallel_enable=None, quant_dumpable=None, input_fusion_size=131072, compile_dynamic_mode=None, - execute_times=-1): + execute_times=-1, graph_max_parallel_model_num=1): class ProfilingConfig(): def __init__(self, enable_profiling=False, profiling_options=None): diff --git a/tf_adapter/kernels/geop_npu.cc b/tf_adapter/kernels/geop_npu.cc index 537994554..30b794aad 100644 --- a/tf_adapter/kernels/geop_npu.cc +++ b/tf_adapter/kernels/geop_npu.cc @@ -1,5 +1,5 @@ /* - * Copyright (c) Huawei Technologies Co., Ltd. 2019-2021. All rights reserved. + * Copyright (c) Huawei Technologies Co., Ltd. 2019-2025. All rights reserved. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. 
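The interface change to NPURunConfig above adds a graph_max_parallel_model_num argument (default 1); in GeOp it controls how many times the compiled graph is added to the GE session, so several model instances of the same graph can execute in parallel. A minimal usage sketch, assuming the usual npu_bridge import path and an illustrative value of 4:

from npu_bridge.estimator.npu.npu_config import NPURunConfig

# Allow up to 4 parallel model instances per compiled graph (the value 4 is illustrative).
run_config = NPURunConfig(graph_max_parallel_model_num=4)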
@@ -104,7 +104,6 @@ const float kMaxStepRatio = 0.9; const float kDefaultLossRatio = 1.05; const float kMinLossRatio = 1.01; const float kMaxLossRatio = 1.5; - const std::map fast_value_string_2_eunm = {{"fast", GeOp::FastValue::kfast}, {"fast1", GeOp::FastValue::kfast1}}; @@ -333,6 +332,14 @@ void SetReuseOptions(const std::string &key, int32_t num, const std::mapsecond; } } +class ExitCallbackGuarder { + public: + explicit ExitCallbackGuarder(std::function done) : done_(done) {} + ~ExitCallbackGuarder() { done_(); } + + private: + std::function done_; +}; } // namespace std::string CurrentTimeInStr() { @@ -354,10 +361,10 @@ const int kFatalSleepTime = 3000; const std::string kAllReduce = "HcomAllReduce"; GeOp::GeOp(OpKernelConstruction *ctx) - : AsyncOpKernel(ctx), init_flag_(false), build_flag_(false), add_graph_flag_(false), sess_init_flag_(false), - compute_graph_empty_(false), is_input_convert_(false), data_format_(""), graph_id_(0), - is_initialized_graph_(false), need_iteration_(false), tf_session_(""), ge_session_(nullptr), job_type_(""), - is_host_graph_(false), handle_(nullptr), need_compile_graph_first_(false), tuned_flag_(ATOMIC_FLAG_INIT), + : AsyncOpKernel(ctx), init_flag_(false), sess_init_flag_(false), + is_input_convert_(false), data_format_(""), graph_id_(0), + is_initialized_graph_(false), is_empty_graph_(false), need_iteration_(false), tf_session_(""), ge_session_(nullptr), job_type_(""), + is_host_graph_(false), handle_(nullptr), tuned_flag_(ATOMIC_FLAG_INIT), jit_compile_("2"), is_dynamic_input_(false), session_id_(0), aoe_initialize_(nullptr), aoe_finalize_(nullptr), aoe_create_session_(nullptr), aoe_destroy_session_(nullptr), aoe_set_gesession_(nullptr), aoe_set_dependgraphs_(nullptr), aoe_set_tuninggraph_(nullptr), aoe_tuning_graph_(nullptr), @@ -429,6 +436,8 @@ void GeOp::Initialize(OpKernelConstruction *ctx) { OP_REQUIRES_OK(ctx, ctx->GetAttr("_NpuOptimizer", &sess_config)); std::map pass_options = NpuAttrs::GetPassOptions(ctx); iteration_per_loop_ = std::atoi(pass_options["iterations_per_loop"].c_str()); + graph_max_parallel_model_num_ = std::max(std::atoi(pass_options["graph_max_parallel_model_num"].c_str()), 1); + ADP_LOG(INFO) << "graph_max_parallel_model_num :" << graph_max_parallel_model_num_; job_type_ = pass_options["job"]; mix_compile_mode_ = pass_options["mix_compile_mode"]; accelerate_train_mode_ = pass_options["accelerate_train_mode"]; @@ -510,7 +519,7 @@ void GeOp::Finalize() { mutex_lock lock{mu_}; uint32_t graph_id = -1; if (sess_init_flag_ || !tf_session_.empty()) { - bool ret = DecrementGraphIdCount(tf_session_, graph_id); + bool ret = DecrementGraphIdCount(graph_id); if (!ret) { ADP_LOG(ERROR) << "tf session " << tf_session_ << " sub graph id failed."; LOG(ERROR) << "tf session " << tf_session_ << " sub graph id failed."; @@ -808,44 +817,6 @@ bool GeOp::IsGraphNeedRebuild(const uint32_t cache_graph_id) { return ((need_recover_precision_mode_) || (ge_session_->IsGraphNeedRebuild(cache_graph_id))); } -int32_t GeOp::InitRebuildFlag(uint32_t cache_graph_id) { - if (!build_flag_) { - ADP_LOG(INFO) << "[GEOP] tf session " << tf_session_ << ", graph id: " << cache_graph_id - << " does not build yet, no need to check rebuild"; - return 0; - } - if (compute_graph_empty_) { - ADP_LOG(INFO) << "[GEOP] tf session " << tf_session_ << ", graph id: " << cache_graph_id - << " is empty, no need to check rebuild"; - return 0; - } - if (ge_session_ == nullptr) { - ADP_LOG(ERROR) << "[GEOP] GE session is nullptr"; - LOG(ERROR) << "[GEOP] GE session is nullptr"; - 
return -1; - } - if (!IsGraphNeedRebuild(cache_graph_id)) { - ADP_LOG(INFO) << "[GEOP] tf session " << tf_session_ << ", graph id: " << cache_graph_id << " no need to rebuild"; - return 0; - } - ADP_LOG(INFO) << "[GEOP] The graph need rebuild, graph id " << cache_graph_id << " ,need_change_precision_mode: " - << need_recover_precision_mode_; - - // The graph need to rebuild, remove it from GE first. - ADP_LOG(INFO) << "[GEOP] tf session: " << tf_session_ << ", graph id: " << cache_graph_id; - auto ret = ge_session_->RemoveGraph(cache_graph_id); - if (ret != ge::SUCCESS) { - ADP_LOG(ERROR) << "[GEOP] Failed to remove graph " << cache_graph_id << " from ge, error code " << ret; - LOG(ERROR) << "[GEOP] Failed to remove graph " << cache_graph_id << " from ge, error code " << ret << std::endl - << "Error Message is : " << std::endl << ge::GEGetErrorMsgV2().GetString(); - return -1; - } - - build_flag_ = false; - compute_graph_empty_ = false; - return 0; -} - bool GeOp::IncrementGraphIdCount(uint32_t &graph_id) { if (tf_session_.empty()) { ADP_LOG(ERROR) << "[GEOP] Add graph id failed, tf session is empty."; @@ -863,7 +834,7 @@ bool GeOp::IncrementGraphIdCount(uint32_t &graph_id) { return true; } -bool GeOp::DecrementGraphIdCount(const std::string &tf_session, uint32_t &graph_id) { +bool GeOp::DecrementGraphIdCount(uint32_t &graph_id) { if (tf_session_.empty()) { ADP_LOG(ERROR) << "[GEOP] Sub graph id failed, tf session is empty."; LOG(ERROR) << "[GEOP] Sub graph id failed, tf session is empty."; @@ -881,8 +852,8 @@ bool GeOp::DecrementGraphIdCount(const std::string &tf_session, uint32_t &graph_ graph_id = it->second; return true; } - ADP_LOG(ERROR) << "[GEOP] Sub graph id failed, can not find tf session " << tf_session; - LOG(ERROR) << "[GEOP] Sub graph id failed, can not find tf session " << tf_session; + ADP_LOG(ERROR) << "[GEOP] Sub graph id failed, can not find tf session " << tf_session_; + LOG(ERROR) << "[GEOP] Sub graph id failed, can not find tf session " << tf_session_; return false; } @@ -893,7 +864,7 @@ void GeOp::ClearGraphIdCount() { } } -void GeOp::GetExecGraphId(uint32_t &cache_graph_id, std::vector input_shapes) { +void GeOp::GetExecGraphId(uint32_t &cache_graph_id, const std::vector &input_shapes) { size_t num = cache_graphs_.size(); if (cache_graphs_.find(input_shapes) != cache_graphs_.end()) { auto iter = std::find_if(graph_counts_.begin(), graph_counts_.end(), @@ -904,7 +875,9 @@ void GeOp::GetExecGraphId(uint32_t &cache_graph_id, std::vector inp iter->second += 1; } cache_graph_id = cache_graphs_[input_shapes]; - build_flag_ = true; + ADP_LOG(INFO) << "Set graph_status to CompileDone when get exec graphid, graph_id: " << cache_graph_id; + graph_handler_.status = CompileDone; + graph_handler_.cv.notify_all(); } else { ADP_LOG(INFO) << "[GEOP] This is a dynamic shape neural network, we recommend setting jit_compile to false"; if (num >= kMaxCacheNum) { @@ -922,8 +895,10 @@ void GeOp::GetExecGraphId(uint32_t &cache_graph_id, std::vector inp } else { cache_graph_id = graph_id_ + num; } - build_flag_ = false; - compute_graph_empty_ = false; + ADP_LOG(INFO) << "Set graph_status to Init when has no cache graph, graph_id: " << cache_graph_id; + is_empty_graph_ = false; + graph_handler_.status = Init; + graph_handler_.cv.notify_all(); } } @@ -954,8 +929,8 @@ PartialTensorShape GeOp::MakeCompatShape(const PartialTensorShape &a, const Part return MakeUnknownShape(b.dims()); } -bool GeOp::MaybeUpdateShape(OpKernelContext *const ctx) { - bool updated = false; +void 
GeOp::InitGraphShape(OpKernelContext *const ctx) { + mutex_lock lock{graph_handler_.graph_mu}; for (size_t i = 0UL; i < static_cast(ctx->num_inputs()); i++) { auto &shape = input_shapes_vec_[i]; auto &value_shape = ctx->input(static_cast(i)).shape(); @@ -966,14 +941,20 @@ bool GeOp::MaybeUpdateShape(OpKernelContext *const ctx) { } else { shape = value_shape; } - updated = true; ADP_LOG(INFO) << "Init input " << i << " shape to " << shape.value().DebugString(); - continue; } + } +} + +bool GeOp::MaybeUpdateShape(OpKernelContext *const ctx) { + bool updated = false; + for (size_t i = 0UL; i < static_cast(ctx->num_inputs()); i++) { + auto &shape = input_shapes_vec_[i]; + auto &value_shape = ctx->input(static_cast(i)).shape(); if (!shape.value().IsCompatibleWith(value_shape)) { - updated = true; ADP_LOG(INFO) << "Compat input " << i << " shape " << shape.value().DebugString() << " vs. " << value_shape.DebugString(); + updated = true; if ((jit_compile_ == "1") && (compile_dynamic_mode_ != "1")) { shape = value_shape; ADP_LOG(WARNING) << "Dynamic shape, recommended to configure jit_compile value to false or auto"; @@ -1004,12 +985,9 @@ Status GeOp::CreateGeSession() { ADP_LOG(INFO) << "[GePlugin] Initialize ge success."; first = false; } - if (!sess_init_flag_) { - mutex_lock lock{mu_}; - if (!SessionManager::GetInstance().GetOrCreateGeSession(tf_session_, ge_session_, sess_options_) || - tf_session_.empty() || ge_session_ == nullptr) { - return errors::Internal("Get ge session failed."); - } + if (!SessionManager::GetInstance().GetOrCreateGeSession(tf_session_, ge_session_, sess_options_) || + tf_session_.empty() || ge_session_ == nullptr) { + return errors::Internal("Get ge session failed."); } sess_init_flag_ = true; ADP_LOG(INFO) << "[GEOP] tf session: " << tf_session_ << " get ge session success."; @@ -1055,323 +1033,379 @@ PartialTensorShape GeOp::MakeUnknownShape(const int32_t &size) const { return status.ok() ? 
out_shape : kUnknownRankShape; } -void GeOp::ComputeAsync(OpKernelContext *ctx, DoneCallback done) { - // ctx is not nullptr - OP_REQUIRES_ASYNC(ctx, init_flag_, errors::InvalidArgument("GeOp not Initialize success."), done); - if (!sess_init_flag_) { - if (job_type_ != "localhost") { // in ps mode : ctx->session_handle() is empty - tf_session_ = "ps_worker_session"; - ADP_LOG(INFO) << "[GEOP] get tf session " << tf_session_ << " when in ps mode."; - } - if (tf_session_.empty()) { - tf_session_ = ctx->session_handle(); - ADP_LOG(INFO) << "[GEOP] get tf session " << tf_session_ << " from session handle."; - } - OP_REQUIRES_ASYNC(ctx, IncrementGraphIdCount(graph_id_), errors::Internal("Get ge session failed."), done); +Status GeOp::ParserGraph(OpKernelContext *ctx, const std::vector &input_vec) { + // Get Graph + auto func_lib = ctx->function_library(); + if (func_lib == nullptr) { + return errors::Internal("function library is nullptr"); + } + FunctionLibraryDefinition *flib_def = + const_cast(func_lib->GetFunctionLibraryDefinition()); + if (flib_def == nullptr) { + return errors::Internal("flib_def is nullptr"); + } + // Build GraphDef from FunctionDef + GraphDef ori_graph_def; + bool is_allreduce = false; + auto ret = BuildGraphDef(*flib_def, input_vec, ori_graph_def, is_initialized_graph_, is_allreduce); + if (!ret.ok()) { + return ret; + } + if (kDumpGraph) { + const std::string pbtxt_path = GetDumpPath() + "TF_" + ctx->op_kernel().name().c_str() + ".pbtxt"; + (void)WriteTextProto(Env::Default(), pbtxt_path, ori_graph_def); + } + ADP_LOG(INFO) << "[GEOP] TFadpter process graph success, GE parser begin, kernel_name: " << ctx->op_kernel().name() + << " , tf session: " << tf_session_; + const std::string compute_graph_name = "ge_default_" + CurrentTimeInStr(); + graph_handler_.graph = std::make_shared(compute_graph_name.c_str()); + if (graph_handler_.graph == nullptr) { + return errors::Internal("compute graph is nullptr"); + } + // parser, tensorflow graph to ge graph + ret = DoGraphParser(graph_handler_.graph, flib_def, ori_graph_def); + if (!ret.ok()) { + return ret; + } + ADP_LOG(INFO) << "[GEOP] Tensorflow graph parse to ge graph success, kernel_name: " << ctx->op_kernel().name() + << ", tf session: " << tf_session_ + << ", iteration_per_loop: " << iteration_per_loop_ << + ", need iteration: " << need_iteration_; + return SetGraphOptions(ctx); +} - ADP_LOG(INFO) << "[GEOP] Node name: " << ctx->op_kernel().name() << " , tf session: " << tf_session_; - if (!init_options_["ge.jobType"].empty() && !init_options_["ge.tuningPath"].empty()) { - uint32_t device_id = 0; - OP_REQUIRES_OK_ASYNC(ctx, GetEnvDeviceID(device_id), done); - ADP_LOG(INFO) << "[GEOP] in tuning func, aoe_mode:" << init_options_["ge.jobType"] - << ", work_path:" << init_options_["ge.tuningPath"] - << ", distribute_config:" << init_options_["distribute_config"]; - tune_options_.insert(init_options_.cbegin(), init_options_.cend()); - tune_options_.insert({"devices", std::to_string(device_id)}); - tune_options_.insert(sess_options_.cbegin(), sess_options_.cend()); - tune_options_.insert({"work_path", init_options_["ge.tuningPath"]}); - tune_options_.insert({"job_type", init_options_["ge.jobType"]}); - // aoe ini - if (!tuned_initialize_flag_) { - std::map global_options; - global_options.insert( - {ge::AscendString("work_path"), ge::AscendString(init_options_["ge.tuningPath"].c_str())}); - global_options.insert({ge::AscendString("job_type"), ge::AscendString(init_options_["ge.jobType"].c_str())}); - 
global_options.insert({ge::AscendString("ge.resourceConfigPath"), - ge::AscendString(sess_options_["ge.resourceConfigPath"].c_str())}); - AoeStatus init_ret = (*aoe_initialize_)(global_options); - OP_REQUIRES_ASYNC(ctx, init_ret == Aoe::AOE_SUCCESS, - errors::Internal("[GEOP] exec aoe initialize func failed[", init_ret, "]."), done); - tuned_initialize_flag_ = true; - } - } +Status GeOp::AddGraph(OpKernelContext *ctx, const uint32_t &graph_id) { + // call ge session addGraph api + auto graph_options = graph_options_; + if (is_aoe_) { + graph_options["ge.buildMode"] = "normal"; + } + if ((is_dynamic_getnext_ != "1") && (iteration_per_loop_ <= 1)) { + SetReuseOptions("ge.exec.inputReuseMemIndexes", ctx->num_inputs(), + sess_options_, init_options_, graph_options); + } + SetReuseOptions("ge.exec.outputReuseMemIndexes", + ctx->num_outputs(), sess_options_, init_options_, graph_options); + ADP_LOG(EVENT) << "[GEOP] call ge session add graph jit_compile: " << jit_compile_; + graph_options["ge.exec.graphIOMemAllocMode"] = "ByGE"; + const auto graph_option_ascend_string = ChangeStringToAscendString(graph_options); + ADP_LOG(INFO) << "Graph options: "; + NpuAttrs::LogOptions(graph_options); + ge::Graph ge_graph = ge::GraphUtilsEx::CreateGraphFromComputeGraph(graph_handler_.graph); + if (iteration_per_loop_ > 1) { + ge_graph.SetNeedIteration(need_iteration_); } - // convert input to const - OP_REQUIRES_OK_ASYNC(ctx, GraphInputConvertToConst(ctx), done); - std::string geop_name = ctx->op_kernel().name(); - uint32_t num_inputs = static_cast(ctx->num_inputs()); - ADP_LOG(INFO) << "[GEOP] Begin GeOp::ComputeAsync" - << ", kernel_name:" << geop_name << ", num_inputs:" << num_inputs - << ", num_outputs:" << ctx->num_outputs(); - int64 startTime = InferShapeUtil::GetCurrentTimestap(); - int64 endTime = 0; + auto status = ge_session_->AddGraph(graph_id, ge_graph, graph_option_ascend_string); + std::stringstream ss; + if (status != ge::SUCCESS) { + std::this_thread::sleep_for(std::chrono::milliseconds(kFatalSleepTime)); + ss << "[GEOP] call ge session add graph failed, kernel: " << ctx->op_kernel().name() + << ", tf session: " << tf_session_ + << ", graph id: " << graph_id << std::endl + << "Error Message is : " << std::endl << ge::GEGetErrorMsgV2().GetString(); + return errors::Internal(ss.str()); + } + ADP_LOG(INFO) << "[GEOP] Add graph to ge session success, kernel_name: " << ctx->op_kernel().name() + << ", tf session: " << tf_session_ << ", graph id: " << graph_id; + return Status::OK(); +} - // To be compatible with old versions, we should check dynamic_input_ and dynamic_config - bool is_set_dynamic_config = IsDynamicConfig(); - if (dynamic_input_ != "1" && !is_set_dynamic_config) { - bool shape_changed = MaybeUpdateShape(ctx); - if (build_flag_ && shape_changed) { - ge::Status status = ge_session_->RemoveGraph(graph_id_); - if (status != ge::SUCCESS) { - ADP_LOG(WARNING) << "[GEOP] GE remove graph failed, ret : " << ToString(status) << ", graph_id: " << graph_id_; - } - build_flag_ = false; - } +Status GeOp::BuildGraph(const uint32_t &graph_id, const std::vector &inputs) { + ge::Status build_graph_status = ge_session_->BuildGraph(graph_id, inputs); + std::stringstream ss; + if (build_graph_status != ge::SUCCESS) { + ss << "[GEOP] GE session build graph failed, domi_ret : " << build_graph_status << std::endl + << "Error Message is : " << std::endl << ge::GEGetErrorMsgV2().GetString(); + return errors::Internal(ss.str()); } + LOG(INFO) << "The model has been compiled on the Ascend AI processor, current 
graph id is: " << graph_id; + return Status::OK(); +} - std::vector input_vec; - std::vector input_shapes; - std::vector inputs; - OP_REQUIRES_OK_ASYNC(ctx, (BuildInputTensorInfo(ctx, input_vec, input_shapes, inputs)), done); +Status GeOp::RunGraph(OpKernelContext *ctx, const uint32_t &graph_id, + const std::vector &inputs, + ge::RunAsyncCallback callback) { + // call ge session runGraphAsync api + ADP_LOG(INFO) << "[GEOP] Call ge session RunGraphAsync, kernel_name: " + << ctx->op_kernel().name() << ", tf session: " << tf_session_ + << ", graph id: " << graph_id; + ge::Status run_graph_status = ge_session_->RunGraphAsync(graph_id, inputs, callback); + std::stringstream ss; + if (run_graph_status != ge::SUCCESS) { + std::this_thread::sleep_for(std::chrono::milliseconds(kFatalSleepTime)); + ss << "[GEOP] call ge session RunGraphAsync Failed, kernel:" << ctx->op_kernel().name() + << ", tf session: " << tf_session_ + << ", graph id: " << graph_id << std::endl + << "Error Message is : " << std::endl << ge::GEGetErrorMsgV2().GetString(); + return errors::Internal(ss.str()); + } + ADP_LOG(INFO) << "End RunGraph run_num: " << graph_handler_.graph_run_num; + graph_handler_.graph_run_num++; + return Status::OK(); +} - // if input shapes changed, cache graphs - uint32_t cache_graph_id = graph_id_; - bool is_lazy_recompile_mode = (dynamic_input_ == "1") && (dynamic_graph_execute_mode_ == "lazy_recompile"); - ADP_LOG(INFO) << "is_set_dynamic_config: " << is_set_dynamic_config - << " is_aoe_: " << is_aoe_ - << " is_lazy_recompile_mode: " << is_lazy_recompile_mode; +Status GeOp::SetGraphOptions(OpKernelContext *ctx) { + // convert to ge::graph + if (graph_options_.count("input_format") != 0) { + ADP_LOG(INFO) << "graph_options_[\"input_format\"] = " << graph_options_["input_format"]; + } - if (is_aoe_) { - if (is_set_dynamic_config) { - ADP_LOG(ERROR) << "dynamic input config can not use with mstuning."; - OP_REQUIRES_ASYNC(ctx, false, errors::Internal("dynamic input config can not use with mstuning."), done); - return; - } - auto input_vec_aoe = input_vec; - if (RunTuning(input_vec_aoe, inputs, ctx) != 0) { - ADP_LOG(ERROR) << "RunTuning fail."; - std::stringstream ss; - ss << std::endl << ge::GEGetErrorMsgV2().GetString(); - OP_REQUIRES_ASYNC(ctx, false, errors::Internal(ss.str()), done); - } - if (InitRebuildFlag(cache_graph_id) != 0) { - OP_REQUIRES_ASYNC(ctx, false, errors::Internal("Failed to check rebuild flag"), done); - return; - } - ADP_LOG(INFO) << geop_name << " RunTuning finish."; - } else if (is_set_dynamic_config) { - if (InitRebuildFlag(cache_graph_id) != 0) { - OP_REQUIRES_ASYNC(ctx, false, errors::Internal("Failed to check rebuild flag"), done); - return; + if (iteration_per_loop_ > 1) { + graph_options_["iterations_per_loop"] = std::to_string(iteration_per_loop_); + } + + const auto cahce_option_iter = sess_options_.find("ge.graph_compiler_cache_dir"); + if (cahce_option_iter != sess_options_.cend() && !cahce_option_iter->second.empty()) { + graph_options_["ge.graph_key"] = ctx->op_kernel().name(); + } + + if (is_host_graph_) { + ADP_LOG(INFO) << "[GEOP] set graph option."; + graph_options_["ge.exec.placement"] = "HOST"; + } + graph_options_["ge.shape_generalized_build_mode"] = "shape_precise"; + if (!recompute_mode_.empty()) { + graph_options_["ge.recompute"] = recompute_mode_; + } + if (!max_key_num_.empty()) { + graph_options_["ge.max_key_num"] = max_key_num_; + } + if (!embedding_dim_.empty()) { + graph_options_["ge.embedding_dim"] = embedding_dim_; + } + if 
(!use_counter_filter_.empty()) {
+    graph_options_["ge.use_counter_filter"] = use_counter_filter_;
+  }
+  if (!padding_key_.empty()) {
+    graph_options_["ge.padding_key"] = padding_key_;
+  }
+  if (!embedding_flags_.empty()) {
+    graph_options_["ge.embedding_flags"] = embedding_flags_;
+  }
+  SetDynamicInput();
+  graph_options_["ge.exec.isVarInitGraph"] = is_var_init_graph_;
+  graph_options_["ge.jit_compile"] = jit_compile_;
+  graph_options_["ge.exec.overflow"] = "1";
+  graph_options_["ge.graphLevelSat"] = (mix_compile_mode_ == "0") ? "1" : "0";
+  return DoAccelerateTrain();
+}
+
+Status GeOp::CompileGraph(OpKernelContext *ctx, const std::vector &input_vec,
+                          const std::vector &inputs,
+                          const uint32_t &graph_id,
+                          const std::vector &input_shapes) {
+  auto ret = ParserGraph(ctx, input_vec);
+  if (!ret.ok()) {
+    return ret;
+  }
+  /* if graph is init verify graph, return */
+  if (is_initialized_graph_) {
+    Tensor initialized_tensor(ctx->expected_output_dtype(0), TensorShape({0}));
+    ctx->set_output(0, initialized_tensor);
+    ADP_LOG(INFO) << "[GEOP] End GeOp::ComputeAsync, compute_graph is initialize, kernel_name:"
+                  << ctx->op_kernel().name() << ", ret_status:" << ToString(ge::SUCCESS)
+                  << " , tf session: " << tf_session_ << " ,graph id: " << graph_id;
+    return Status::OK();
+  }
+  if (graph_handler_.graph->GetAllNodesSize() == 0UL) {
+    ADP_LOG(INFO) << "[GEOP] End GeOp::ComputeAsync, compute_graph is empty, kernel_name:"
+                  << ctx->op_kernel().name()
+                  << ", ret_status:" << ToString(ge::SUCCESS) << " , tf session: " << tf_session_
+                  << " ,graph id: " << graph_id;
+    is_empty_graph_ = true;
+    return Status::OK();
+  }
+  for (uint32_t i = 0U; i < graph_max_parallel_model_num_; i++) {
+    ret = AddGraph(ctx, graph_id);
+    if (!ret.ok()) {
+      return ret;
+    }
+  }
+
+  ret = BuildGraph(graph_id, inputs);
+  if (!ret.ok()) {
+    return ret;
+  }
+  return Status::OK();
+}
+
+Status GeOp::CompileAndRunGraph(OpKernelContext *ctx,
+                                const std::vector &input_vec,
+                                const std::vector &inputs,
+                                const std::vector &input_shapes,
+                                ge::RunAsyncCallback callback) {
+  mutex_lock lock{graph_handler_.graph_mu};
+  // While one thread is compiling, the other threads must wait here instead of starting another compilation.
+  while (graph_handler_.status == Compiling) {
+    ADP_LOG(INFO) << "Compiling wait, graph_status: " << graph_handler_.status;
+    graph_handler_.cv.wait(lock);
+  }
+  uint32_t cache_graph_id = graph_id_;
+  if (IsLazyCompile()) { // in dynamic input mode, cache graphs.
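+    // In lazy_recompile mode GetExecGraphId() keys the compiled GE graph on the current
+    // input-shape combination: a cache hit returns the stored graph id and marks the handler
+    // CompileDone so the compile step below is skipped; a miss picks a fresh graph id and
+    // resets the handler to Init so the new shape is compiled before it is run.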
- if (is_lazy_recompile_mode) { - GetExecGraphId(cache_graph_id, input_shapes); - } - if (InitRebuildFlag(cache_graph_id) != 0) { - OP_REQUIRES_ASYNC(ctx, false, errors::Internal("Failed to check rebuild flag"), done); - return; - } + GetExecGraphId(cache_graph_id, input_shapes); } - if (!build_flag_) { - // Get Graph - OP_REQUIRES_ASYNC(ctx, ctx->function_library() != nullptr, errors::Internal("function library is nullptr"), done); - FunctionLibraryDefinition *flib_def = - const_cast(ctx->function_library()->GetFunctionLibraryDefinition()); - OP_REQUIRES_ASYNC(ctx, flib_def != nullptr, errors::Internal("flib_def is nullptr"), done); - - // Build GraphDef from FunctionDef - GraphDef ori_graph_def; - bool is_allreduce = false; - OP_REQUIRES_OK_ASYNC(ctx, BuildGraphDef(*flib_def, input_vec, ori_graph_def, is_initialized_graph_, is_allreduce), - done); - - /* if graph is init verify graph, return */ - if (this->is_initialized_graph_) { - Tensor initialized_tensor(ctx->expected_output_dtype(0), TensorShape({0})); - ctx->set_output(0, initialized_tensor); - done(); - return; - } - if (kDumpGraph) { - const std::string pbtxt_path = GetDumpPath() + "TF_" + geop_name.c_str() + ".pbtxt"; - (void) WriteTextProto(Env::Default(), pbtxt_path, ori_graph_def); - } - endTime = InferShapeUtil::GetCurrentTimestap(); - ADP_LOG(EVENT) << "[GEOP] In GEOP computeAsync, kernel_name: " << geop_name << " ,TFadapter cost time: [" - << ((endTime - startTime) / kMicrosToMillis) << " ms]."; - ADP_LOG(INFO) << "[GEOP] TFadpter process graph success, GE parser begin, kernel_name: " << geop_name - << " , tf session: " << tf_session_ << " , graph id: " << cache_graph_id; - ge::ComputeGraphPtr compute_graph = nullptr; - try { - const std::string compute_graph_name = "ge_default_" + CurrentTimeInStr(); - compute_graph = std::make_shared(compute_graph_name.c_str()); - } catch (...) 
{
-      OP_REQUIRES_ASYNC(ctx, false, errors::Internal("make shared failed"), done);
-    }
-    OP_REQUIRES_ASYNC(ctx, compute_graph != nullptr, errors::InvalidArgument("create ComputeGraph failed"), done);
-    // parser, tensorflow graph to ge graph
-    OP_REQUIRES_OK_ASYNC(ctx, DoGraphParser(compute_graph, flib_def, ori_graph_def), done);
-    ADP_LOG(INFO) << "[GEOP] Tensorflow graph parse to ge graph success, kernel_name: " << geop_name
-                  << ", tf session: " << tf_session_ << " , graph id: " << cache_graph_id
-                  << ", iteration_per_loop: " << iteration_per_loop_ << ", need iteration: " << this->need_iteration_;
-    size_t nodes = compute_graph->GetAllNodesSize();
-    if (nodes == 0) {
-      build_flag_ = true;
-      compute_graph_empty_ = true;
-      endTime = InferShapeUtil::GetCurrentTimestap();
-      ADP_LOG(INFO) << "[GEOP] End GeOp::ComputeAsync, compute_graph is empty, kernel_name:" << geop_name
-                    << ", ret_status:" << ToString(ge::SUCCESS) << " , tf session: " << tf_session_
-                    << " ,graph id: " << cache_graph_id << " [" << ((endTime - startTime) / kMicrosToMillis) << " ms]";
-      done();
-      return;
-    }
-    // convert to ge::graph
-    if (graph_options_.count("input_format") != 0) {
-      ADP_LOG(INFO) << "graph_options_[\"input_format\"] = " << graph_options_["input_format"];
-    }
-    ge::Graph ge_graph = ge::GraphUtilsEx::CreateGraphFromComputeGraph(compute_graph);
-    if (iteration_per_loop_ > 1) {
-      ge_graph.SetNeedIteration(this->need_iteration_);
-      graph_options_["iterations_per_loop"] = std::to_string(iteration_per_loop_);
-    }
+  bool shape_changed = false;
+  if ((!is_dynamic_input_) && (!IsDynamicConfig())) {
+    shape_changed = MaybeUpdateShape(ctx);
+  }
-    const auto cahce_option_iter = sess_options_.find("ge.graph_compiler_cache_dir");
-    if (cahce_option_iter != sess_options_.cend() && !cahce_option_iter->second.empty()) {
-      graph_options_["ge.graph_key"] = geop_name;
+  // To be compatible with old versions, we should check dynamic_input_ and dynamic_config
+  if ((graph_handler_.status != Init) && (shape_changed || IsGraphNeedRebuild(cache_graph_id))) {
+    ADP_LOG(INFO) << "[GEOP] The graph need rebuild, graph id "
+                  << cache_graph_id << " ,need_change_precision_mode: "
+                  << need_recover_precision_mode_;
+    // When a rebuild requires removing the graph, other threads must wait until the removal finishes.
+    graph_handler_.status = Compiling;
+    while (graph_handler_.graph_run_num > 0) {
+      ADP_LOG(INFO) << "Remove wait, run_num: " << graph_handler_.graph_run_num
+                    << ", graph_status: " << graph_handler_.status;
+      graph_handler_.cv.wait(lock);
+    }
+    auto ret = ge_session_->RemoveGraph(cache_graph_id);
+    if (ret != ge::SUCCESS) {
+      // Mark compilation done even when RemoveGraph fails, so waiting threads are not blocked forever.
+      ADP_LOG(INFO) << "Set graph_status to CompileDone";
+      graph_handler_.status = CompileDone;
+      graph_handler_.cv.notify_all();
+      return errors::Internal("[GEOP] Failed to remove graph ",
+                              cache_graph_id, " from ge, error code ", ret,
+                              " Error Message is : ", ge::GEGetErrorMsgV2().GetString());
+    }
+    ADP_LOG(INFO) << "[GEOP] tf session: " << tf_session_ << ", graph id: " << cache_graph_id << " Removed graph";
+  }
+
+  if (graph_handler_.status != CompileDone) {
+    auto ret = CompileGraph(ctx, input_vec, inputs, cache_graph_id, input_shapes);
+    ADP_LOG(INFO) << "Set graph_status to CompileDone";
+    graph_handler_.status = CompileDone;
+    graph_handler_.cv.notify_all();
+    if (!ret.ok()) {
+      return ret;
+    }
+  }
-    if (is_host_graph_) {
-      ADP_LOG(INFO) << "[GEOP] set graph option.";
-      graph_options_["ge.exec.placement"] = "HOST";
-    }
-    graph_options_["ge.shape_generalized_build_mode"] = "shape_precise";
-    if (!recompute_mode_.empty()) {
-      graph_options_["ge.recompute"] =
recompute_mode_; - } - if (!max_key_num_.empty()) { - graph_options_["ge.max_key_num"] = max_key_num_; - } - if (!embedding_dim_.empty()) { - graph_options_["ge.embedding_dim"] = embedding_dim_; - } - if (!use_counter_filter_.empty()) { - graph_options_["ge.use_counter_filter"] = use_counter_filter_; - } - if (!padding_key_.empty()) { - graph_options_["ge.padding_key"] = padding_key_; - } - if (!embedding_flags_.empty()) { - graph_options_["ge.embedding_flags"] = embedding_flags_; - } - SetDynamicInput(); - graph_options_["ge.exec.isVarInitGraph"] = is_var_init_graph_; - graph_options_["ge.jit_compile"] = jit_compile_; - graph_options_["ge.exec.overflow"] = "1"; - graph_options_["ge.graphLevelSat"] = (mix_compile_mode_ == "0") ? "1" : "0"; - OP_REQUIRES_OK_ASYNC(ctx, DoAccelerateTrain(), done); - // call ge session addGraph api - auto graph_options = graph_options_; - if (is_aoe_) { - graph_options["ge.buildMode"] = "normal"; - } - if ((is_dynamic_getnext_ != "1") && (iteration_per_loop_ <= 1)) { - SetReuseOptions("ge.exec.inputReuseMemIndexes", ctx->num_inputs(), sess_options_, init_options_, graph_options); - } - SetReuseOptions("ge.exec.outputReuseMemIndexes", ctx->num_outputs(), sess_options_, init_options_, graph_options); - ADP_LOG(EVENT) << "[GEOP] call ge session add graph jit_compile: " << jit_compile_; - graph_options["ge.exec.graphIOMemAllocMode"] = "ByGE"; - OP_REQUIRES_OK_ASYNC(ctx, CreateGeSession(), done); - auto const graph_option_ascend_string = ChangeStringToAscendString(graph_options); - ADP_LOG(INFO) << "Graph options: "; - NpuAttrs::LogOptions(graph_options); - auto status = ge_session_->AddGraph(cache_graph_id, ge_graph, graph_option_ascend_string); - std::stringstream ss; - if (status != ge::SUCCESS) { - std::this_thread::sleep_for(std::chrono::milliseconds(kFatalSleepTime)); - ADP_LOG(FATAL) << "[GEOP] call ge session add graph failed, kernel: " << geop_name << " ,tf session: " - << tf_session_ << ", graph id: " << cache_graph_id; + if (is_initialized_graph_ || is_empty_graph_) { + return Status::OK(); + } - ss << "[GEOP] call ge session add graph failed, kernel: " << geop_name << ", tf session: " << tf_session_ - << ", graph id: " << cache_graph_id << std::endl - << "Error Message is : " << std::endl << ge::GEGetErrorMsgV2().GetString(); - } - OP_REQUIRES_ASYNC(ctx, status == ge::SUCCESS, errors::Internal(ss.str()), done); - add_graph_flag_ = true; - ADP_LOG(INFO) << "[GEOP] Add graph to ge session success, kernel_name: " << geop_name - << ", tf session: " << tf_session_ << ", graph id: " << cache_graph_id; - - build_flag_ = true; - if (!is_set_dynamic_config && is_lazy_recompile_mode) { - cache_graphs_.insert(std::make_pair(input_shapes, cache_graph_id)); - graph_counts_.push_back(std::make_pair(input_shapes, 1)); - } - if (need_compile_graph_first_) { - ge::Status build_graph_status = ge_session_->BuildGraph(cache_graph_id, inputs); - std::stringstream ss; - if (build_graph_status != ge::SUCCESS) { - ss << "[GEOP] GE session build graph failed, domi_ret : " << build_graph_status << std::endl - << "Error Message is : " << std::endl << ge::GEGetErrorMsgV2().GetString(); + if (!IsDynamicConfig() && IsLazyCompile()) { + cache_graphs_.insert(std::make_pair(input_shapes, cache_graph_id)); + graph_counts_.push_back(std::make_pair(input_shapes, 1)); + } + + return RunGraph(ctx, cache_graph_id, inputs, callback); +} + +bool GeOp::IsLazyCompile() { + return ((dynamic_input_ == "1") && (dynamic_graph_execute_mode_ == "lazy_recompile")); +} + +void 
GeOp::ComputeAsync(OpKernelContext *ctx, DoneCallback done) { + OP_REQUIRES_ASYNC(ctx, init_flag_, errors::InvalidArgument("GeOp not Initialize success."), done); + { + mutex_lock lock{mu_}; + if (!sess_init_flag_) { + if (job_type_ != "localhost") { // in ps mode : ctx->session_handle() is empty + tf_session_ = "ps_worker_session"; + ADP_LOG(INFO) << "[GEOP] get tf session " << tf_session_ << " when in ps mode."; } - OP_REQUIRES_ASYNC(ctx, build_graph_status == ge::SUCCESS, errors::Internal(ss.str()), done); - ADP_LOG(INFO) << "[GEOP] Build graph success."; - done(); - return; + if (tf_session_.empty()) { + tf_session_ = ctx->session_handle(); + ADP_LOG(INFO) << "[GEOP] get tf session " << tf_session_ << " from session handle."; + } + OP_REQUIRES_ASYNC(ctx, IncrementGraphIdCount(graph_id_), errors::Internal("Get ge session failed."), done); + OP_REQUIRES_OK_ASYNC(ctx, CreateGeSession(), done); + ADP_LOG(INFO) << "[GEOP] Node name: " << ctx->op_kernel().name() << " , tf session: " << tf_session_; } - LOG(INFO) << "The model has been compiled on the Ascend AI processor, current graph id is: " << cache_graph_id; - } else { - if (compute_graph_empty_) { - endTime = InferShapeUtil::GetCurrentTimestap(); - ADP_LOG(INFO) << "[GEOP] End GeOp::ComputeAsync, compute_graph is empty, kernel_name:" << geop_name - << ", ret_status:" << ToString(ge::SUCCESS) << " , tf session: " << tf_session_ - << " ,graph id: " << cache_graph_id << " [" << ((endTime - startTime) / kMicrosToMillis) << " ms]"; - done(); - return; + } + if (is_aoe_) { + ADP_LOG(INFO) << "[GEOP] in tuning func, aoe_mode:" << init_options_["ge.jobType"] + << ", work_path:" << init_options_["ge.tuningPath"] + << ", distribute_config:" << init_options_["distribute_config"]; + // aoe ini + mutex_lock lock{mu_}; + if (!tuned_initialize_flag_) { + std::map global_options; + global_options.insert( + {ge::AscendString("work_path"), ge::AscendString(init_options_["ge.tuningPath"].c_str())}); + global_options.insert({ge::AscendString("job_type"), ge::AscendString(init_options_["ge.jobType"].c_str())}); + global_options.insert({ge::AscendString("ge.resourceConfigPath"), + ge::AscendString(sess_options_["ge.resourceConfigPath"].c_str())}); + AoeStatus init_ret = (*aoe_initialize_)(global_options); + OP_REQUIRES_ASYNC(ctx, init_ret == Aoe::AOE_SUCCESS, + errors::Internal("[GEOP] exec aoe initialize func failed[", init_ret, "]."), done); + tuned_initialize_flag_ = true; } } + // convert input to const + OP_REQUIRES_OK_ASYNC(ctx, GraphInputConvertToConst(ctx), done); + uint32_t num_inputs = static_cast(ctx->num_inputs()); + ADP_LOG(INFO) << "[GEOP] Begin GeOp::ComputeAsync" + << ", kernel_name:" << ctx->op_kernel().name() << ", num_inputs:" << num_inputs + << ", num_outputs:" << ctx->num_outputs(); - int64 run_start_time = InferShapeUtil::GetCurrentTimestap(); - auto callback = [done, ctx, run_start_time](ge::Status ge_status, std::vector &outputs) { + // if input shapes changed, cache graphs + std::vector input_vec; + std::vector input_shapes; + std::vector inputs; + OP_REQUIRES_OK_ASYNC(ctx, (BuildInputTensorInfo(ctx, input_vec, input_shapes, inputs)), done); + InitGraphShape(ctx); + if (is_aoe_) { + bool is_set_dynamic_config = IsDynamicConfig(); + OP_REQUIRES_ASYNC(ctx, !is_set_dynamic_config, + errors::Internal("dynamic input config can not use with mstuning."), done); + auto input_vec_aoe = input_vec; + OP_REQUIRES_ASYNC(ctx, RunTuning(input_vec_aoe, inputs, ctx) == 0, + errors::Internal("RunTuning fail.\n", ge::GEGetErrorMsgV2().GetString()), 
done); + ADP_LOG(INFO) << ctx->op_kernel().name() << " RunTuning finish."; + } + int64_t run_start_time = InferShapeUtil::GetCurrentTimestap(); + auto callback = [done, ctx, run_start_time, this](ge::Status ge_status, std::vector &outputs) { + ExitCallbackGuarder guarder([ctx, this] () { + mutex_lock lock(graph_handler_.graph_mu); + ADP_LOG(INFO) << "Callback end, run_num: " << graph_handler_.graph_run_num; + graph_handler_.graph_run_num--; + graph_handler_.cv.notify_all(); + }); if (ge_status == ge::SUCCESS) { if (BuildOutputTensorInfo(ctx, outputs) != Status::OK()) { - ADP_LOG(FATAL) << ctx->op_kernel().name() << " GEOP::DoRunAsync get output failed."; + ADP_LOG(ERROR) << ctx->op_kernel().name() << " GEOP::DoRunAsync get output failed."; std::stringstream ss; ss << ctx->op_kernel().name() << "GEOP::DoRunAsync get output failed." << std::endl << "Error Message is : " << std::endl << ge::GEGetErrorMsgV2().GetString(); OP_REQUIRES_ASYNC(ctx, false, errors::Internal(ss.str()), done); - return; } } else if (ge_status == ge::END_OF_SEQUENCE) { ctx->SetStatus(errors::OutOfRange("End of sequence")); ADP_LOG(WARNING) << "[GEOP] Out of range: End of sequence."; LOG(WARNING) << "[GEOP] Out of range: End of sequence."; } else if (ge_status != ge::SUCCESS) { - std::this_thread::sleep_for(std::chrono::milliseconds(kFatalSleepTime)); - ADP_LOG(FATAL) << ctx->op_kernel().name() << "GEOP::::DoRunAsync Failed"; + ADP_LOG(ERROR) << ctx->op_kernel().name() << "GEOP::::DoRunAsync Failed"; std::stringstream ss; ss << ctx->op_kernel().name() << "GEOP::::DoRunAsync Failed" << std::endl << "Error Message is : " << std::endl << ge::GEGetErrorMsgV2().GetString(); OP_REQUIRES_ASYNC(ctx, false, errors::Internal(ss.str()), done); - return; } - int64 run_end_time = InferShapeUtil::GetCurrentTimestap(); + int64_t run_end_time = InferShapeUtil::GetCurrentTimestap(); ADP_LOG(INFO) << "[GEOP] RunGraphAsync callback, status:" << ge_status << ", kernel_name:" << ctx->op_kernel().name() << "[ " << (run_end_time - run_start_time) << "us]"; done(); }; + OP_REQUIRES_OK_ASYNC(ctx, + CompileAndRunGraph(ctx, input_vec, inputs, input_shapes, callback), done); - // call ge session runGraphAsync api - ADP_LOG(INFO) << "[GEOP] Call ge session RunGraphAsync, kernel_name: " << geop_name << ", tf session: " << tf_session_ - << ", graph id: " << cache_graph_id; - ge::Status run_graph_status = ge_session_->RunGraphAsync(cache_graph_id, inputs, callback); - std::stringstream ss; - if (run_graph_status != ge::SUCCESS) { - std::this_thread::sleep_for(std::chrono::milliseconds(kFatalSleepTime)); - ADP_LOG(FATAL) << "[GEOP] call ge session RunGraphAsync Failed, kernel:" << geop_name << " ,tf session: " - << tf_session_ << " ,graph id: " << cache_graph_id; - ss << "[GEOP] call ge session RunGraphAsync Failed, kernel:" << geop_name << ", tf session: " << tf_session_ - << ", graph id: " << cache_graph_id << std::endl - << "Error Message is : " << std::endl << ge::GEGetErrorMsgV2().GetString(); + if (is_initialized_graph_ || is_empty_graph_) { + done(); + return; } - OP_REQUIRES_ASYNC(ctx, run_graph_status == ge::SUCCESS, errors::Internal(ss.str()), done); - endTime = InferShapeUtil::GetCurrentTimestap(); - ADP_LOG(INFO) << "[GEOP] End GeOp::ComputeAsync, kernel_name: " << geop_name - << ", ret_status: " << ToString(run_graph_status) << ", tf session : " << tf_session_ - << ", graph id: " << cache_graph_id << ", cost [" << ((endTime - startTime) / kMicrosToMillis) << "ms]"; + int64_t end_time = InferShapeUtil::GetCurrentTimestap(); + ADP_LOG(INFO) 
<< "[GEOP] End GeOp::ComputeAsync, kernel_name: " << ctx->op_kernel().name() + << ", tf session : " << tf_session_ << ", cost [" + << ((end_time - run_start_time) / kMicrosToMillis) << "ms]"; return; } @@ -1443,12 +1477,6 @@ void GeOp::AddNodeAttrs(Node *node, bool &is_initialize) { is_host_graph_ = true; ADP_LOG(INFO) << "[GEOP] variable subgraph is initialized in host."; } - if (!need_compile_graph_first_) { - if (node->name().find("NpuCompile") != std::string::npos) { - need_compile_graph_first_ = true; - ADP_LOG(INFO) << "[GEOP] set subgraph compile first."; - } - } // clear device info && attr node_def.set_device(""); if (node_def.op() == "Const") { @@ -1589,7 +1617,7 @@ void GeOp::HandleDpOpAndGetNextNodes(Graph &graph) { remove_nodes.push_back(iterator_node); } } - if (dynamic_input_ == "1" && dynamic_graph_execute_mode_ == "lazy_recompile") { + if (IsLazyCompile()) { graph_options_["ge.exec.enableCopyOutputAddr"] = "1"; } } @@ -1951,6 +1979,7 @@ void GeOp::SetShapesToOutputDesc(const std::vector &input_shapes, c } int GeOp::RunTuning(std::vector &input_vec, std::vector &inputs, const OpKernelContext *const ctx) { + mutex_lock lock{graph_handler_.graph_mu}; if (tuned_flag_.test_and_set()) { ADP_LOG(INFO) << ctx->op_kernel().name() << " has tuned."; return 0; @@ -2040,11 +2069,6 @@ int GeOp::RunTuning(std::vector &input_vec, std::vector &inp } { GE_MAKE_GUARD(destroy, callback); - const auto &ge_status = CreateGeSession(); - if (!ge_status.ok()) { - ADP_LOG(ERROR) << "get ge session failed[" << ge_status.error_message() << "]."; - return -1; - } // share ge_session to aoe AoeStatus set_ret = (*aoe_set_gesession_)(session_id_, ge_session_); if (set_ret != Aoe::AOE_SUCCESS) { @@ -2190,7 +2214,7 @@ Status GeOp::AnalyzeStringInput(ge::Tensor &input, uint64_t count, const std::st } Status GeOp::GraphInputConvertToConst(OpKernelContext *ctx) { - mutex_lock lock{mu_}; + mutex_lock lock{graph_handler_.graph_mu}; if (is_input_convert_) { return Status::OK(); } @@ -2277,6 +2301,7 @@ Status GeOp::GraphInputConvertToConst(OpKernelContext *ctx) { } Status GeOp::GraphCheckInputEqualConstOp(Tensor &tensor, int32_t index, bool &is_equal) { + mutex_lock lock{graph_handler_.graph_mu}; if (remove_index_.size() == 0) { return Status::OK(); } @@ -2302,8 +2327,10 @@ Status GeOp::BuildInputTensorInfo(OpKernelContext *const ctx, std::vectornum_inputs(); std::string cur_input_shapes; - // populate inputs + std::shared_ptr model_parser = + domi::ModelParserFactory::Instance()->CreateModelParser(domi::FrameworkType::TENSORFLOW); + REQUIRES_NOT_NULL(model_parser); for (int i = 0; i < num_inputs; i++) { Tensor tensor(ctx->input(i)); bool is_equal = false; @@ -2319,9 +2346,6 @@ Status GeOp::BuildInputTensorInfo(OpKernelContext *const ctx, std::vector model_parser = - domi::ModelParserFactory::Instance()->CreateModelParser(domi::FrameworkType::TENSORFLOW); - REQUIRES_NOT_NULL(model_parser); ge::DataType type = model_parser->ConvertToGeDataType(static_cast(data_type)); if (type == ge::DT_UNDEFINED) { ADP_LOG(ERROR) << "[GEOP] No Supported datatype : " << data_type; @@ -2360,6 +2384,7 @@ Status GeOp::BuildInputTensorInfo(OpKernelContext *const ctx, std::vectornum_outputs(); // populate outputs diff --git a/tf_adapter/kernels/geop_npu.h b/tf_adapter/kernels/geop_npu.h index 975846463..3a9340ddb 100644 --- a/tf_adapter/kernels/geop_npu.h +++ b/tf_adapter/kernels/geop_npu.h @@ -1,5 +1,5 @@ /* - * Copyright (c) Huawei Technologies Co., Ltd. 2019-2020. All rights reserved. + * Copyright (c) Huawei Technologies Co., Ltd. 
2019-2025. All rights reserved. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -19,6 +19,7 @@ #include #include +#include #include "tensorflow/core/common_runtime/function.h" #include "tensorflow/core/framework/op_kernel.h" @@ -46,6 +47,20 @@ using AoeSetTuningGraphInputFunc = AoeStatus (*)(SessionId, const std::vector &); +enum GraphStatus { + Init, + CompileDone, + Compiling +}; + +struct GraphHandler { + GraphStatus status = Init; + mutex graph_mu; + condition_variable cv; + int32_t graph_run_num = 0; + ge::ComputeGraphPtr graph; +}; + class GeOp : public AsyncOpKernel { public: explicit GeOp(OpKernelConstruction *ctx); @@ -94,6 +109,22 @@ public: // prepare output tensor Status BuildOutTensorInfo(OpKernelContext *ctx); + Status ParserGraph(OpKernelContext *ctx, const std::vector &input_vec); + Status AddGraph(OpKernelContext *ctx, const uint32_t &graph_id); + Status CompileGraph(OpKernelContext *ctx, const std::vector &input_vec, + const std::vector &inputs, + const uint32_t &graph_id, + const std::vector &input_shapes); + Status BuildGraph(const uint32_t &graph_id, const std::vector &inputs); + Status RunGraph(OpKernelContext *ctx, const uint32_t &graph_id, + const std::vector &inputs, + ge::RunAsyncCallback callback); + Status CompileAndRunGraph(OpKernelContext *ctx, + const std::vector &input_vec, + const std::vector &inputs, + const std::vector &input_shapes, + ge::RunAsyncCallback callback); + bool IsLazyCompile(); // create input and output desc for NodeDef Status GenerateDesc(Node *&node); @@ -108,7 +139,6 @@ public: void AddNodeAttrs(Node *node, bool &is_initialize); - int InitRebuildFlag(uint32_t cache_graph_id); bool IsGraphNeedRebuild(const uint32_t cache_graph_id); Status DoAccelerateTrain(); Status NeedRecompileWhenAccelerateTrainOn(bool &need_recompile); @@ -120,12 +150,12 @@ public: Status RecoverPrecisionMode(); bool IncrementGraphIdCount(uint32_t &graph_id); - bool DecrementGraphIdCount(const std::string &tf_session, uint32_t &graph_id); + bool DecrementGraphIdCount(uint32_t &graph_id); void ClearGraphIdCount(); void GetExecGraphId(uint32_t &cache_graph_id, - std::vector input_shapes); + const std::vector &input_shapes); void GetMsTuneConfig(std::map init_options); @@ -144,7 +174,7 @@ public: std::string BuildSubGraph(FunctionLibraryDefinition *flib_def, const std::string &graph); void SetDynamicInput(); - + Status SetGraphOptions(OpKernelContext *ctx); void ProcessDpOpFuncDef(const Node &node) const; void BuildQueueDataAndGetNextFromQueue(Graph &graph, const Node &getnext_node, @@ -155,7 +185,7 @@ public: bool IsDynamicGetNext(const Node *node); void ChangeChannelNameAttr(NodeDef &node_def) const; - + void InitGraphShape(OpKernelContext *const ctx); bool IsDynamicConfig(); PartialTensorShape MakeCompatShape(const PartialTensorShape &a, const PartialTensorShape &b) const; @@ -184,10 +214,7 @@ public: static bool tuned_initialize_flag_; bool init_flag_; - bool build_flag_; - bool add_graph_flag_; bool sess_init_flag_; - bool compute_graph_empty_; bool is_input_convert_; std::string input_shapes_; @@ -195,6 +222,7 @@ public: std::string data_format_; uint32_t graph_id_; bool is_initialized_graph_; + bool is_empty_graph_; bool need_iteration_; std::string tf_session_; ge::Session *ge_session_; @@ -215,11 +243,10 @@ public: std::vector dynamic_shape_nodes_; std::string dynamic_input_; std::string compile_dynamic_mode_; + uint32_t graph_max_parallel_model_num_{1U}; std::string 
dynamic_graph_execute_mode_; std::string data_inputs_shape_range_; std::string getnext_inputs_shape_range_; - bool need_compile_graph_first_; - std::map tune_options_; std::string is_dynamic_getnext_; std::string placeholder_index_; std::atomic_flag tuned_flag_; @@ -250,6 +277,7 @@ public: AoeSetTuningGraphInputFunc aoe_set_tuning_graph_input_; // accelerate train AccelerateInfo accelerate_info_; + GraphHandler graph_handler_; }; } // namespace tensorflow #endif // TENSORFLOW_KERNELS_GEOP_NPU_H_ diff --git a/tf_adapter/python/npu_bridge/estimator/npu/npu_config.py b/tf_adapter/python/npu_bridge/estimator/npu/npu_config.py index b2b2e7c5c..9ffaf237f 100644 --- a/tf_adapter/python/npu_bridge/estimator/npu/npu_config.py +++ b/tf_adapter/python/npu_bridge/estimator/npu/npu_config.py @@ -115,7 +115,8 @@ class NPURunConfig(run_config_lib.RunConfig): quant_dumpable=None, input_fusion_size=131072, compile_dynamic_mode=None, - execute_times=-1 + execute_times=-1, + graph_max_parallel_model_num=1 ): """ Constructs a NPUConfig. @@ -166,6 +167,7 @@ class NPURunConfig(run_config_lib.RunConfig): compress_weight_conf:Path and file name of the node list configuration file to be compressed. dynamic_input:Whether Input is dynamic. compile_dynamic_mode:compile graph with dynamic shape. + graph_max_parallel_model_num:max parallel model num dynamic_graph_execute_mode:Dynamic graph execute mode. lazy_recompile or dynamic_execute dynamic_inputs_shape_range:Inputs shape range. local_rank_id: Local sequence number of the device in a group. @@ -278,6 +280,7 @@ class NPURunConfig(run_config_lib.RunConfig): self._jit_compile = jit_compile self._input_fusion_size = input_fusion_size self._compile_dynamic_mode = compile_dynamic_mode + self._graph_max_parallel_model_num = graph_max_parallel_model_num self.execute_times = execute_times super(NPURunConfig, self).__init__( model_dir=model_dir, diff --git a/tf_adapter/python/npu_bridge/estimator/npu/npu_estimator.py b/tf_adapter/python/npu_bridge/estimator/npu/npu_estimator.py index af3638578..0ea251de4 100644 --- a/tf_adapter/python/npu_bridge/estimator/npu/npu_estimator.py +++ b/tf_adapter/python/npu_bridge/estimator/npu/npu_estimator.py @@ -802,6 +802,8 @@ class NPUEstimator(estimator_lib.Estimator): custom_op.parameter_map["es_cluster_config"].s = tf.compat.as_bytes(config.es_cluster_config) if config._compile_dynamic_mode is not None: custom_op.parameter_map["compile_dynamic_mode"].b = config._compile_dynamic_mode + if config._graph_max_parallel_model_num is not None: + custom_op.parameter_map["graph_max_parallel_model_num"].i = config._graph_max_parallel_model_num custom_op.parameter_map["jit_compile"].s = tf.compat.as_bytes(config._jit_compile) custom_op.parameter_map["input_fusion_size"].i = config._input_fusion_size custom_op.parameter_map["stream_sync_timeout"].i = config.stream_sync_timeout diff --git a/tf_adapter/tests/depends/ge_runner/src/callback_executor.cc b/tf_adapter/tests/depends/ge_runner/src/callback_executor.cc new file mode 100644 index 000000000..2d14794ce --- /dev/null +++ b/tf_adapter/tests/depends/ge_runner/src/callback_executor.cc @@ -0,0 +1,86 @@ +/* + * Copyright (c) Huawei Technologies Co., Ltd. 2024. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "callback_executor.h" +#include +#include "acl/acl_rt.h" + +namespace tensorflow { + CallbackExecutor &CallbackExecutor::GetInstance() { + static CallbackExecutor instance; + return instance; + } + void CallbackExecutor::Init() { + std::cout << "Start callback thread pool." << std::endl; + copy_thread_pool_.resize(thread_num_); + for (size_t idx = 0UL; idx < copy_thread_pool_.size(); idx++) { + if (copy_thread_pool_[idx] == nullptr) { + std::string thread_name = "thread_pool" + std::to_string(idx); + copy_thread_pool_[idx].reset(new std::thread(std::bind(&CallbackExecutor::CallbackHandler, this))); + } + } + thread_stop_flag_.store(false); + } + + void CallbackExecutor::CallbackHandler() { + std::cout << "Start callback thread." << std::endl; + CallbackPack closure; + while (!thread_stop_flag_.load()) { + { + std::unique_lock lck(queue_lock_); + queue_var_.wait(lck, [this]() { return ((!task_queue_.empty()) || (thread_stop_flag_.load())); }); + if (thread_stop_flag_.load()) { + queue_var_.notify_all(); + break; + } + closure = task_queue_.front(); + task_queue_.pop(); + std::cout << "Run callback" << std::endl; + } + closure.callback(closure.ge_status, closure.outputs); + std::unique_lock lck(queue_lock_); + run_num_--; + } + std::cout << "Callback thread is finished." << std::endl; + } + + void CallbackExecutor::PushTask(const CallbackPack &closure) { + std::unique_lock lck(queue_lock_); + std::cout << "Push closure" << std::endl; + task_queue_.push(closure); + run_num_++; + queue_var_.notify_all(); + } + + void CallbackExecutor::StopThreadPool() { + { + std::unique_lock lck(queue_lock_); + queue_var_.wait(lck, [this]() { return run_num_ <= 0; }); + std::cout << "Stop callback thread." << std::endl; + thread_stop_flag_.store(true); + queue_var_.notify_all(); + } + for (size_t i = 0UL; i < copy_thread_pool_.size(); i++) { + if (copy_thread_pool_[i]->joinable()) { + copy_thread_pool_[i]->join(); + } + } + } + int32_t CallbackExecutor::GetRunNum() { + std::unique_lock lck(queue_lock_); + return run_num_; + } +} \ No newline at end of file diff --git a/tf_adapter/tests/depends/ge_runner/src/callback_executor.h b/tf_adapter/tests/depends/ge_runner/src/callback_executor.h new file mode 100644 index 000000000..d312ccef6 --- /dev/null +++ b/tf_adapter/tests/depends/ge_runner/src/callback_executor.h @@ -0,0 +1,54 @@ +/* + * Copyright (c) Huawei Technologies Co., Ltd. 2024. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+#ifndef TESTS_DEPENDS_GE_RUNNER_SRC_HOST_THREAD_POOL_H_
+#define TESTS_DEPENDS_GE_RUNNER_SRC_HOST_THREAD_POOL_H_
+
+#include <atomic>
+#include <condition_variable>
+#include <memory>
+#include <mutex>
+#include <queue>
+#include <thread>
+#include <vector>
+#include "ge/ge_api_types.h"
+#include "graph/tensor.h"
+
+namespace tensorflow {
+struct CallbackPack {
+  ge::RunAsyncCallback callback;
+  ge::Status ge_status;
+  std::vector<ge::Tensor> outputs;
+};
+class CallbackExecutor {
+ public:
+  static CallbackExecutor &GetInstance();
+  void Init();
+  void PushTask(const CallbackPack &closure);
+  void StopThreadPool();
+  int32_t GetRunNum();
+ private:
+  void CallbackHandler();
+  std::mutex queue_lock_;
+  std::condition_variable queue_var_;
+  std::vector<std::unique_ptr<std::thread>> copy_thread_pool_;
+  std::queue<CallbackPack> task_queue_;
+  std::atomic<bool> thread_stop_flag_{false};
+  uint32_t thread_num_ = 1U;
+  int32_t run_num_ = 0;
+};
+}
+#endif  // TESTS_DEPENDS_GE_RUNNER_SRC_HOST_THREAD_POOL_H_
\ No newline at end of file
diff --git a/tf_adapter/tests/depends/ge_runner/src/ge_runner_stub.cc b/tf_adapter/tests/depends/ge_runner/src/ge_runner_stub.cc
index 35c7c9a23..adfdbe3d8 100644
--- a/tf_adapter/tests/depends/ge_runner/src/ge_runner_stub.cc
+++ b/tf_adapter/tests/depends/ge_runner/src/ge_runner_stub.cc
@@ -38,6 +38,7 @@
 #include "ascendcl_stub.h"
 #include "ge_stub.h"
 #include "tf_adapter/common/adapter_logger.h"
+#include "callback_executor.h"
 
 namespace ge {
 namespace {
@@ -268,7 +269,7 @@ bool Session::IsGraphNeedRebuild(uint32_t graphId) {
 Status Session::AddGraph(uint32_t graphId, const Graph &graph, const std::map<std::string, std::string> &options) {
   auto ret = graphs_map.find(graphId);
   if (ret != graphs_map.end()) {
-    return ge::FAILED;
+    return ge::SUCCESS;
   }
   graphs_map[graphId] = graph;
   return ge::SUCCESS;
@@ -284,10 +285,6 @@ Status Session::AddGraphWithCopy(uint32_t graphId, const Graph &graph, const std
 }
 
 Status Session::BuildGraph(uint32_t graphId, const std::vector &inputs) {
-  auto ret = graphs_map.find(graphId);
-  if (ret == graphs_map.end()) {
-    return ge::FAILED;
-  }
   return ge::SUCCESS;
 }
 
@@ -296,21 +293,22 @@ void RegRunGraphAsyncStub(RunGraphAsyncStub stub) {
   g_RunGraphAsyncStub = stub;
 }
 
+void ClearRegRunGraphAsyncStub() {
+  g_RunGraphAsyncStub = nullptr;
+}
+
 Status Session::RunGraphAsync(uint32_t graphId, const std::vector<ge::Tensor> &inputs, RunAsyncCallback callback) {
   if (g_RunGraphAsyncStub != nullptr) {
     return g_RunGraphAsyncStub(graphId, inputs, callback);
   }
-  ge::Status ret;
   std::vector<ge::Tensor> outputs;
   outputs.push_back(ge::Tensor());
-  auto res = graphs_map.find(graphId);
-  if (res == graphs_map.end()) {
-    ret = ge::FAILED;
-  } else {
-    ret = ge::SUCCESS;
-  }
-  callback(ret, outputs);
-  return ret;
+  tensorflow::CallbackPack pack;
+  pack.callback = callback;
+  pack.ge_status = ge::SUCCESS;
+  pack.outputs = outputs;
+  tensorflow::CallbackExecutor::GetInstance().PushTask(pack);
+  return ge::SUCCESS;
 }
 
 Status Session::BuildGraph(uint32_t graphId, const std::vector &inputs) {
diff --git a/tf_adapter/tests/depends/ge_runner/src/ge_stub.h b/tf_adapter/tests/depends/ge_runner/src/ge_stub.h
index 025107d84..9a0b2be1b 100644
--- a/tf_adapter/tests/depends/ge_runner/src/ge_stub.h
+++ b/tf_adapter/tests/depends/ge_runner/src/ge_stub.h
@@ -51,5 +51,6 @@ void RegRunGraphStub(RunGraphStub stub);
 using RunGraphAsyncStub = std::function<Status(uint32_t, const std::vector<Tensor> &, RunAsyncCallback)>;
 void RegRunGraphAsyncStub(RunGraphAsyncStub stub);
+void ClearRegRunGraphAsyncStub();
 }  // namespace ge
 #endif  // COMMON_GRAPH_DEBUG_GE_UTIL_H_
diff --git a/tf_adapter/tests/st/kernels/testcase/geop_npu_test.cc b/tf_adapter/tests/st/kernels/testcase/geop_npu_test.cc
index 2e632988c..041a948d3 100644
--- a/tf_adapter/tests/st/kernels/testcase/geop_npu_test.cc
+++ b/tf_adapter/tests/st/kernels/testcase/geop_npu_test.cc
@@ -8,6 +8,7 @@
 #include
 #include "gtest/gtest.h"
 #include "ge_stub.h"
+#include "callback_executor.h"
 #define private public
 #include "tf_adapter/kernels/geop_npu.h"
 #undef private
@@ -144,6 +145,9 @@ Status GeOpRunGraphAsync(std::string example_path, gtl::InlinedVector
 (&params);
       async_op->ComputeAsync(ctx1.get(), done);
     }
+    while (CallbackExecutor::GetInstance().GetRunNum() > 0) {
+      usleep(100);
+    }
   }
 }
 return Status::OK();
@@ -187,6 +191,9 @@ Status GeOpRunGraphAsyncMultiStep(std::string example_path, std::vector
 (&params);
       async_op->ComputeAsync(ctx.get(), done);
+      while (CallbackExecutor::GetInstance().GetRunNum() > 0) {
+        usleep(100);
+      }
     }
   }
 }
@@ -195,6 +202,7 @@ Status GeOpRunGraphAsyncMultiStep(std::string example_path, std::vector
 inputs;
@@ -290,20 +298,19 @@ TEST_F(GeOpTest, GeOpDynamicInput1Test) {
   EXPECT_TRUE(!attrs["_dynamic_input"].s().empty());
   EXPECT_EQ(attrs["_dynamic_graph_execute_mode"].s() == "dynamic_execute", true);
 }
+
 TEST_F(GeOpTest, GeOpGetNextStringTest) {
   NodeDef node_def;
   std::string graph_def_path = "tf_adapter/tests/ut/kernels/pbtxt/geop_getnext_string.pbtxt";
   std::vector<int64_t> ge_output1_dims{2, 2};
   auto getnext_output1_info =
-      std::unique_ptr<NpuGetNextOutputInfo>(new NpuGetNextOutputInfo(ge::kPlacementDevice, ge_output1_dims, 8, nullptr));
+      std::unique_ptr<NpuGetNextOutputInfo>(new NpuGetNextOutputInfo(ge::kPlacementDevice, ge_output1_dims, 128, nullptr));
   Allocator* allocator1 = NpuHostGetNextAllocator::Create(std::move(getnext_output1_info));
   Tensor a(allocator1, DT_INT64, TensorShape({2, 2}));
-  Tensor in(DT_STRING, TensorShape({1}));
-  in.scalar<tstring>()() = "ABC";
-  Tensor d(DT_INT32, TensorShape({2, 2}));
-  gtl::InlinedVector<TensorValue, 4> inputs{TensorValue(&a), TensorValue(&in), TensorValue(&d)};
-  EXPECT_TRUE(GeOpRunGraphAsync(graph_def_path, inputs, node_def, "GeOp14_0", false).ok());
+  gtl::InlinedVector<TensorValue, 4> inputs{TensorValue(&a)};
+  EXPECT_TRUE(GeOpRunGraphAsync(graph_def_path, inputs, node_def, "GeOp14_0").ok());
 }
+
 TEST_F(GeOpTest, GeOpAoeTuningAndDynamicDimsTest) {
   NodeDef node_def;
   std::string graph_def_path = "tf_adapter/tests/ut/kernels/pbtxt/geop_aoe_tuning_and_dynamic_dims.pbtxt";
@@ -594,8 +601,11 @@ TEST_F(GeOpTest, BuildOutputTensorInfo) {
   });
   std::vector<ge::Tensor> outputs;
   outputs.emplace_back(tensor);
-
-  callback(ge::SUCCESS, outputs);
+  tensorflow::CallbackPack pack;
+  pack.callback = callback;
+  pack.ge_status = ge::SUCCESS;
+  pack.outputs = outputs;
+  tensorflow::CallbackExecutor::GetInstance().PushTask(pack);
   return ge::SUCCESS;
 });
@@ -779,6 +789,7 @@ TEST_F(GeOpTest, test_Get_GeSession_Failed) {
   GeOp *geop_node = dynamic_cast<GeOp *>(g_op.get());
   geop_node->tf_session_ = "";
   EXPECT_EQ(geop_node->CreateGeSession().ok(), false);
+  CallbackExecutor::GetInstance().StopThreadPool();
 }
 } // namespace
 } //end tensorflow
diff --git a/tf_adapter/tests/ut/kernels/testcase/geop_npu_test.cc b/tf_adapter/tests/ut/kernels/testcase/geop_npu_test.cc
index e319a8571..01c3e031c 100644
--- a/tf_adapter/tests/ut/kernels/testcase/geop_npu_test.cc
+++ b/tf_adapter/tests/ut/kernels/testcase/geop_npu_test.cc
@@ -11,6 +11,7 @@
 #include "tf_adapter/util/npu_plugin.h"
 #include "tf_adapter/util/npu_plugin.h"
 #include "tf_adapter/util/util.h"
+#include "callback_executor.h"
 #define private public
 #include "tf_adapter/kernels/geop_npu.h"
 #undef private
@@ -146,6 +147,9 @@ Status GeOpRunGraphAsync(std::string example_path, gtl::InlinedVector
 (&params);
       async_op->ComputeAsync(ctx1.get(), done);
     }
+    while (CallbackExecutor::GetInstance().GetRunNum() > 0) {
+      usleep(100);
+    }
   }
 }
 return Status::OK();
@@ -190,6 +194,9 @@ Status GeOpRunGraphAsyncMultiStep(std::string example_path, std::vector
 (&params);
       async_op->ComputeAsync(ctx.get(), done);
     }
+    while (CallbackExecutor::GetInstance().GetRunNum() > 0) {
+      usleep(100);
+    }
   }
 }
 return Status::OK();
@@ -198,6 +205,7 @@ Status GeOpRunGraphAsyncMultiStep(std::string example_path, std::vector
 inputs;
@@ -303,20 +311,19 @@ TEST_F(GeOpTest, GeOpDynamicInput1Test) {
   EXPECT_TRUE(!attrs["_dynamic_input"].s().empty());
   EXPECT_EQ(attrs["_dynamic_graph_execute_mode"].s() == "dynamic_execute", true);
 }
+
 TEST_F(GeOpTest, GeOpGetNextStringTest) {
   NodeDef node_def;
   std::string graph_def_path = "tf_adapter/tests/ut/kernels/pbtxt/geop_getnext_string.pbtxt";
   std::vector<int64_t> ge_output1_dims{2, 2};
   auto getnext_output1_info =
-      std::unique_ptr<NpuGetNextOutputInfo>(new NpuGetNextOutputInfo(ge::kPlacementDevice, ge_output1_dims, 8, nullptr));
+      std::unique_ptr<NpuGetNextOutputInfo>(new NpuGetNextOutputInfo(ge::kPlacementDevice, ge_output1_dims, 128, nullptr));
   Allocator* allocator1 = NpuHostGetNextAllocator::Create(std::move(getnext_output1_info));
   Tensor a(allocator1, DT_INT64, TensorShape({2, 2}));
-  Tensor in(DT_STRING, TensorShape({1}));
-  in.scalar<tstring>()() = "ABC";
-  Tensor d(DT_INT32, TensorShape({2, 2}));
-  gtl::InlinedVector<TensorValue, 4> inputs{TensorValue(&a), TensorValue(&in), TensorValue(&d)};
-  EXPECT_TRUE(GeOpRunGraphAsync(graph_def_path, inputs, node_def, "GeOp14_0", false).ok());
+  gtl::InlinedVector<TensorValue, 4> inputs{TensorValue(&a)};
+  EXPECT_TRUE(GeOpRunGraphAsync(graph_def_path, inputs, node_def, "GeOp14_0").ok());
 }
+
 TEST_F(GeOpTest, GeOpAoeTuningAndDynamicDimsTest) {
   NodeDef node_def;
   std::string graph_def_path = "tf_adapter/tests/ut/kernels/pbtxt/geop_aoe_tuning_and_dynamic_dims.pbtxt";
@@ -603,8 +610,11 @@ TEST_F(GeOpTest, BuildOutputTensorInfo) {
   });
   std::vector<ge::Tensor> outputs;
   outputs.emplace_back(tensor);
-
-  callback(ge::SUCCESS, outputs);
+  tensorflow::CallbackPack pack;
+  pack.callback = callback;
+  pack.ge_status = ge::SUCCESS;
+  pack.outputs = outputs;
+  tensorflow::CallbackExecutor::GetInstance().PushTask(pack);
   return ge::SUCCESS;
 });
@@ -784,6 +794,7 @@ TEST_F(GeOpTest, test_Get_GeSession_Failed) {
   GeOp *geop_node = dynamic_cast<GeOp *>(g_op.get());
   geop_node->tf_session_ = "";
   EXPECT_EQ(geop_node->CreateGeSession().ok(), false);
+  CallbackExecutor::GetInstance().StopThreadPool();
 }
 } // namespace
 } // namespace tensorflow
diff --git a/tf_adapter/util/npu_attrs.cc b/tf_adapter/util/npu_attrs.cc
index f80ca6c02..f970327d6 100644
--- a/tf_adapter/util/npu_attrs.cc
+++ b/tf_adapter/util/npu_attrs.cc
@@ -490,6 +490,7 @@ std::map<std::string, std::string> NpuAttrs::GetSessOptions(const OpKernelConstr
   std::string graph_slice_mode;
   std::string input_fusion_size = "131072";
   std::string compile_dynamic_mode;
+  std::string graph_max_parallel_model_num = "1";
   if (ctx != nullptr && ctx->GetAttr("_NpuOptimizer", &npuOptimizer) == Status::OK()) {
     (void) ctx->GetAttr("_variable_format_optimize", &variable_format_optimize);
     (void) ctx->GetAttr("_hcom_parallel", &hcom_parallel);
@@ -558,6 +559,7 @@ std::map<std::string, std::string> NpuAttrs::GetSessOptions(const OpKernelConstr
     (void) ctx->GetAttr("_enable_graph_parallel", &enable_graph_parallel);
     (void) ctx->GetAttr("_graph_slice", &graph_slice_mode);
     (void) ctx->GetAttr("_compile_dynamic_mode", &compile_dynamic_mode);
+    (void) ctx->GetAttr("_graph_max_parallel_model_num", &graph_max_parallel_model_num);
     std::string jit_compile;
     if (ctx->GetAttr("_jit_compile", &jit_compile).ok()) {
       sess_options["jit_compile"] = jit_compile;
@@ -931,6 +933,7 @@ std::map<std::string, std::string> NpuAttrs::GetPassOptions(const OpKernelConstr
   std::string use_off_line = "1";
   std::string mix_compile_mode = "0";
   std::string iterations_per_loop = "1";
+  std::string graph_max_parallel_model_num = "1";
   std::string lower_functional_ops = "0";
   std::string job = "default";
   std::string task_index = "0";
@@ -968,6 +971,7 @@ std::map<std::string, std::string> NpuAttrs::GetPassOptions(const OpKernelConstr
     (void) ctx->GetAttr("_frozen_variable", &frozen_variable);
     (void) ctx->GetAttr("_variable_location", &variable_location);
     (void) ctx->GetAttr("_accelerate_train_mode", &accelerate_train_mode);
+    (void) ctx->GetAttr("_graph_max_parallel_model_num", &graph_max_parallel_model_num);
   }
   // pass options
   pass_options["do_npu_optimizer"] = do_npu_optimizer;
@@ -975,6 +979,7 @@ std::map<std::string, std::string> NpuAttrs::GetPassOptions(const OpKernelConstr
   pass_options["use_off_line"] = use_off_line;
   pass_options["mix_compile_mode"] = mix_compile_mode;
   pass_options["iterations_per_loop"] = iterations_per_loop;
+  pass_options["graph_max_parallel_model_num"] = graph_max_parallel_model_num;
   pass_options["lower_functional_ops"] = lower_functional_ops;
   pass_options["job"] = job;
   pass_options["task_index"] = task_index;
@@ -988,7 +993,6 @@ std::map<std::string, std::string> NpuAttrs::GetPassOptions(const OpKernelConstr
   pass_options["frozen_variable"] = frozen_variable;
   pass_options["variable_location"] = variable_location;
   pass_options["accelerate_train_mode"] = accelerate_train_mode;
-
   return pass_options;
 }
 
@@ -1112,6 +1116,7 @@ std::map<std::string, std::string> NpuAttrs::GetAllAttrOptions(const AttrSlice &
   std::string use_off_line = "1";
   std::string mix_compile_mode = "0";
   std::string iterations_per_loop = "1";
+  std::string graph_max_parallel_model_num = "1";
   std::string lower_functional_ops = "0";
   std::string job = "default";
   std::string task_index = "0";
@@ -1206,6 +1211,7 @@ std::map<std::string, std::string> NpuAttrs::GetAllAttrOptions(const AttrSlice &
   auto use_off_line_value = attrs.Find("_use_off_line");
   auto mix_compile_mode_value = attrs.Find("_mix_compile_mode");
   auto iterations_per_loop_value = attrs.Find("_iterations_per_loop");
+  auto graph_max_parallel_model_num_value = attrs.Find("_graph_max_parallel_model_num");
   auto lower_functional_ops_value = attrs.Find("_lower_functional_ops");
   auto job_value = attrs.Find("_job");
   auto task_index_value = attrs.Find("_task_index");
@@ -1309,6 +1315,9 @@ std::map<std::string, std::string> NpuAttrs::GetAllAttrOptions(const AttrSlice &
   if (iterations_per_loop_value != nullptr) {
     iterations_per_loop = iterations_per_loop_value->s();
   }
+  if (graph_max_parallel_model_num_value != nullptr) {
+    graph_max_parallel_model_num = graph_max_parallel_model_num_value->s();
+  }
   if (lower_functional_ops_value != nullptr) {
     lower_functional_ops = lower_functional_ops_value->s();
   }
@@ -1652,6 +1661,7 @@ std::map<std::string, std::string> NpuAttrs::GetAllAttrOptions(const AttrSlice &
   all_options["use_off_line"] = use_off_line;
   all_options["mix_compile_mode"] = mix_compile_mode;
   all_options["iterations_per_loop"] = iterations_per_loop;
+  all_options["graph_max_parallel_model_num"] = graph_max_parallel_model_num;
   all_options["lower_functional_ops"] = lower_functional_ops;
   all_options["job"] = job;
   all_options["task_index"] = task_index;
@@ -1722,6 +1732,7 @@ std::map<std::string, std::string> NpuAttrs::GetDefaultPassOptions() {
   pass_options["use_off_line"] = "1";
   pass_options["mix_compile_mode"] = "0";
   pass_options["iterations_per_loop"] = std::to_string(1);
+  pass_options["graph_max_parallel_model_num"] = std::to_string(1);
   pass_options["lower_functional_ops"] = "0";
   pass_options["job"] = "default";
   pass_options["task_index"] = std::to_string(0);
@@ -1777,7 +1788,8 @@ Status NpuAttrs::SetNpuOptimizerAttr(const GraphOptimizationPassOptions &options
   bool enable_dp = false;
   bool use_off_line = true;
   bool mix_compile_mode = false;
-  int64_t iterations_per_loop = 1U;
+  int64_t iterations_per_loop = 1L;
+  int64_t graph_max_parallel_model_num = 1L;
   bool lower_functional_ops = false;
   std::string job = "localhost";
   int64_t task_index = 0L;
@@ -2033,6 +2045,12 @@ Status NpuAttrs::SetNpuOptimizerAttr(const GraphOptimizationPassOptions &options
   if (params.count("iterations_per_loop") > 0) {
     iterations_per_loop = params.at("iterations_per_loop").i();
   }
+  if (params.count("graph_max_parallel_model_num") > 0) {
+    graph_max_parallel_model_num = params.at("graph_max_parallel_model_num").i();
+    const int32_t max_model_num = 8;
+    NPU_REQUIRES((graph_max_parallel_model_num > 0) && (graph_max_parallel_model_num <= max_model_num),
+                 errors::Internal("graph_max_parallel_model_num should be set between 1 and 8"));
+  }
   if (params.count("lower_functional_ops") > 0) {
     lower_functional_ops = params.at("lower_functional_ops").b();
   }
@@ -2531,6 +2549,7 @@ Status NpuAttrs::SetNpuOptimizerAttr(const GraphOptimizationPassOptions &options
   pass_options["use_off_line"] = std::to_string(static_cast<int32_t>(use_off_line));
   pass_options["mix_compile_mode"] = std::to_string(static_cast<int32_t>(mix_compile_mode));
   pass_options["iterations_per_loop"] = std::to_string(iterations_per_loop);
+  pass_options["graph_max_parallel_model_num"] = std::to_string(graph_max_parallel_model_num);
   pass_options["lower_functional_ops"] = std::to_string(static_cast<int32_t>(lower_functional_ops));
   pass_options["job"] = job;
   pass_options["task_index"] = std::to_string(task_index);
diff --git a/tf_adapter_2.x/npu_device/core/npu_device.cpp b/tf_adapter_2.x/npu_device/core/npu_device.cpp
index 7100e8400..9971eb0a5 100644
--- a/tf_adapter_2.x/npu_device/core/npu_device.cpp
+++ b/tf_adapter_2.x/npu_device/core/npu_device.cpp
@@ -216,6 +216,7 @@ std::string NpuDevice::CreateDevice(const char *name, int device_index,
   *device = new (std::nothrow) NpuDevice();
   if (*device == nullptr) {
+    delete ge_session;
     return "Failed create new npu device instance";
   }
   (*device)->device_id = device_index;
-- 
Gitee
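The stubbed RunGraphAsync in this patch no longer invokes the test callback inline: it packs the callback, status, and outputs into a CallbackPack and hands it to the singleton CallbackExecutor, whose worker thread drains the queue. Tests therefore spin on GetRunNum() until the queue is empty before asserting, and call StopThreadPool() once at the end. The standalone sketch below mirrors that producer/consumer handshake with plain standard-library types; MiniExecutor and main are illustrative names only and are not part of the adapter or the patch.

```cpp
// Minimal analogue of the CallbackExecutor handshake used by the GE stub and the geop tests.
#include <atomic>
#include <condition_variable>
#include <functional>
#include <iostream>
#include <mutex>
#include <queue>
#include <thread>

class MiniExecutor {
 public:
  void Init() {
    stop_.store(false);
    worker_ = std::thread([this]() { Drain(); });
  }
  void PushTask(std::function<void()> task) {        // producer side: the RunGraphAsync stub
    std::unique_lock<std::mutex> lck(mu_);
    queue_.push(std::move(task));
    ++run_num_;
    cv_.notify_all();
  }
  int GetRunNum() {                                  // tests poll this before asserting
    std::unique_lock<std::mutex> lck(mu_);
    return run_num_;
  }
  void Stop() {                                      // mirrors StopThreadPool(): drain, then join
    {
      std::unique_lock<std::mutex> lck(mu_);
      cv_.wait(lck, [this]() { return run_num_ <= 0; });
      stop_.store(true);
      cv_.notify_all();
    }
    if (worker_.joinable()) worker_.join();
  }

 private:
  void Drain() {
    while (!stop_.load()) {
      std::function<void()> task;
      {
        std::unique_lock<std::mutex> lck(mu_);
        cv_.wait(lck, [this]() { return !queue_.empty() || stop_.load(); });
        if (stop_.load()) break;
        task = std::move(queue_.front());
        queue_.pop();
      }
      task();  // run the "callback" outside the lock, as CallbackHandler does
      std::unique_lock<std::mutex> lck(mu_);
      --run_num_;
      cv_.notify_all();  // wake a Stop() that waits for the queue to drain
    }
  }
  std::mutex mu_;
  std::condition_variable cv_;
  std::queue<std::function<void()>> queue_;
  std::thread worker_;
  std::atomic<bool> stop_{false};
  int run_num_ = 0;
};

int main() {
  MiniExecutor exec;
  exec.Init();
  exec.PushTask([]() { std::cout << "callback ran" << std::endl; });
  while (exec.GetRunNum() > 0) {  // same wait loop the tests use around ComputeAsync
    std::this_thread::yield();
  }
  exec.Stop();
  return 0;
}
```

Running the callback on a separate thread rather than inline is what forces the tests to add the GetRunNum() wait loops and the final StopThreadPool() call seen in the diffs above.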
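The new graph_max_parallel_model_num option travels through the pass-option maps as a string and is validated once, when SetNpuOptimizerAttr reads it from the session params: values outside 1..8 are rejected. A minimal standalone sketch of that range check follows; ParseGraphMaxParallelModelNum is a hypothetical helper, and the adapter itself uses the NPU_REQUIRES macro with errors::Internal rather than exceptions.

```cpp
// Illustrative range check for graph_max_parallel_model_num (accepted range is 1..8).
#include <cstdint>
#include <stdexcept>
#include <string>

int64_t ParseGraphMaxParallelModelNum(const std::string &value) {
  const int64_t num = std::stoll(value);  // pass options are carried around as strings
  constexpr int64_t kMaxModelNum = 8;
  if (num <= 0 || num > kMaxModelNum) {
    throw std::invalid_argument("graph_max_parallel_model_num should be set between 1 and 8");
  }
  return num;
}

int main() {
  // "4" is accepted; "0" or "9" would throw.
  return ParseGraphMaxParallelModelNum("4") == 4 ? 0 : 1;
}
```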