diff --git a/tf_adapter/kernels/geop_npu.cc b/tf_adapter/kernels/geop_npu.cc index 537994554704cccd28487f9d60cb74dde2d405af..1fc652acce9adf924069e25a4e2635ca86a4e99b 100644 --- a/tf_adapter/kernels/geop_npu.cc +++ b/tf_adapter/kernels/geop_npu.cc @@ -33,7 +33,7 @@ #include #include #include - +#include #include "tf_adapter/common/adapter_logger.h" #include "tf_adapter/common/common.h" #include "tf_adapter/util/ge_plugin.h" @@ -104,7 +104,7 @@ const float kMaxStepRatio = 0.9; const float kDefaultLossRatio = 1.05; const float kMinLossRatio = 1.01; const float kMaxLossRatio = 1.5; - +const int32_t kMaxAddNum = 8; const std::map fast_value_string_2_eunm = {{"fast", GeOp::FastValue::kfast}, {"fast1", GeOp::FastValue::kfast1}}; @@ -333,6 +333,14 @@ void SetReuseOptions(const std::string &key, int32_t num, const std::mapsecond; } } +class ExitCallbackGuarder { + public: + explicit ExitCallbackGuarder(std::function done) : done_(done) {} + ~ExitCallbackGuarder() { done_(); } + + private: + std::function done_; +}; } // namespace std::string CurrentTimeInStr() { @@ -354,10 +362,10 @@ const int kFatalSleepTime = 3000; const std::string kAllReduce = "HcomAllReduce"; GeOp::GeOp(OpKernelConstruction *ctx) - : AsyncOpKernel(ctx), init_flag_(false), build_flag_(false), add_graph_flag_(false), sess_init_flag_(false), - compute_graph_empty_(false), is_input_convert_(false), data_format_(""), graph_id_(0), - is_initialized_graph_(false), need_iteration_(false), tf_session_(""), ge_session_(nullptr), job_type_(""), - is_host_graph_(false), handle_(nullptr), need_compile_graph_first_(false), tuned_flag_(ATOMIC_FLAG_INIT), + : AsyncOpKernel(ctx), init_flag_(false), sess_init_flag_(false), + is_input_convert_(false), data_format_(""), graph_id_(0), + is_initialized_graph_(false), is_empty_graph_(false), need_iteration_(false), tf_session_(""), ge_session_(nullptr), job_type_(""), + is_host_graph_(false), handle_(nullptr), tuned_flag_(ATOMIC_FLAG_INIT), jit_compile_("2"), is_dynamic_input_(false), session_id_(0), aoe_initialize_(nullptr), aoe_finalize_(nullptr), aoe_create_session_(nullptr), aoe_destroy_session_(nullptr), aoe_set_gesession_(nullptr), aoe_set_dependgraphs_(nullptr), aoe_set_tuninggraph_(nullptr), aoe_tuning_graph_(nullptr), @@ -508,18 +516,13 @@ void GeOp::Finalize() { // global environment finalize, invoke once for each process { mutex_lock lock{mu_}; - uint32_t graph_id = -1; if (sess_init_flag_ || !tf_session_.empty()) { - bool ret = DecrementGraphIdCount(tf_session_, graph_id); + bool ret = DecrementGraphIdCount(); if (!ret) { ADP_LOG(ERROR) << "tf session " << tf_session_ << " sub graph id failed."; LOG(ERROR) << "tf session " << tf_session_ << " sub graph id failed."; return; } - if (graph_id == kInvalidGraphId) { - SessionManager::GetInstance().DestroyGeSession(tf_session_); - ClearGraphIdCount(); - } } if (!SessionManager::GetInstance().IsGeSessionExist()) { @@ -808,42 +811,54 @@ bool GeOp::IsGraphNeedRebuild(const uint32_t cache_graph_id) { return ((need_recover_precision_mode_) || (ge_session_->IsGraphNeedRebuild(cache_graph_id))); } -int32_t GeOp::InitRebuildFlag(uint32_t cache_graph_id) { - if (!build_flag_) { - ADP_LOG(INFO) << "[GEOP] tf session " << tf_session_ << ", graph id: " << cache_graph_id - << " does not build yet, no need to check rebuild"; - return 0; - } - if (compute_graph_empty_) { - ADP_LOG(INFO) << "[GEOP] tf session " << tf_session_ << ", graph id: " << cache_graph_id - << " is empty, no need to check rebuild"; - return 0; - } - if (ge_session_ == nullptr) { - ADP_LOG(ERROR) << "[GEOP] GE session is nullptr"; - LOG(ERROR) << "[GEOP] GE session is nullptr"; - return -1; - } - if (!IsGraphNeedRebuild(cache_graph_id)) { - ADP_LOG(INFO) << "[GEOP] tf session " << tf_session_ << ", graph id: " << cache_graph_id << " no need to rebuild"; - return 0; +Status GeOp::RemoveGraph(const uint32_t &graph_id) { + if (graph_handler_.status == Init) { + return Status::OK(); } - ADP_LOG(INFO) << "[GEOP] The graph need rebuild, graph id " << cache_graph_id << " ,need_change_precision_mode: " - << need_recover_precision_mode_; - - // The graph need to rebuild, remove it from GE first. - ADP_LOG(INFO) << "[GEOP] tf session: " << tf_session_ << ", graph id: " << cache_graph_id; - auto ret = ge_session_->RemoveGraph(cache_graph_id); + auto ret = ge_session_->RemoveGraph(graph_id); if (ret != ge::SUCCESS) { - ADP_LOG(ERROR) << "[GEOP] Failed to remove graph " << cache_graph_id << " from ge, error code " << ret; - LOG(ERROR) << "[GEOP] Failed to remove graph " << cache_graph_id << " from ge, error code " << ret << std::endl - << "Error Message is : " << std::endl << ge::GEGetErrorMsgV2().GetString(); - return -1; + return errors::Internal("[GEOP] Failed to remove graph ", + graph_id, "from ge, error code ", ret, + "Error Message is : ", ge::GEGetErrorMsgV2().GetString()); } + ADP_LOG(INFO) << "[GEOP] tf session: " << tf_session_ << ", graph id: " << graph_id << "Removed graph"; + ADP_LOG(INFO) << "Set graph_status to Init" << std::endl; + return Status::OK(); +} - build_flag_ = false; - compute_graph_empty_ = false; - return 0; +Status GeOp::CheckAndRemoveGraph(OpKernelContext *ctx, const uint32_t &graph_id) { + mutex_lock lock{graph_handler_.graph_mu}; + // Init状态不需要做check + if (graph_handler_.status == Init) { + return Status::OK(); + } + // To be compatible with old versions, we should check dynamic_input_ and dynamic_config + bool shape_changed = false; + bool is_set_dynamic_config = IsDynamicConfig(); + if ((!is_dynamic_input_) && (!is_set_dynamic_config)) { + shape_changed = MaybeUpdateShape(ctx); + } + if (shape_changed || IsGraphNeedRebuild(graph_id)) { + ADP_LOG(INFO) << "[GEOP] The graph need rebuild, graph id " + << graph_id << " ,need_change_precision_mode: " + << need_recover_precision_mode_; + // 让进入需要Remove状态时,其他线程需要等待他remove完 + graph_handler_.status = Removing; + while (graph_handler_.graph_run_num > 0) { + ADP_LOG(INFO) << "Remove wait, run_num: " << graph_handler_.graph_run_num + << ", graph_status: " << graph_handler_.status; + graph_handler_.cv.wait(lock); + } + auto ret = RemoveGraph(graph_id); + graph_handler_.status = Init; + // 当remove模型时,所有的线程需要重新做加载,mask右移一位,重置flag + graph_handler_.add_graph_mask = graph_handler_.add_graph_mask << 1UL; + // 重置addGraph的个数 + graph_handler_.add_total_num = 0; + graph_handler_.cv.notify_all(); + return ret; + } + return Status::OK(); } bool GeOp::IncrementGraphIdCount(uint32_t &graph_id) { @@ -854,16 +869,25 @@ bool GeOp::IncrementGraphIdCount(uint32_t &graph_id) { } auto it = session_and_graph_id_map_.find(tf_session_); if (it != session_and_graph_id_map_.end()) { - it->second = it->second + kMaxCacheNum; - graph_id = it->second; - return true; + auto iter_graph_id = it->second.find(geop_name_); + if (iter_graph_id != it->second.end()) { + graph_id = iter_graph_id->second; + } else { + graph_id = current_size_ * kMaxCacheNum + 1U; + it->second.insert(std::make_pair(geop_name_, graph_id)); + current_size_++; + } + } else { + graph_id = current_size_ * kMaxCacheNum + 1U; + std::unordered_map graph_id_map = {{geop_name_, graph_id}}; + session_and_graph_id_map_.insert(std::make_pair(tf_session_, graph_id_map)); + current_size_++; } - graph_id = 1; - session_and_graph_id_map_.insert(std::make_pair(tf_session_, graph_id)); + (void)add_graph_flag_map_.insert(std::make_pair(graph_id, 0UL)); return true; } -bool GeOp::DecrementGraphIdCount(const std::string &tf_session, uint32_t &graph_id) { +bool GeOp::DecrementGraphIdCount() { if (tf_session_.empty()) { ADP_LOG(ERROR) << "[GEOP] Sub graph id failed, tf session is empty."; LOG(ERROR) << "[GEOP] Sub graph id failed, tf session is empty."; @@ -872,17 +896,24 @@ bool GeOp::DecrementGraphIdCount(const std::string &tf_session, uint32_t &graph_ auto it = session_and_graph_id_map_.find(tf_session_); if (it != session_and_graph_id_map_.end()) { - if (it->second == 1) { - it->second = it->second - 1; - graph_id = it->second; - return true; + auto graph_name_iter = it->second.find(geop_name_); + if (graph_name_iter != it->second.end()) { + it->second.erase(graph_name_iter); + } else { + ADP_LOG(ERROR) << "[GEOP] Sub graph id failed, can not find geop name " << geop_name_; + LOG(ERROR) << "[GEOP] Sub graph id failed, can not find geop name " << geop_name_; + return false; + } + if (it->second.empty()) { + session_and_graph_id_map_.erase(it); + sess_init_flag_ = false; + SessionManager::GetInstance().DestroyGeSession(tf_session_); + ClearGraphIdCount(); } - it->second = it->second - kMaxCacheNum; - graph_id = it->second; return true; } - ADP_LOG(ERROR) << "[GEOP] Sub graph id failed, can not find tf session " << tf_session; - LOG(ERROR) << "[GEOP] Sub graph id failed, can not find tf session " << tf_session; + ADP_LOG(ERROR) << "[GEOP] Sub graph id failed, can not find tf session " << tf_session_; + LOG(ERROR) << "[GEOP] Sub graph id failed, can not find tf session " << tf_session_; return false; } @@ -894,6 +925,7 @@ void GeOp::ClearGraphIdCount() { } void GeOp::GetExecGraphId(uint32_t &cache_graph_id, std::vector input_shapes) { + mutex_lock lock{graph_handler_.graph_mu}; size_t num = cache_graphs_.size(); if (cache_graphs_.find(input_shapes) != cache_graphs_.end()) { auto iter = std::find_if(graph_counts_.begin(), graph_counts_.end(), @@ -904,7 +936,9 @@ void GeOp::GetExecGraphId(uint32_t &cache_graph_id, std::vector inp iter->second += 1; } cache_graph_id = cache_graphs_[input_shapes]; - build_flag_ = true; + ADP_LOG(INFO) << "Set graph_status to CompileDone when get exec graphid, graph_id: " << cache_graph_id; + graph_handler_.status = CompileDone; + graph_handler_.cv.notify_all(); } else { ADP_LOG(INFO) << "[GEOP] This is a dynamic shape neural network, we recommend setting jit_compile to false"; if (num >= kMaxCacheNum) { @@ -922,8 +956,10 @@ void GeOp::GetExecGraphId(uint32_t &cache_graph_id, std::vector inp } else { cache_graph_id = graph_id_ + num; } - build_flag_ = false; - compute_graph_empty_ = false; + add_graph_flag_map_.insert(std::make_pair(cache_graph_id, 0UL)); + ADP_LOG(INFO) << "Set graph_status to Init when has no cache graph, graph_id: " << cache_graph_id; + graph_handler_.status = Init; + graph_handler_.cv.notify_all(); } } @@ -954,8 +990,8 @@ PartialTensorShape GeOp::MakeCompatShape(const PartialTensorShape &a, const Part return MakeUnknownShape(b.dims()); } -bool GeOp::MaybeUpdateShape(OpKernelContext *const ctx) { - bool updated = false; +void GeOp::InitGraphShape(OpKernelContext *const ctx) { + mutex_lock lock{graph_handler_.graph_mu}; for (size_t i = 0UL; i < static_cast(ctx->num_inputs()); i++) { auto &shape = input_shapes_vec_[i]; auto &value_shape = ctx->input(static_cast(i)).shape(); @@ -966,12 +1002,15 @@ bool GeOp::MaybeUpdateShape(OpKernelContext *const ctx) { } else { shape = value_shape; } - updated = true; ADP_LOG(INFO) << "Init input " << i << " shape to " << shape.value().DebugString(); - continue; } + } +} +bool GeOp::MaybeUpdateShape(OpKernelContext *const ctx) { + for (size_t i = 0UL; i < static_cast(ctx->num_inputs()); i++) { + auto &shape = input_shapes_vec_[i]; + auto &value_shape = ctx->input(static_cast(i)).shape(); if (!shape.value().IsCompatibleWith(value_shape)) { - updated = true; ADP_LOG(INFO) << "Compat input " << i << " shape " << shape.value().DebugString() << " vs. " << value_shape.DebugString(); if ((jit_compile_ == "1") && (compile_dynamic_mode_ != "1")) { @@ -981,9 +1020,10 @@ bool GeOp::MaybeUpdateShape(OpKernelContext *const ctx) { shape = MakeCompatShape(shape.value(), value_shape); } ADP_LOG(INFO) << "Refresh input " << i << " shape to " << shape.value().DebugString(); + return true; } } - return updated; + return false; } Status GeOp::CreateGeSession() { @@ -1004,12 +1044,9 @@ Status GeOp::CreateGeSession() { ADP_LOG(INFO) << "[GePlugin] Initialize ge success."; first = false; } - if (!sess_init_flag_) { - mutex_lock lock{mu_}; - if (!SessionManager::GetInstance().GetOrCreateGeSession(tf_session_, ge_session_, sess_options_) || - tf_session_.empty() || ge_session_ == nullptr) { - return errors::Internal("Get ge session failed."); - } + if (!SessionManager::GetInstance().GetOrCreateGeSession(tf_session_, ge_session_, sess_options_) || + tf_session_.empty() || ge_session_ == nullptr) { + return errors::Internal("Get ge session failed."); } sess_init_flag_ = true; ADP_LOG(INFO) << "[GEOP] tf session: " << tf_session_ << " get ge session success."; @@ -1055,70 +1092,278 @@ PartialTensorShape GeOp::MakeUnknownShape(const int32_t &size) const { return status.ok() ? out_shape : kUnknownRankShape; } +Status GeOp::ParserGraph(OpKernelContext *ctx, std::vector &input_vec) { + // Get Graph + if (graph_handler_.status == CompileDone) { + return Status::OK(); + } + auto func_lib = ctx->function_library(); + if (func_lib == nullptr) { + return errors::Internal("function library is nullptr"); + } + FunctionLibraryDefinition *flib_def = + const_cast(func_lib->GetFunctionLibraryDefinition()); + if (flib_def == nullptr) { + return errors::Internal("flib_def is nullptr"); + } + // Build GraphDef from FunctionDef + GraphDef ori_graph_def; + bool is_allreduce = false; + auto ret = BuildGraphDef(*flib_def, input_vec, ori_graph_def, is_initialized_graph_, is_allreduce); + if (!ret.ok()) { + return ret; + } + if (kDumpGraph) { + const std::string pbtxt_path = GetDumpPath() + "TF_" + geop_name_.c_str() + ".pbtxt"; + (void)WriteTextProto(Env::Default(), pbtxt_path, ori_graph_def); + } + ADP_LOG(INFO) << "[GEOP] TFadpter process graph success, GE parser begin, kernel_name: " << geop_name_ + << " , tf session: " << tf_session_; + const std::string compute_graph_name = "ge_default_" + CurrentTimeInStr(); + graph_handler_.graph = std::make_shared(compute_graph_name.c_str()); + if (graph_handler_.graph == nullptr) { + return errors::Internal("compute graph is nullptr"); + } + // parser, tensorflow graph to ge graph + ret = DoGraphParser(graph_handler_.graph, flib_def, ori_graph_def); + if (!ret.ok()) { + return ret; + } + ADP_LOG(INFO) << "[GEOP] Tensorflow graph parse to ge graph success, kernel_name: " << geop_name_ + << ", tf session: " << tf_session_ + << ", iteration_per_loop: " << iteration_per_loop_ << + ", need iteration: " << need_iteration_; + return SetGraphOptions(); +} + +Status GeOp::AddGraph(OpKernelContext *ctx, const uint32_t &graph_id) { + // 当此线程未add过图,且总大小小于maxNum,需要去做add + if (((add_graph_flag_map_[graph_id] & graph_handler_.add_graph_mask) == graph_handler_.add_graph_mask) || + (graph_handler_.add_total_num >= kMaxAddNum)) { + return Status::OK(); + } + // call ge session addGraph api + auto graph_options = graph_options_; + if (is_aoe_) { + graph_options["ge.buildMode"] = "normal"; + } + if ((is_dynamic_getnext_ != "1") && (iteration_per_loop_ <= 1)) { + SetReuseOptions("ge.exec.inputReuseMemIndexes", ctx->num_inputs(), + sess_options_, init_options_, graph_options); + } + SetReuseOptions("ge.exec.outputReuseMemIndexes", + ctx->num_outputs(), sess_options_, init_options_, graph_options); + ADP_LOG(EVENT) << "[GEOP] call ge session add graph jit_compile: " << jit_compile_; + graph_options["ge.exec.graphIOMemAllocMode"] = "ByGE"; + const auto graph_option_ascend_string = ChangeStringToAscendString(graph_options); + ADP_LOG(INFO) << "Graph options: "; + NpuAttrs::LogOptions(graph_options); + ge::Graph ge_graph = ge::GraphUtilsEx::CreateGraphFromComputeGraph(graph_handler_.graph); + if (iteration_per_loop_ > 1) { + ge_graph.SetNeedIteration(need_iteration_); + } + + auto status = ge_session_->AddGraph(graph_id, ge_graph, graph_option_ascend_string); + std::stringstream ss; + if (status != ge::SUCCESS) { + std::this_thread::sleep_for(std::chrono::milliseconds(kFatalSleepTime)); + ss << "[GEOP] call ge session add graph failed, kernel: " << geop_name_ << ", tf session: " << tf_session_ + << ", graph id: " << graph_id << std::endl + << "Error Message is : " << std::endl << ge::GEGetErrorMsgV2().GetString(); + return errors::Internal(ss.str()); + } + add_graph_flag_map_[graph_id] = graph_handler_.add_graph_mask; + graph_handler_.add_total_num++; + ADP_LOG(INFO) << "[GEOP] Add graph to ge session success, kernel_name: " << geop_name_ + << ", tf session: " << tf_session_ << ", graph id: " << graph_id + << ", add_num: " << graph_handler_.add_total_num; + return Status::OK(); +} + +Status GeOp::BuildGraph(const uint32_t &graph_id, const std::vector &inputs) { + if (graph_handler_.status == CompileDone) { + return Status::OK(); + } + ge::Status build_graph_status = ge_session_->BuildGraph(graph_id, inputs); + std::stringstream ss; + if (build_graph_status != ge::SUCCESS) { + ss << "[GEOP] GE session build graph failed, domi_ret : " << build_graph_status << std::endl + << "Error Message is : " << std::endl << ge::GEGetErrorMsgV2().GetString(); + return errors::Internal(ss.str()); + } + ADP_LOG(INFO) << "Set graph_status to CompileDone"; + graph_handler_.status = CompileDone; + graph_handler_.cv.notify_all(); + LOG(INFO) << "The model has been compiled on the Ascend AI processor, current graph id is: " << graph_id; + return Status::OK(); +} + +Status GeOp::RunGraph(OpKernelContext *ctx, const uint32_t &graph_id, + const std::vector &inputs, + ge::RunAsyncCallback callback) { + // call ge session runGraphAsync api + mutex_lock lock(graph_handler_.graph_mu); + while (graph_handler_.status == Init) { + ADP_LOG(INFO) << "RunGraph wait, graph_status: " << graph_handler_.status; + graph_handler_.cv.wait(lock); + } + ADP_LOG(INFO) << "[GEOP] Call ge session RunGraphAsync, kernel_name: " + << geop_name_ << ", tf session: " << tf_session_ + << ", graph id: " << graph_id; + ge::Status run_graph_status = ge_session_->RunGraphAsync(graph_id, inputs, callback); + std::stringstream ss; + if (run_graph_status != ge::SUCCESS) { + std::this_thread::sleep_for(std::chrono::milliseconds(kFatalSleepTime)); + ss << "[GEOP] call ge session RunGraphAsync Failed, kernel:" << geop_name_ << ", tf session: " << tf_session_ + << ", graph id: " << graph_id << std::endl + << "Error Message is : " << std::endl << ge::GEGetErrorMsgV2().GetString(); + return errors::Internal(ss.str()); + } + ADP_LOG(INFO) << "End RunGraph run_num: " << graph_handler_.graph_run_num; + graph_handler_.graph_run_num++; + graph_handler_.cv.notify_all(); + return Status::OK(); +} + +Status GeOp::SetGraphOptions() { + // convert to ge::graph + if (graph_options_.count("input_format") != 0) { + ADP_LOG(INFO) << "graph_options_[\"input_format\"] = " << graph_options_["input_format"]; + } + + if (iteration_per_loop_ > 1) { + graph_options_["iterations_per_loop"] = std::to_string(iteration_per_loop_); + } + + const auto cahce_option_iter = sess_options_.find("ge.graph_compiler_cache_dir"); + if (cahce_option_iter != sess_options_.cend() && !cahce_option_iter->second.empty()) { + graph_options_["ge.graph_key"] = geop_name_; + } + + if (is_host_graph_) { + ADP_LOG(INFO) << "[GEOP] set graph option."; + graph_options_["ge.exec.placement"] = "HOST"; + } + graph_options_["ge.shape_generalized_build_mode"] = "shape_precise"; + if (!recompute_mode_.empty()) { + graph_options_["ge.recompute"] = recompute_mode_; + } + if (!max_key_num_.empty()) { + graph_options_["ge.max_key_num"] = max_key_num_; + } + if (!embedding_dim_.empty()) { + graph_options_["ge.embedding_dim"] = embedding_dim_; + } + if (!use_counter_filter_.empty()) { + graph_options_["ge.use_counter_filter"] = use_counter_filter_; + } + if (!padding_key_.empty()) { + graph_options_["ge.padding_key"] = padding_key_; + } + if (!embedding_flags_.empty()) { + graph_options_["ge.embedding_flags"] = embedding_flags_; + } + SetDynamicInput(); + graph_options_["ge.exec.isVarInitGraph"] = is_var_init_graph_; + graph_options_["ge.jit_compile"] = jit_compile_; + graph_options_["ge.exec.overflow"] = "1"; + graph_options_["ge.graphLevelSat"] = (mix_compile_mode_ == "0") ? "1" : "0"; + return DoAccelerateTrain(); +} + +Status GeOp::CompileGraph(OpKernelContext *ctx, std::vector &input_vec, + const uint32_t &graph_id, const std::vector &inputs, + std::vector input_shapes) { + mutex_lock lock{graph_handler_.graph_mu}; + while (graph_handler_.status == Removing) { + ADP_LOG(INFO) << "Compile graph wait, status: " << graph_handler_.status; + graph_handler_.cv.wait(lock); + } + auto ret = ParserGraph(ctx, input_vec); + if (!ret.ok()) { + return ret; + } + /* if graph is init verify graph, return */ + if (is_initialized_graph_) { + Tensor initialized_tensor(ctx->expected_output_dtype(0), TensorShape({0})); + ctx->set_output(0, initialized_tensor); + ADP_LOG(INFO) << "[GEOP] End GeOp::ComputeAsync, compute_graph is initialize, kernel_name:" + << geop_name_ << ", ret_status:" << ToString(ge::SUCCESS) + << " , tf session: " << tf_session_ << " ,graph id: " << graph_id; + return Status::OK(); + } + if (graph_handler_.graph->GetAllNodesSize() == 0UL) { + ADP_LOG(INFO) << "[GEOP] End GeOp::ComputeAsync, compute_graph is empty, kernel_name:" << geop_name_ + << ", ret_status:" << ToString(ge::SUCCESS) << " , tf session: " << tf_session_ + << " ,graph id: " << graph_id; + is_empty_graph_ = true; + return Status::OK(); + } + ret = AddGraph(ctx, graph_id); + if (!ret.ok()) { + return ret; + } + const bool is_set_dynamic_config = IsDynamicConfig(); + const bool is_lazy_recompile_mode = IsLazyCompile(); + if (!is_set_dynamic_config && is_lazy_recompile_mode) { + cache_graphs_.insert(std::make_pair(input_shapes, graph_id)); + graph_counts_.push_back(std::make_pair(input_shapes, 1)); + } + ret = BuildGraph(graph_id, inputs); + if (!ret.ok()) { + return ret; + } + return Status::OK(); +} + +bool GeOp::IsLazyCompile() { + return ((dynamic_input_ == "1") && (dynamic_graph_execute_mode_ == "lazy_recompile")); +} + void GeOp::ComputeAsync(OpKernelContext *ctx, DoneCallback done) { - // ctx is not nullptr OP_REQUIRES_ASYNC(ctx, init_flag_, errors::InvalidArgument("GeOp not Initialize success."), done); - if (!sess_init_flag_) { - if (job_type_ != "localhost") { // in ps mode : ctx->session_handle() is empty - tf_session_ = "ps_worker_session"; - ADP_LOG(INFO) << "[GEOP] get tf session " << tf_session_ << " when in ps mode."; - } - if (tf_session_.empty()) { - tf_session_ = ctx->session_handle(); - ADP_LOG(INFO) << "[GEOP] get tf session " << tf_session_ << " from session handle."; - } - OP_REQUIRES_ASYNC(ctx, IncrementGraphIdCount(graph_id_), errors::Internal("Get ge session failed."), done); - - ADP_LOG(INFO) << "[GEOP] Node name: " << ctx->op_kernel().name() << " , tf session: " << tf_session_; - if (!init_options_["ge.jobType"].empty() && !init_options_["ge.tuningPath"].empty()) { - uint32_t device_id = 0; - OP_REQUIRES_OK_ASYNC(ctx, GetEnvDeviceID(device_id), done); - ADP_LOG(INFO) << "[GEOP] in tuning func, aoe_mode:" << init_options_["ge.jobType"] - << ", work_path:" << init_options_["ge.tuningPath"] - << ", distribute_config:" << init_options_["distribute_config"]; - tune_options_.insert(init_options_.cbegin(), init_options_.cend()); - tune_options_.insert({"devices", std::to_string(device_id)}); - tune_options_.insert(sess_options_.cbegin(), sess_options_.cend()); - tune_options_.insert({"work_path", init_options_["ge.tuningPath"]}); - tune_options_.insert({"job_type", init_options_["ge.jobType"]}); - // aoe ini - if (!tuned_initialize_flag_) { - std::map global_options; - global_options.insert( - {ge::AscendString("work_path"), ge::AscendString(init_options_["ge.tuningPath"].c_str())}); - global_options.insert({ge::AscendString("job_type"), ge::AscendString(init_options_["ge.jobType"].c_str())}); - global_options.insert({ge::AscendString("ge.resourceConfigPath"), - ge::AscendString(sess_options_["ge.resourceConfigPath"].c_str())}); - AoeStatus init_ret = (*aoe_initialize_)(global_options); - OP_REQUIRES_ASYNC(ctx, init_ret == Aoe::AOE_SUCCESS, - errors::Internal("[GEOP] exec aoe initialize func failed[", init_ret, "]."), done); - tuned_initialize_flag_ = true; + { + mutex_lock lock{mu_}; + if (!sess_init_flag_) { + if (job_type_ != "localhost") { // in ps mode : ctx->session_handle() is empty + tf_session_ = "ps_worker_session"; + ADP_LOG(INFO) << "[GEOP] get tf session " << tf_session_ << " when in ps mode."; + } + if (tf_session_.empty()) { + tf_session_ = ctx->session_handle(); + ADP_LOG(INFO) << "[GEOP] get tf session " << tf_session_ << " from session handle."; } + geop_name_ = ctx->op_kernel().name(); + OP_REQUIRES_ASYNC(ctx, IncrementGraphIdCount(graph_id_), errors::Internal("Get ge session failed."), done); + OP_REQUIRES_OK_ASYNC(ctx, CreateGeSession(), done); + ADP_LOG(INFO) << "[GEOP] Node name: " << geop_name_ << " , tf session: " << tf_session_; + } + } + if (is_aoe_) { + ADP_LOG(INFO) << "[GEOP] in tuning func, aoe_mode:" << init_options_["ge.jobType"] + << ", work_path:" << init_options_["ge.tuningPath"] + << ", distribute_config:" << init_options_["distribute_config"]; + // aoe ini + mutex_lock lock{mu_}; + if (!tuned_initialize_flag_) { + std::map global_options; + global_options.insert( + {ge::AscendString("work_path"), ge::AscendString(init_options_["ge.tuningPath"].c_str())}); + global_options.insert({ge::AscendString("job_type"), ge::AscendString(init_options_["ge.jobType"].c_str())}); + global_options.insert({ge::AscendString("ge.resourceConfigPath"), + ge::AscendString(sess_options_["ge.resourceConfigPath"].c_str())}); + AoeStatus init_ret = (*aoe_initialize_)(global_options); + OP_REQUIRES_ASYNC(ctx, init_ret == Aoe::AOE_SUCCESS, + errors::Internal("[GEOP] exec aoe initialize func failed[", init_ret, "]."), done); + tuned_initialize_flag_ = true; } } - // convert input to const OP_REQUIRES_OK_ASYNC(ctx, GraphInputConvertToConst(ctx), done); - std::string geop_name = ctx->op_kernel().name(); uint32_t num_inputs = static_cast(ctx->num_inputs()); ADP_LOG(INFO) << "[GEOP] Begin GeOp::ComputeAsync" - << ", kernel_name:" << geop_name << ", num_inputs:" << num_inputs + << ", kernel_name:" << geop_name_ << ", num_inputs:" << num_inputs << ", num_outputs:" << ctx->num_outputs(); - int64 startTime = InferShapeUtil::GetCurrentTimestap(); - int64 endTime = 0; - - // To be compatible with old versions, we should check dynamic_input_ and dynamic_config - bool is_set_dynamic_config = IsDynamicConfig(); - if (dynamic_input_ != "1" && !is_set_dynamic_config) { - bool shape_changed = MaybeUpdateShape(ctx); - if (build_flag_ && shape_changed) { - ge::Status status = ge_session_->RemoveGraph(graph_id_); - if (status != ge::SUCCESS) { - ADP_LOG(WARNING) << "[GEOP] GE remove graph failed, ret : " << ToString(status) << ", graph_id: " << graph_id_; - } - build_flag_ = false; - } - } std::vector input_vec; std::vector input_shapes; @@ -1127,262 +1372,87 @@ void GeOp::ComputeAsync(OpKernelContext *ctx, DoneCallback done) { // if input shapes changed, cache graphs uint32_t cache_graph_id = graph_id_; - bool is_lazy_recompile_mode = (dynamic_input_ == "1") && (dynamic_graph_execute_mode_ == "lazy_recompile"); - ADP_LOG(INFO) << "is_set_dynamic_config: " << is_set_dynamic_config - << " is_aoe_: " << is_aoe_ + bool is_lazy_recompile_mode = IsLazyCompile(); + ADP_LOG(INFO) << " is_aoe_: " << is_aoe_ << " is_lazy_recompile_mode: " << is_lazy_recompile_mode; + InitGraphShape(ctx); if (is_aoe_) { - if (is_set_dynamic_config) { - ADP_LOG(ERROR) << "dynamic input config can not use with mstuning."; - OP_REQUIRES_ASYNC(ctx, false, errors::Internal("dynamic input config can not use with mstuning."), done); - return; - } + bool is_set_dynamic_config = IsDynamicConfig(); + OP_REQUIRES_ASYNC(ctx, !is_set_dynamic_config, + errors::Internal("dynamic input config can not use with mstuning."), done); auto input_vec_aoe = input_vec; - if (RunTuning(input_vec_aoe, inputs, ctx) != 0) { - ADP_LOG(ERROR) << "RunTuning fail."; - std::stringstream ss; - ss << std::endl << ge::GEGetErrorMsgV2().GetString(); - OP_REQUIRES_ASYNC(ctx, false, errors::Internal(ss.str()), done); - } - if (InitRebuildFlag(cache_graph_id) != 0) { - OP_REQUIRES_ASYNC(ctx, false, errors::Internal("Failed to check rebuild flag"), done); - return; - } - ADP_LOG(INFO) << geop_name << " RunTuning finish."; - } else if (is_set_dynamic_config) { - if (InitRebuildFlag(cache_graph_id) != 0) { - OP_REQUIRES_ASYNC(ctx, false, errors::Internal("Failed to check rebuild flag"), done); - return; - } - } else { + OP_REQUIRES_ASYNC(ctx, RunTuning(input_vec_aoe, inputs, ctx) == 0, + errors::Internal("RunTuning fail.\n", ge::GEGetErrorMsgV2().GetString()), done); + ADP_LOG(INFO) << geop_name_ << " RunTuning finish."; + } + + if (is_lazy_recompile_mode) { // in dynamic input mode, cache graphs. - if (is_lazy_recompile_mode) { - GetExecGraphId(cache_graph_id, input_shapes); - } - if (InitRebuildFlag(cache_graph_id) != 0) { - OP_REQUIRES_ASYNC(ctx, false, errors::Internal("Failed to check rebuild flag"), done); - return; - } + GetExecGraphId(cache_graph_id, input_shapes); } - if (!build_flag_) { - // Get Graph - OP_REQUIRES_ASYNC(ctx, ctx->function_library() != nullptr, errors::Internal("function library is nullptr"), done); - FunctionLibraryDefinition *flib_def = - const_cast(ctx->function_library()->GetFunctionLibraryDefinition()); - OP_REQUIRES_ASYNC(ctx, flib_def != nullptr, errors::Internal("flib_def is nullptr"), done); - - // Build GraphDef from FunctionDef - GraphDef ori_graph_def; - bool is_allreduce = false; - OP_REQUIRES_OK_ASYNC(ctx, BuildGraphDef(*flib_def, input_vec, ori_graph_def, is_initialized_graph_, is_allreduce), - done); - - /* if graph is init verify graph, return */ - if (this->is_initialized_graph_) { - Tensor initialized_tensor(ctx->expected_output_dtype(0), TensorShape({0})); - ctx->set_output(0, initialized_tensor); - done(); - return; - } - if (kDumpGraph) { - const std::string pbtxt_path = GetDumpPath() + "TF_" + geop_name.c_str() + ".pbtxt"; - (void) WriteTextProto(Env::Default(), pbtxt_path, ori_graph_def); - } - endTime = InferShapeUtil::GetCurrentTimestap(); - ADP_LOG(EVENT) << "[GEOP] In GEOP computeAsync, kernel_name: " << geop_name << " ,TFadapter cost time: [" - << ((endTime - startTime) / kMicrosToMillis) << " ms]."; - ADP_LOG(INFO) << "[GEOP] TFadpter process graph success, GE parser begin, kernel_name: " << geop_name - << " , tf session: " << tf_session_ << " , graph id: " << cache_graph_id; - ge::ComputeGraphPtr compute_graph = nullptr; - try { - const std::string compute_graph_name = "ge_default_" + CurrentTimeInStr(); - compute_graph = std::make_shared(compute_graph_name.c_str()); - } catch (...) { - OP_REQUIRES_ASYNC(ctx, false, errors::Internal("make shared failed"), done); - } - OP_REQUIRES_ASYNC(ctx, compute_graph != nullptr, errors::InvalidArgument("create ComputeGraph failed"), done); - // parser, tensorflow graph to ge graph - OP_REQUIRES_OK_ASYNC(ctx, DoGraphParser(compute_graph, flib_def, ori_graph_def), done); - ADP_LOG(INFO) << "[GEOP] Tensorflow graph parse to ge graph success, kernel_name: " << geop_name - << ", tf session: " << tf_session_ << " , graph id: " << cache_graph_id - << ", iteration_per_loop: " << iteration_per_loop_ << ", need iteration: " << this->need_iteration_; - size_t nodes = compute_graph->GetAllNodesSize(); - if (nodes == 0) { - build_flag_ = true; - compute_graph_empty_ = true; - endTime = InferShapeUtil::GetCurrentTimestap(); - ADP_LOG(INFO) << "[GEOP] End GeOp::ComputeAsync, compute_graph is empty, kernel_name:" << geop_name - << ", ret_status:" << ToString(ge::SUCCESS) << " , tf session: " << tf_session_ - << " ,graph id: " << cache_graph_id << " [" << ((endTime - startTime) / kMicrosToMillis) << " ms]"; - done(); - return; - } - // convert to ge::graph - if (graph_options_.count("input_format") != 0) { - ADP_LOG(INFO) << "graph_options_[\"input_format\"] = " << graph_options_["input_format"]; - } - ge::Graph ge_graph = ge::GraphUtilsEx::CreateGraphFromComputeGraph(compute_graph); - if (iteration_per_loop_ > 1) { - ge_graph.SetNeedIteration(this->need_iteration_); - graph_options_["iterations_per_loop"] = std::to_string(iteration_per_loop_); - } + OP_REQUIRES_OK_ASYNC(ctx, CheckAndRemoveGraph(ctx, cache_graph_id), done); - const auto cahce_option_iter = sess_options_.find("ge.graph_compiler_cache_dir"); - if (cahce_option_iter != sess_options_.cend() && !cahce_option_iter->second.empty()) { - graph_options_["ge.graph_key"] = geop_name; - } + OP_REQUIRES_OK_ASYNC(ctx, CompileGraph(ctx, input_vec, cache_graph_id, inputs, input_shapes), done); - if (is_host_graph_) { - ADP_LOG(INFO) << "[GEOP] set graph option."; - graph_options_["ge.exec.placement"] = "HOST"; - } - graph_options_["ge.shape_generalized_build_mode"] = "shape_precise"; - if (!recompute_mode_.empty()) { - graph_options_["ge.recompute"] = recompute_mode_; - } - if (!max_key_num_.empty()) { - graph_options_["ge.max_key_num"] = max_key_num_; - } - if (!embedding_dim_.empty()) { - graph_options_["ge.embedding_dim"] = embedding_dim_; - } - if (!use_counter_filter_.empty()) { - graph_options_["ge.use_counter_filter"] = use_counter_filter_; - } - if (!padding_key_.empty()) { - graph_options_["ge.padding_key"] = padding_key_; - } - if (!embedding_flags_.empty()) { - graph_options_["ge.embedding_flags"] = embedding_flags_; - } - SetDynamicInput(); - graph_options_["ge.exec.isVarInitGraph"] = is_var_init_graph_; - graph_options_["ge.jit_compile"] = jit_compile_; - graph_options_["ge.exec.overflow"] = "1"; - graph_options_["ge.graphLevelSat"] = (mix_compile_mode_ == "0") ? "1" : "0"; - OP_REQUIRES_OK_ASYNC(ctx, DoAccelerateTrain(), done); - // call ge session addGraph api - auto graph_options = graph_options_; - if (is_aoe_) { - graph_options["ge.buildMode"] = "normal"; - } - if ((is_dynamic_getnext_ != "1") && (iteration_per_loop_ <= 1)) { - SetReuseOptions("ge.exec.inputReuseMemIndexes", ctx->num_inputs(), sess_options_, init_options_, graph_options); - } - SetReuseOptions("ge.exec.outputReuseMemIndexes", ctx->num_outputs(), sess_options_, init_options_, graph_options); - ADP_LOG(EVENT) << "[GEOP] call ge session add graph jit_compile: " << jit_compile_; - graph_options["ge.exec.graphIOMemAllocMode"] = "ByGE"; - OP_REQUIRES_OK_ASYNC(ctx, CreateGeSession(), done); - auto const graph_option_ascend_string = ChangeStringToAscendString(graph_options); - ADP_LOG(INFO) << "Graph options: "; - NpuAttrs::LogOptions(graph_options); - auto status = ge_session_->AddGraph(cache_graph_id, ge_graph, graph_option_ascend_string); - std::stringstream ss; - if (status != ge::SUCCESS) { - std::this_thread::sleep_for(std::chrono::milliseconds(kFatalSleepTime)); - ADP_LOG(FATAL) << "[GEOP] call ge session add graph failed, kernel: " << geop_name << " ,tf session: " - << tf_session_ << ", graph id: " << cache_graph_id; - - ss << "[GEOP] call ge session add graph failed, kernel: " << geop_name << ", tf session: " << tf_session_ - << ", graph id: " << cache_graph_id << std::endl - << "Error Message is : " << std::endl << ge::GEGetErrorMsgV2().GetString(); - } - OP_REQUIRES_ASYNC(ctx, status == ge::SUCCESS, errors::Internal(ss.str()), done); - add_graph_flag_ = true; - ADP_LOG(INFO) << "[GEOP] Add graph to ge session success, kernel_name: " << geop_name - << ", tf session: " << tf_session_ << ", graph id: " << cache_graph_id; - - build_flag_ = true; - if (!is_set_dynamic_config && is_lazy_recompile_mode) { - cache_graphs_.insert(std::make_pair(input_shapes, cache_graph_id)); - graph_counts_.push_back(std::make_pair(input_shapes, 1)); - } - if (need_compile_graph_first_) { - ge::Status build_graph_status = ge_session_->BuildGraph(cache_graph_id, inputs); - std::stringstream ss; - if (build_graph_status != ge::SUCCESS) { - ss << "[GEOP] GE session build graph failed, domi_ret : " << build_graph_status << std::endl - << "Error Message is : " << std::endl << ge::GEGetErrorMsgV2().GetString(); - } - OP_REQUIRES_ASYNC(ctx, build_graph_status == ge::SUCCESS, errors::Internal(ss.str()), done); - ADP_LOG(INFO) << "[GEOP] Build graph success."; - done(); - return; - } - LOG(INFO) << "The model has been compiled on the Ascend AI processor, current graph id is: " << cache_graph_id; - } else { - if (compute_graph_empty_) { - endTime = InferShapeUtil::GetCurrentTimestap(); - ADP_LOG(INFO) << "[GEOP] End GeOp::ComputeAsync, compute_graph is empty, kernel_name:" << geop_name - << ", ret_status:" << ToString(ge::SUCCESS) << " , tf session: " << tf_session_ - << " ,graph id: " << cache_graph_id << " [" << ((endTime - startTime) / kMicrosToMillis) << " ms]"; - done(); - return; - } + if (is_initialized_graph_ || is_empty_graph_) { + done(); + return; } - int64 run_start_time = InferShapeUtil::GetCurrentTimestap(); - auto callback = [done, ctx, run_start_time](ge::Status ge_status, std::vector &outputs) { + auto callback = [done, ctx, run_start_time, this](ge::Status ge_status, std::vector &outputs) { + ExitCallbackGuarder guarder([ctx, this] () { + mutex_lock lock(graph_handler_.graph_mu); + ADP_LOG(INFO) << "Callback end, run_num: " << graph_handler_.graph_run_num; + graph_handler_.graph_run_num--; + graph_handler_.cv.notify_all(); + }); if (ge_status == ge::SUCCESS) { if (BuildOutputTensorInfo(ctx, outputs) != Status::OK()) { - ADP_LOG(FATAL) << ctx->op_kernel().name() << " GEOP::DoRunAsync get output failed."; + ADP_LOG(ERROR) << ctx->op_kernel().name() << " GEOP::DoRunAsync get output failed."; std::stringstream ss; ss << ctx->op_kernel().name() << "GEOP::DoRunAsync get output failed." << std::endl << "Error Message is : " << std::endl << ge::GEGetErrorMsgV2().GetString(); OP_REQUIRES_ASYNC(ctx, false, errors::Internal(ss.str()), done); - return; } } else if (ge_status == ge::END_OF_SEQUENCE) { ctx->SetStatus(errors::OutOfRange("End of sequence")); ADP_LOG(WARNING) << "[GEOP] Out of range: End of sequence."; LOG(WARNING) << "[GEOP] Out of range: End of sequence."; } else if (ge_status != ge::SUCCESS) { - std::this_thread::sleep_for(std::chrono::milliseconds(kFatalSleepTime)); - ADP_LOG(FATAL) << ctx->op_kernel().name() << "GEOP::::DoRunAsync Failed"; + ADP_LOG(ERROR) << ctx->op_kernel().name() << "GEOP::::DoRunAsync Failed"; std::stringstream ss; ss << ctx->op_kernel().name() << "GEOP::::DoRunAsync Failed" << std::endl << "Error Message is : " << std::endl << ge::GEGetErrorMsgV2().GetString(); OP_REQUIRES_ASYNC(ctx, false, errors::Internal(ss.str()), done); - return; } int64 run_end_time = InferShapeUtil::GetCurrentTimestap(); ADP_LOG(INFO) << "[GEOP] RunGraphAsync callback, status:" << ge_status << ", kernel_name:" << ctx->op_kernel().name() << "[ " << (run_end_time - run_start_time) << "us]"; done(); }; - - // call ge session runGraphAsync api - ADP_LOG(INFO) << "[GEOP] Call ge session RunGraphAsync, kernel_name: " << geop_name << ", tf session: " << tf_session_ - << ", graph id: " << cache_graph_id; - ge::Status run_graph_status = ge_session_->RunGraphAsync(cache_graph_id, inputs, callback); - std::stringstream ss; - if (run_graph_status != ge::SUCCESS) { - std::this_thread::sleep_for(std::chrono::milliseconds(kFatalSleepTime)); - ADP_LOG(FATAL) << "[GEOP] call ge session RunGraphAsync Failed, kernel:" << geop_name << " ,tf session: " - << tf_session_ << " ,graph id: " << cache_graph_id; - ss << "[GEOP] call ge session RunGraphAsync Failed, kernel:" << geop_name << ", tf session: " << tf_session_ - << ", graph id: " << cache_graph_id << std::endl - << "Error Message is : " << std::endl << ge::GEGetErrorMsgV2().GetString(); - } - OP_REQUIRES_ASYNC(ctx, run_graph_status == ge::SUCCESS, errors::Internal(ss.str()), done); - - endTime = InferShapeUtil::GetCurrentTimestap(); - ADP_LOG(INFO) << "[GEOP] End GeOp::ComputeAsync, kernel_name: " << geop_name - << ", ret_status: " << ToString(run_graph_status) << ", tf session : " << tf_session_ - << ", graph id: " << cache_graph_id << ", cost [" << ((endTime - startTime) / kMicrosToMillis) << "ms]"; + OP_REQUIRES_OK_ASYNC(ctx, RunGraph(ctx, cache_graph_id, inputs, callback), done); + int64_t end_time = InferShapeUtil::GetCurrentTimestap(); + ADP_LOG(INFO) << "[GEOP] End GeOp::ComputeAsync, kernel_name: " << geop_name_ + << ", tf session : " << tf_session_ << ", graph id: " + << cache_graph_id << ", cost [" + << ((end_time - run_start_time) / kMicrosToMillis) << "ms]"; return; } void GeOp::ChangeChannelNameAttr(NodeDef &node_def) const { const std::string pre_channel_name = node_def.attr().at("channel_name").s(); + std::cout << "channel_name: " << pre_channel_name << std::endl; uint32_t device_id = 0; (void) GetEnvDeviceID(device_id); AttrValue channel_name = AttrValue(); channel_name.set_s(std::to_string( std::hash{}(tf_session_ + pre_channel_name + "_device_" + std::to_string(device_id)))); + std::cout << "channel_name: " << channel_name.s() << std::endl; (*node_def.mutable_attr())["channel_name"] = channel_name; + std::cout << "[GEOP] changed the value of channel_name attr of node: " << node_def.name() << " to " + << channel_name.s() << std::endl; ADP_LOG(INFO) << "[GEOP] changed the value of channel_name attr of node: " << node_def.name() << " to " << channel_name.s(); } @@ -1443,12 +1513,6 @@ void GeOp::AddNodeAttrs(Node *node, bool &is_initialize) { is_host_graph_ = true; ADP_LOG(INFO) << "[GEOP] variable subgraph is initialized in host."; } - if (!need_compile_graph_first_) { - if (node->name().find("NpuCompile") != std::string::npos) { - need_compile_graph_first_ = true; - ADP_LOG(INFO) << "[GEOP] set subgraph compile first."; - } - } // clear device info && attr node_def.set_device(""); if (node_def.op() == "Const") { @@ -1589,7 +1653,7 @@ void GeOp::HandleDpOpAndGetNextNodes(Graph &graph) { remove_nodes.push_back(iterator_node); } } - if (dynamic_input_ == "1" && dynamic_graph_execute_mode_ == "lazy_recompile") { + if (IsLazyCompile()) { graph_options_["ge.exec.enableCopyOutputAddr"] = "1"; } } @@ -1951,6 +2015,7 @@ void GeOp::SetShapesToOutputDesc(const std::vector &input_shapes, c } int GeOp::RunTuning(std::vector &input_vec, std::vector &inputs, const OpKernelContext *const ctx) { + mutex_lock lock{graph_handler_.graph_mu}; if (tuned_flag_.test_and_set()) { ADP_LOG(INFO) << ctx->op_kernel().name() << " has tuned."; return 0; @@ -2040,11 +2105,6 @@ int GeOp::RunTuning(std::vector &input_vec, std::vector &inp } { GE_MAKE_GUARD(destroy, callback); - const auto &ge_status = CreateGeSession(); - if (!ge_status.ok()) { - ADP_LOG(ERROR) << "get ge session failed[" << ge_status.error_message() << "]."; - return -1; - } // share ge_session to aoe AoeStatus set_ret = (*aoe_set_gesession_)(session_id_, ge_session_); if (set_ret != Aoe::AOE_SUCCESS) { @@ -2190,7 +2250,7 @@ Status GeOp::AnalyzeStringInput(ge::Tensor &input, uint64_t count, const std::st } Status GeOp::GraphInputConvertToConst(OpKernelContext *ctx) { - mutex_lock lock{mu_}; + mutex_lock lock{graph_handler_.graph_mu}; if (is_input_convert_) { return Status::OK(); } @@ -2300,9 +2360,9 @@ Status GeOp::GraphCheckInputEqualConstOp(Tensor &tensor, int32_t index, bool &is Status GeOp::BuildInputTensorInfo(OpKernelContext *const ctx, std::vector &input_vec, std::vector &input_shapes, std::vector &inputs) { // ctx is not nullptr + mutex_lock lock{graph_handler_.graph_mu}; int num_inputs = ctx->num_inputs(); std::string cur_input_shapes; - // populate inputs for (int i = 0; i < num_inputs; i++) { Tensor tensor(ctx->input(i)); @@ -2559,7 +2619,8 @@ const std::string GeOp::SERIALIZE_FORMAT = "serialize_format"; const std::string GeOp::SERIALIZE_DATATYPE = "serialize_datatype"; const std::string GeOp::SERIALIZE_SHAPE = "serialize_shape"; const std::string GeOp::SubGraph = "SubGraph"; -std::unordered_map GeOp::session_and_graph_id_map_; +std::unordered_map> GeOp::session_and_graph_id_map_; +uint32_t GeOp::current_size_ = 0U; REGISTER_KERNEL_BUILDER(Name("GeOp").Device(DEVICE_CPU), GeOp); } // namespace tensorflow diff --git a/tf_adapter/kernels/geop_npu.h b/tf_adapter/kernels/geop_npu.h index 9758464632f79b89e331e60f887950b38b61c136..2d6d5f86209af058efca0448441fbb2a40e89314 100644 --- a/tf_adapter/kernels/geop_npu.h +++ b/tf_adapter/kernels/geop_npu.h @@ -19,6 +19,7 @@ #include #include +#include #include "tensorflow/core/common_runtime/function.h" #include "tensorflow/core/framework/op_kernel.h" @@ -46,6 +47,22 @@ using AoeSetTuningGraphInputFunc = AoeStatus (*)(SessionId, const std::vector &); +enum GraphStatus { + Init, + CompileDone, + Removing +}; + +struct GraphHandler { + GraphStatus status = Init; + mutex graph_mu; + condition_variable cv; + int32_t graph_run_num = 0; + uint64_t add_graph_mask = 1UL; + int32_t add_total_num = 0; + ge::ComputeGraphPtr graph; +}; + class GeOp : public AsyncOpKernel { public: explicit GeOp(OpKernelConstruction *ctx); @@ -94,6 +111,17 @@ public: // prepare output tensor Status BuildOutTensorInfo(OpKernelContext *ctx); + Status ParserGraph(OpKernelContext *ctx, std::vector &input_vec); + Status AddGraph(OpKernelContext *ctx, const uint32_t &graph_id); + Status BuildGraph(const uint32_t &graph_id, + const std::vector &inputs); + Status RunGraph(OpKernelContext *ctx, const uint32_t &graph_id, + const std::vector &inputs, + ge::RunAsyncCallback callback); + Status CompileGraph(OpKernelContext *ctx, std::vector &input_vec, + const uint32_t &graph_id, const std::vector &inputs, + std::vector input_shapes); + bool IsLazyCompile(); // create input and output desc for NodeDef Status GenerateDesc(Node *&node); @@ -108,7 +136,7 @@ public: void AddNodeAttrs(Node *node, bool &is_initialize); - int InitRebuildFlag(uint32_t cache_graph_id); + Status CheckAndRemoveGraph(OpKernelContext *ctx, const uint32_t &graph_id); bool IsGraphNeedRebuild(const uint32_t cache_graph_id); Status DoAccelerateTrain(); Status NeedRecompileWhenAccelerateTrainOn(bool &need_recompile); @@ -120,7 +148,7 @@ public: Status RecoverPrecisionMode(); bool IncrementGraphIdCount(uint32_t &graph_id); - bool DecrementGraphIdCount(const std::string &tf_session, uint32_t &graph_id); + bool DecrementGraphIdCount(); void ClearGraphIdCount(); @@ -138,13 +166,13 @@ public: void AnalyzeInputDesc(void *tensor_ptr, ge::Tensor &input, ge::DataType type, std::vector &input_shapes) const; - + Status RemoveGraph(const uint32_t &graph_id); int RunTuning(std::vector &input_vec, std::vector &inputs, const OpKernelContext *const ctx); std::string BuildSubGraph(FunctionLibraryDefinition *flib_def, const std::string &graph); void SetDynamicInput(); - + Status SetGraphOptions(); void ProcessDpOpFuncDef(const Node &node) const; void BuildQueueDataAndGetNextFromQueue(Graph &graph, const Node &getnext_node, @@ -160,6 +188,7 @@ public: PartialTensorShape MakeCompatShape(const PartialTensorShape &a, const PartialTensorShape &b) const; + void InitGraphShape(OpKernelContext *const ctx); bool MaybeUpdateShape(OpKernelContext *const ctx); PartialTensorShape MakeUnknownShape(const int32_t &size) const; Status ProcessForDiffNodeTypes(Graph &graph, bool &is_initialize, bool &is_allreduce); @@ -184,10 +213,7 @@ public: static bool tuned_initialize_flag_; bool init_flag_; - bool build_flag_; - bool add_graph_flag_; bool sess_init_flag_; - bool compute_graph_empty_; bool is_input_convert_; std::string input_shapes_; @@ -195,6 +221,7 @@ public: std::string data_format_; uint32_t graph_id_; bool is_initialized_graph_; + bool is_empty_graph_; bool need_iteration_; std::string tf_session_; ge::Session *ge_session_; @@ -205,7 +232,7 @@ public: std::vector, uint32_t>> graph_counts_; std::map sess_options_; std::map init_options_; - static std::unordered_map session_and_graph_id_map_; + static std::unordered_map> session_and_graph_id_map_; uint32_t iteration_per_loop_; bool is_host_graph_; std::map graph_options_; @@ -218,8 +245,6 @@ public: std::string dynamic_graph_execute_mode_; std::string data_inputs_shape_range_; std::string getnext_inputs_shape_range_; - bool need_compile_graph_first_; - std::map tune_options_; std::string is_dynamic_getnext_; std::string placeholder_index_; std::atomic_flag tuned_flag_; @@ -250,6 +275,11 @@ public: AoeSetTuningGraphInputFunc aoe_set_tuning_graph_input_; // accelerate train AccelerateInfo accelerate_info_; + GraphHandler graph_handler_; + std::string geop_name_; + static uint32_t current_size_; + // graphid 与 add_flag的映射 + static thread_local std::map add_graph_flag_map_; }; } // namespace tensorflow #endif // TENSORFLOW_KERNELS_GEOP_NPU_H_ diff --git a/tf_adapter/tests/depends/ge_runner/src/callback_executor.cc b/tf_adapter/tests/depends/ge_runner/src/callback_executor.cc new file mode 100644 index 0000000000000000000000000000000000000000..7dfc3e91fcc9dc29ae3c2e26d46650e6a930ad8e --- /dev/null +++ b/tf_adapter/tests/depends/ge_runner/src/callback_executor.cc @@ -0,0 +1,86 @@ +/* + * Copyright (c) Huawei Technologies Co., Ltd. 2022. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "callback_executor.h" +#include +#include "acl/acl_rt.h" + +namespace tensorflow { + CallbackExecutor &CallbackExecutor::GetInstance() { + static CallbackExecutor instance; + return instance; + } + void CallbackExecutor::Init() { + std::cout << "Start callback thread pool." << std::endl; + copy_thread_pool_.resize(thread_num_); + for (size_t idx = 0UL; idx < copy_thread_pool_.size(); idx++) { + if (copy_thread_pool_[idx] == nullptr) { + std::string thread_name = "thread_pool" + std::to_string(idx); + copy_thread_pool_[idx].reset(new std::thread(std::bind(&CallbackExecutor::CallbackHandler, this))); + } + } + thread_stop_flag_.store(false); + } + + void CallbackExecutor::CallbackHandler() { + std::cout << "Start callback thread." << std::endl; + CallbackPack closure; + while (!thread_stop_flag_.load()) { + { + std::unique_lock lck(queue_lock_); + queue_var_.wait(lck, [this]() { return ((!task_queue_.empty()) || (thread_stop_flag_.load())); }); + if (thread_stop_flag_.load()) { + queue_var_.notify_all(); + break; + } + closure = task_queue_.front(); + task_queue_.pop(); + std::cout << "Run callback" << std::endl; + } + closure.callback(closure.ge_status, closure.outputs); + std::unique_lock lck(queue_lock_); + run_num_--; + } + std::cout << "Callback thread is finished." << std::endl; + } + + void CallbackExecutor::PushTask(const CallbackPack &closure) { + std::unique_lock lck(queue_lock_); + std::cout << "Push closure" << std::endl; + task_queue_.push(closure); + run_num_++; + queue_var_.notify_all(); + } + + void CallbackExecutor::StopThreadPool() { + { + std::unique_lock lck(queue_lock_); + queue_var_.wait(lck, [this]() { return run_num_ <= 0; }); + std::cout << "Stop callback thread." << std::endl; + thread_stop_flag_.store(true); + queue_var_.notify_all(); + } + for (size_t i = 0UL; i < copy_thread_pool_.size(); i++) { + if (copy_thread_pool_[i]->joinable()) { + copy_thread_pool_[i]->join(); + } + } + } + int32_t CallbackExecutor::GetRunNum() { + std::unique_lock lck(queue_lock_); + return run_num_; + } +} \ No newline at end of file diff --git a/tf_adapter/tests/depends/ge_runner/src/callback_executor.h b/tf_adapter/tests/depends/ge_runner/src/callback_executor.h new file mode 100644 index 0000000000000000000000000000000000000000..ca8743372f7fe5bc849184acc4321032fd4c0c63 --- /dev/null +++ b/tf_adapter/tests/depends/ge_runner/src/callback_executor.h @@ -0,0 +1,54 @@ +/* + * Copyright (c) Huawei Technologies Co., Ltd. 2022. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef TESTS_DEPENDS_GE_RUNNER_SRC_HOST_THREAD_POOL_H_ +#define TESTS_DEPENDS_GE_RUNNER_SRC_HOST_THREAD_POOL_H_ + +#include +#include +#include +#include +#include +#include +#include +#include "ge/ge_api_types.h" +#include "graph/tensor.h" + +namespace tensorflow { +struct CallbackPack { + ge::RunAsyncCallback callback; + ge::Status ge_status; + std::vector outputs; +}; +class CallbackExecutor { + public: + static CallbackExecutor &GetInstance(); + void Init(); + void PushTask(const CallbackPack &closure); + void StopThreadPool(); + int32_t GetRunNum(); + private: + void CallbackHandler(); + std::mutex queue_lock_; + std::condition_variable queue_var_; + std::vector> copy_thread_pool_; + std::queue task_queue_; + std::atomic thread_stop_flag_{false}; + uint32_t thread_num_ = 1U; + int32_t run_num_ = 0; +}; +} +#endif // TESTS_DEPENDS_GE_RUNNER_SRC_HOST_THREAD_POOL_H_ \ No newline at end of file diff --git a/tf_adapter/tests/depends/ge_runner/src/ge_runner_stub.cc b/tf_adapter/tests/depends/ge_runner/src/ge_runner_stub.cc index 35c7c9a2323e42bb9d73a10ecbe6f0c545a4659e..4b3c360501df4b14a3ce8f823f66823fc350d13d 100644 --- a/tf_adapter/tests/depends/ge_runner/src/ge_runner_stub.cc +++ b/tf_adapter/tests/depends/ge_runner/src/ge_runner_stub.cc @@ -38,6 +38,7 @@ #include "ascendcl_stub.h" #include "ge_stub.h" #include "tf_adapter/common/adapter_logger.h" +#include "callback_executor.h" namespace ge { namespace { @@ -268,7 +269,7 @@ bool Session::IsGraphNeedRebuild(uint32_t graphId) { Status Session::AddGraph(uint32_t graphId, const Graph &graph, const std::map &options) { auto ret = graphs_map.find(graphId); if (ret != graphs_map.end()) { - return ge::FAILED; + return ge::SUCCESS; } graphs_map[graphId] = graph; return ge::SUCCESS; @@ -296,6 +297,10 @@ void RegRunGraphAsyncStub(RunGraphAsyncStub stub) { g_RunGraphAsyncStub = stub; } +void ClearRegRunGraphAsyncStub() { + g_RunGraphAsyncStub = nullptr; +} + Status Session::RunGraphAsync(uint32_t graphId, const std::vector &inputs, RunAsyncCallback callback) { if (g_RunGraphAsyncStub != nullptr) { return g_RunGraphAsyncStub(graphId, inputs, callback); @@ -309,7 +314,11 @@ Status Session::RunGraphAsync(uint32_t graphId, const std::vector &i } else { ret = ge::SUCCESS; } - callback(ret, outputs); + tensorflow::CallbackPack pack; + pack.callback = callback; + pack.ge_status = ge::SUCCESS; + pack.outputs = outputs; + tensorflow::CallbackExecutor::GetInstance().PushTask(pack); return ret; } diff --git a/tf_adapter/tests/depends/ge_runner/src/ge_stub.h b/tf_adapter/tests/depends/ge_runner/src/ge_stub.h index 025107d846078cfee6eed5a5acf89a696b2128ec..9a0b2be1b75674f918033738e0eaae11d4789cdd 100644 --- a/tf_adapter/tests/depends/ge_runner/src/ge_stub.h +++ b/tf_adapter/tests/depends/ge_runner/src/ge_stub.h @@ -51,5 +51,6 @@ void RegRunGraphStub(RunGraphStub stub); using RunGraphAsyncStub = std::function&, RunAsyncCallback)>; void RegRunGraphAsyncStub(RunGraphAsyncStub stub); +void ClearRegRunGraphAsyncStub(); } // namespace ge #endif // COMMON_GRAPH_DEBUG_GE_UTIL_H_ diff --git a/tf_adapter/tests/st/kernels/testcase/geop_npu_test.cc b/tf_adapter/tests/st/kernels/testcase/geop_npu_test.cc index 2e632988c924ac9cc135c8fa6615c28677da6ed0..273688fd522fd4e064d4e3b7670626002babe475 100644 --- a/tf_adapter/tests/st/kernels/testcase/geop_npu_test.cc +++ b/tf_adapter/tests/st/kernels/testcase/geop_npu_test.cc @@ -8,6 +8,7 @@ #include #include "gtest/gtest.h" #include "ge_stub.h" +#include "callback_executor.h" #define private public #include "tf_adapter/kernels/geop_npu.h" #undef private @@ -144,6 +145,9 @@ Status GeOpRunGraphAsync(std::string example_path, gtl::InlinedVector(¶ms); async_op->ComputeAsync(ctx1.get(), done); } + while (CallbackExecutor::GetInstance().GetRunNum() > 0) { + usleep(100); + } } } return Status::OK(); @@ -187,6 +191,9 @@ Status GeOpRunGraphAsyncMultiStep(std::string example_path, std::vector(¶ms); async_op->ComputeAsync(ctx.get(), done); + while (CallbackExecutor::GetInstance().GetRunNum() > 0) { + usleep(100); + } } } } @@ -195,6 +202,7 @@ Status GeOpRunGraphAsyncMultiStep(std::string example_path, std::vector inputs; @@ -594,8 +602,11 @@ TEST_F(GeOpTest, BuildOutputTensorInfo) { }); std::vector outputs; outputs.emplace_back(tensor); - - callback(ge::SUCCESS, outputs); + tensorflow::CallbackPack pack; + pack.callback = callback; + pack.ge_status = ge::SUCCESS; + pack.outputs = outputs; + tensorflow::CallbackExecutor::GetInstance().PushTask(pack); return ge::SUCCESS; }); @@ -779,6 +790,7 @@ TEST_F(GeOpTest, test_Get_GeSession_Failed) { GeOp *geop_node = dynamic_cast(g_op.get()); geop_node->tf_session_ = ""; EXPECT_EQ(geop_node->CreateGeSession().ok(), false); + CallbackExecutor::GetInstance().StopThreadPool(); } } // namespace } //end tensorflow diff --git a/tf_adapter/tests/ut/kernels/pbtxt/geop_jit_compile_auto.pbtxt b/tf_adapter/tests/ut/kernels/pbtxt/geop_jit_compile_auto.pbtxt new file mode 100644 index 0000000000000000000000000000000000000000..a98aeabbf601433c30f2c28e73fee7ef552e5f6b --- /dev/null +++ b/tf_adapter/tests/ut/kernels/pbtxt/geop_jit_compile_auto.pbtxt @@ -0,0 +1,995 @@ +node { + name: "arg_arg_Placeholder_0_0" + op: "_Arg" + device: "/job:localhost/replica:0/task:0/device:CPU:0" + attr { + key: "T" + value { + type: DT_INT32 + } + } + attr { + key: "index" + value { + i: 0 + } + } +} +node { + name: "retval_Mul_0_0" + op: "_Retval" + input: "GeOp11_1" + device: "/job:localhost/replica:0/task:0/device:CPU:0" + attr { + key: "T" + value { + type: DT_INT32 + } + } + attr { + key: "index" + value { + i: 0 + } + } +} +node { + name: "GeOp11_1" + op: "GeOp" + input: "GeOp11_0" + input: "GeOp11_0:1" + input: "arg_arg_Placeholder_0_0" + device: "/job:localhost/replica:0/task:0/device:CPU:0" + attr { + key: "Tin" + value { + list { + type: DT_INT64 + type: DT_INT64 + type: DT_INT32 + } + } + } + attr { + key: "Tout" + value { + list { + type: DT_INT32 + } + } + } + attr { + key: "_NpuOptimizer" + value { + s: "NpuOptimizer" + } + } + attr { + key: "_auto_tune_mode" + value { + s: "" + } + } + attr { + key: "_buffer_optimize" + value { + s: "l2_optimize" + } + } + attr { + key: "_compress_weight_conf" + value { + s: "" + } + } + attr { + key: "_debug_dir" + value { + s: "" + } + } + attr { + key: "_distribute_config" + value { + s: "" + } + } + attr { + key: "_do_npu_optimizer" + value { + s: "1" + } + } + attr { + key: "_dump_debug_mode" + value { + s: "all" + } + } + attr { + key: "_dump_mode" + value { + s: "output" + } + } + attr { + key: "_dump_path" + value { + s: "" + } + } + attr { + key: "_dump_step" + value { + s: "" + } + } + attr { + key: "_enable_compress_weight" + value { + s: "0" + } + } + attr { + key: "_enable_data_pre_proc" + value { + s: "1" + } + } + attr { + key: "_enable_dump" + value { + s: "0" + } + } + attr { + key: "_enable_dump_debug" + value { + s: "0" + } + } + attr { + key: "_enable_exception_dump" + value { + s: "" + } + } + attr { + key: "_enable_scope_fusion_passes" + value { + s: "" + } + } + attr { + key: "_enable_small_channel" + value { + s: "0" + } + } + attr { + key: "_fusion_switch_file" + value { + s: "" + } + } + attr { + key: "_graph_run_mode" + value { + s: "1" + } + } + attr { + key: "_hcom_multi_mode" + value { + s: "" + } + } + attr { + key: "_hcom_parallel" + value { + s: "0" + } + } + attr { + key: "_in_out_pair" + value { + s: "" + } + } + attr { + key: "_in_out_pair_flag" + value { + s: "1" + } + } + attr { + key: "_input_shape" + value { + s: "" + } + } + attr { + key: "_is_dynamic_getnext" + value { + s: "1" + } + } + attr { + key: "_jit_compile" + value { + s: "2" + } + } + attr { + key: "_is_tailing_optimization" + value { + s: "0" + } + } + attr { + key: "_iterations_per_loop" + value { + s: "1" + } + } + attr { + key: "_job" + value { + s: "localhost" + } + } + attr { + key: "_local_device_list" + value { + s: "" + } + } + attr { + key: "_local_rank_id" + value { + s: "-1" + } + } + attr { + key: "_lower_functional_ops" + value { + s: "0" + } + } + attr { + key: "_mix_compile_mode" + value { + s: "0" + } + } + attr { + key: "_mstune_mode" + value { + s: "" + } + } + attr { + key: "_op_compiler_cache_dir" + value { + s: "" + } + } + attr { + key: "_op_compiler_cache_mode" + value { + s: "" + } + } + attr { + key: "_op_debug_level" + value { + s: "0" + } + } + attr { + key: "_op_select_implmode" + value { + s: "" + } + } + attr { + key: "_op_tune_mode" + value { + s: "" + } + } + attr { + key: "_optypelist_for_implmode" + value { + s: "" + } + } + attr { + key: "_placeholder_index" + value { + s: "2" + } + } + attr { + key: "_precision_mode" + value { + s: "" + } + } + attr { + key: "_profiling_mode" + value { + s: "0" + } + } + attr { + key: "_profiling_options" + value { + s: "" + } + } + attr { + key: "_session_device_id" + value { + s: "" + } + } + attr { + key: "_stream_max_parallel_num" + value { + s: "" + } + } + attr { + key: "_task_index" + value { + s: "0" + } + } + attr { + key: "_use_off_line" + value { + s: "1" + } + } + attr { + key: "_variable_format_optimize" + value { + s: "1" + } + } + attr { + key: "_work_path" + value { + s: "" + } + } + attr { + key: "data_format" + value { + s: "NHWC" + } + } + attr { + key: "function" + value { + func { + name: "GeOp11_1" + } + } + } +} +node { + name: "GeOp11_0" + op: "GeOp" + device: "/job:localhost/replica:0/task:0/device:CPU:0" + attr { + key: "Tin" + value { + list { + } + } + } + attr { + key: "Tout" + value { + list { + type: DT_INT64 + type: DT_INT64 + } + } + } + attr { + key: "_NpuOptimizer" + value { + s: "NpuOptimizer" + } + } + attr { + key: "_auto_tune_mode" + value { + s: "" + } + } + attr { + key: "_buffer_optimize" + value { + s: "l2_optimize" + } + } + attr { + key: "_compress_weight_conf" + value { + s: "" + } + } + attr { + key: "_debug_dir" + value { + s: "" + } + } + attr { + key: "_distribute_config" + value { + s: "" + } + } + attr { + key: "_do_npu_optimizer" + value { + s: "1" + } + } + attr { + key: "_dump_debug_mode" + value { + s: "all" + } + } + attr { + key: "_dump_mode" + value { + s: "output" + } + } + attr { + key: "_dump_path" + value { + s: "" + } + } + attr { + key: "_dump_step" + value { + s: "" + } + } + attr { + key: "_dynamic_dims" + value { + s: "" + } + } + attr { + key: "_dynamic_graph_execute_mode" + value { + s: "lazy_recompile" + } + } + attr { + key: "_dynamic_input" + value { + s: "1" + } + } + attr { + key: "_dynamic_node_type" + value { + s: "" + } + } + attr { + key: "_enable_compress_weight" + value { + s: "0" + } + } + attr { + key: "_enable_data_pre_proc" + value { + s: "1" + } + } + attr { + key: "_enable_dump" + value { + s: "0" + } + } + attr { + key: "_enable_dump_debug" + value { + s: "0" + } + } + attr { + key: "_enable_exception_dump" + value { + s: "" + } + } + attr { + key: "_enable_scope_fusion_passes" + value { + s: "" + } + } + attr { + key: "_enable_small_channel" + value { + s: "0" + } + } + attr { + key: "_fusion_switch_file" + value { + s: "" + } + } + attr { + key: "_graph_run_mode" + value { + s: "1" + } + } + attr { + key: "_hcom_multi_mode" + value { + s: "" + } + } + attr { + key: "_hcom_parallel" + value { + s: "0" + } + } + attr { + key: "_in_out_pair" + value { + s: "" + } + } + attr { + key: "_in_out_pair_flag" + value { + s: "1" + } + } + attr { + key: "_input_shape" + value { + s: "" + } + } + attr { + key: "_is_dynamic_getnext" + value { + s: "1" + } + } + attr { + key: "_is_tailing_optimization" + value { + s: "0" + } + } + attr { + key: "_iterations_per_loop" + value { + s: "1" + } + } + attr { + key: "_job" + value { + s: "localhost" + } + } + attr { + key: "_local_device_list" + value { + s: "" + } + } + attr { + key: "_local_rank_id" + value { + s: "-1" + } + } + attr { + key: "_lower_functional_ops" + value { + s: "0" + } + } + attr { + key: "_mix_compile_mode" + value { + s: "0" + } + } + attr { + key: "_mstune_mode" + value { + s: "" + } + } + attr { + key: "_op_compiler_cache_dir" + value { + s: "" + } + } + attr { + key: "_op_compiler_cache_mode" + value { + s: "" + } + } + attr { + key: "_op_debug_level" + value { + s: "0" + } + } + attr { + key: "_op_select_implmode" + value { + s: "" + } + } + attr { + key: "_op_tune_mode" + value { + s: "" + } + } + attr { + key: "_optypelist_for_implmode" + value { + s: "" + } + } + attr { + key: "_precision_mode" + value { + s: "" + } + } + attr { + key: "_profiling_mode" + value { + s: "0" + } + } + attr { + key: "_profiling_options" + value { + s: "" + } + } + attr { + key: "_session_device_id" + value { + s: "" + } + } + attr { + key: "_stream_max_parallel_num" + value { + s: "" + } + } + attr { + key: "_task_index" + value { + s: "0" + } + } + attr { + key: "_use_off_line" + value { + s: "1" + } + } + attr { + key: "_variable_format_optimize" + value { + s: "1" + } + } + attr { + key: "_work_path" + value { + s: "" + } + } + attr { + key: "data_format" + value { + s: "NHWC" + } + } + attr { + key: "function" + value { + func { + name: "GeOp11_0" + } + } + } +} +library { + function { + signature { + name: "GeOp11_1" + input_arg { + name: "IteratorGetNext_0_arg" + type: DT_INT64 + } + input_arg { + name: "IteratorGetNext_1_arg" + type: DT_INT64 + } + input_arg { + name: "arg_arg_Placeholder_0_0_0_arg" + type: DT_INT32 + } + output_arg { + name: "Mul_0_retval" + type: DT_INT32 + } + } + node_def { + name: "Cast" + op: "Cast" + input: "IteratorGetNext_0_arg" + device: "/job:localhost/replica:0/task:0/device:CPU:0" + attr { + key: "DstT" + value { + type: DT_INT32 + } + } + attr { + key: "SrcT" + value { + type: DT_INT64 + } + } + attr { + key: "Truncate" + value { + b: false + } + } + } + node_def { + name: "Cast_1" + op: "Cast" + input: "IteratorGetNext_1_arg" + device: "/job:localhost/replica:0/task:0/device:CPU:0" + attr { + key: "DstT" + value { + type: DT_INT32 + } + } + attr { + key: "SrcT" + value { + type: DT_INT64 + } + } + attr { + key: "Truncate" + value { + b: false + } + } + } + node_def { + name: "Add" + op: "Add" + input: "Cast:y:0" + input: "Cast_1:y:0" + device: "/job:localhost/replica:0/task:0/device:CPU:0" + attr { + key: "T" + value { + type: DT_INT32 + } + } + } + node_def { + name: "Mul" + op: "Mul" + input: "Add:z:0" + input: "arg_arg_Placeholder_0_0_0_arg" + device: "/job:localhost/replica:0/task:0/device:CPU:0" + attr { + key: "T" + value { + type: DT_INT32 + } + } + attr { + key: "_graph_dynamic_graph_execute_mode" + value { + s: "lazy_recompile" + } + } + attr { + key: "_graph_dynamic_input" + value { + b: true + } + } + attr { + key: "_graph_dynamic_inputs_shape_range" + value { + s: "" + } + } + } + ret { + key: "Mul_0_retval" + value: "Mul:z:0" + } + } + function { + signature { + name: "GeOp11_0" + output_arg { + name: "IteratorGetNext_0_retval" + type: DT_INT64 + } + output_arg { + name: "IteratorGetNext_1_retval" + type: DT_INT64 + } + } + node_def { + name: "IteratorV2" + op: "IteratorV2" + device: "/job:localhost/replica:0/task:0/device:CPU:0" + attr { + key: "_NpuOptimizer" + value { + s: "NpuOptimizer" + } + } + attr { + key: "_enable_data_pre_proc" + value { + s: "1" + } + } + attr { + key: "_iterations_per_loop" + value { + s: "1" + } + } + attr { + key: "_job" + value { + s: "localhost" + } + } + attr { + key: "_mix_compile_mode" + value { + s: "0" + } + } + attr { + key: "container" + value { + s: "" + } + } + attr { + key: "output_shapes" + value { + list { + shape { + dim { + size: -1 + } + dim { + size: -1 + } + } + shape { + dim { + size: -1 + } + dim { + size: -1 + } + } + } + } + } + attr { + key: "output_types" + value { + list { + type: DT_INT64 + type: DT_INT64 + } + } + } + attr { + key: "shared_name" + value { + s: "IteratorV2" + } + } + } + node_def { + name: "IteratorGetNext" + op: "IteratorGetNext" + input: "IteratorV2:handle:0" + device: "/job:localhost/replica:0/task:0/device:CPU:0" + attr { + key: "output_shapes" + value { + list { + shape { + dim { + size: -1 + } + dim { + size: -1 + } + } + shape { + dim { + size: -1 + } + dim { + size: -1 + } + } + } + } + } + attr { + key: "output_types" + value { + list { + type: DT_INT64 + type: DT_INT64 + } + } + } + } + ret { + key: "IteratorGetNext_0_retval" + value: "IteratorGetNext:components:0" + } + ret { + key: "IteratorGetNext_1_retval" + value: "IteratorGetNext:components:1" + } + } +} +versions { + producer: 134 + min_consumer: 12 +} diff --git a/tf_adapter/tests/ut/kernels/testcase/geop_npu_test.cc b/tf_adapter/tests/ut/kernels/testcase/geop_npu_test.cc index e319a857190d94d3fec111e71ae3e7ff2d4a71b7..74e0a10150b7dafde940325c9bf2b7f1ca8d9584 100644 --- a/tf_adapter/tests/ut/kernels/testcase/geop_npu_test.cc +++ b/tf_adapter/tests/ut/kernels/testcase/geop_npu_test.cc @@ -11,6 +11,7 @@ #include "tf_adapter/util/npu_plugin.h" #include "tf_adapter/util/npu_plugin.h" #include "tf_adapter/util/util.h" +#include "callback_executor.h" #define private public #include "tf_adapter/kernels/geop_npu.h" #undef private @@ -146,6 +147,9 @@ Status GeOpRunGraphAsync(std::string example_path, gtl::InlinedVector(¶ms); async_op->ComputeAsync(ctx1.get(), done); } + while (CallbackExecutor::GetInstance().GetRunNum() > 0) { + usleep(100); + } } } return Status::OK(); @@ -190,6 +194,9 @@ Status GeOpRunGraphAsyncMultiStep(std::string example_path, std::vector(¶ms); async_op->ComputeAsync(ctx.get(), done); } + while (CallbackExecutor::GetInstance().GetRunNum() > 0) { + usleep(100); + } } } return Status::OK(); @@ -198,6 +205,7 @@ Status GeOpRunGraphAsyncMultiStep(std::string example_path, std::vector inputs; @@ -603,8 +611,11 @@ TEST_F(GeOpTest, BuildOutputTensorInfo) { }); std::vector outputs; outputs.emplace_back(tensor); - - callback(ge::SUCCESS, outputs); + tensorflow::CallbackPack pack; + pack.callback = callback; + pack.ge_status = ge::SUCCESS; + pack.outputs = outputs; + tensorflow::CallbackExecutor::GetInstance().PushTask(pack); return ge::SUCCESS; }); @@ -784,6 +795,7 @@ TEST_F(GeOpTest, test_Get_GeSession_Failed) { GeOp *geop_node = dynamic_cast(g_op.get()); geop_node->tf_session_ = ""; EXPECT_EQ(geop_node->CreateGeSession().ok(), false); + CallbackExecutor::GetInstance().StopThreadPool(); } } // namespace } // namespace tensorflow