diff --git a/.vscode/settings.json b/.vscode/settings.json new file mode 100644 index 0000000000000000000000000000000000000000..772d005e42205120a89125c957db05acb7be9e50 --- /dev/null +++ b/.vscode/settings.json @@ -0,0 +1,70 @@ +{ + "files.associations": { + "array": "cpp", + "any": "cpp", + "string_view": "cpp", + "regex": "cpp", + "*.inc": "cpp", + "atomic": "cpp", + "*.tcc": "cpp", + "bitset": "cpp", + "cctype": "cpp", + "cfenv": "cpp", + "chrono": "cpp", + "cinttypes": "cpp", + "clocale": "cpp", + "cmath": "cpp", + "complex": "cpp", + "condition_variable": "cpp", + "cstdarg": "cpp", + "cstddef": "cpp", + "cstdint": "cpp", + "cstdio": "cpp", + "cstdlib": "cpp", + "cstring": "cpp", + "ctime": "cpp", + "cwchar": "cpp", + "cwctype": "cpp", + "deque": "cpp", + "forward_list": "cpp", + "list": "cpp", + "unordered_map": "cpp", + "unordered_set": "cpp", + "vector": "cpp", + "exception": "cpp", + "algorithm": "cpp", + "functional": "cpp", + "iterator": "cpp", + "map": "cpp", + "memory": "cpp", + "memory_resource": "cpp", + "numeric": "cpp", + "optional": "cpp", + "random": "cpp", + "ratio": "cpp", + "set": "cpp", + "string": "cpp", + "system_error": "cpp", + "tuple": "cpp", + "type_traits": "cpp", + "utility": "cpp", + "fstream": "cpp", + "future": "cpp", + "initializer_list": "cpp", + "iomanip": "cpp", + "iosfwd": "cpp", + "iostream": "cpp", + "istream": "cpp", + "limits": "cpp", + "mutex": "cpp", + "new": "cpp", + "ostream": "cpp", + "sstream": "cpp", + "stdexcept": "cpp", + "streambuf": "cpp", + "thread": "cpp", + "typeindex": "cpp", + "typeinfo": "cpp", + "valarray": "cpp" + } +} \ No newline at end of file diff --git a/tf_adapter/graph/attr_manager.cc b/tf_adapter/graph/attr_manager.cc new file mode 100644 index 0000000000000000000000000000000000000000..e32e142ee3d54c367405d83a7ff679b59c1e9ce3 --- /dev/null +++ b/tf_adapter/graph/attr_manager.cc @@ -0,0 +1,201 @@ +/* + * Copyright (c) Huawei Technologies Co., Ltd. 2024-2025. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#include "attr_manager.h" +#include "tf_adapter/common/adapter_logger.h" + +namespace tensorflow { +namespace { + const std::map kGlobalConfigOptions = { + {"_precision_mode", ge::PRECISION_MODE}, + {"_precision_mode_v2", "ge.exec.precision_mode_v2"}, + {"_auto_tune_mode", "ge.autoTuneMode"}, + {"_graph_run_mode", ge::OPTION_GRAPH_RUN_MODE}, + {"_op_debug_level", ge::OP_DEBUG_LEVEL}, + {"_enable_scope_fusion_passes", ge::OPTION_EXEC_ENABLE_SCOPE_FUSION_PASSES}, + {"_enable_exception_dump", "ge.exec.enable_exception_dump"}, + {"_aoe_mode", "ge.jobType"}, + {"_work_path", "ge.tuningPath"}, + {"_op_compiler_cache_mode", "ge.op_compiler_cache_mode"}, + {"_op_compiler_cache_dir", "ge.op_compiler_cache_dir"}, + {"_debug_dir", "ge.debugDir"}, + {"_hcom_multi_mode", "ge.hcomMultiMode"}, + {"_distribute_config", "distribute_config"}, + {"_modify_mixlist", ge::MODIFY_MIXLIST}, + {"_fusion_switch_file", "ge.fusionSwitchFile"}, + {"_op_precision_mode", ge::OP_PRECISION_MODE}, + {"_op_select_implmode", ge::OP_SELECT_IMPL_MODE}, + {"_device_type", "ge.deviceType"}, + {"_soc_config", "ge.socVersion"}, + {"_hccl_timeout", "ge.exec.hcclExecuteTimeOut"}, + {"_HCCL_algorithm", "HCCL_algorithm"}, + {"_customize_dtypes", "ge.customizeDtypes"}, + {"_op_debug_config", "ge.exec.opDebugConfig"}, + {"_graph_exec_timeout", "ge.exec.graphExecTimeout"}, + {"_static_memory_policy", "ge.exec.staticMemoryPolicy"}, + {"_logical_device_cluster_deploy_mode", "ge.exec.logicalDeviceClusterDeployMode"}, + {"_logical_device_id", "ge.exec.logicalDeviceId"}, + {"_model_deploy_mode", "ge.exec.modelDeployMode"}, + {"_model_deploy_devicelist", "ge.exec.modelDeployDevicelist"}, + {"_dump_data", "ge.exec.dumpData"}, + {"_aoe_config_file", "ge.aoe_config_file"}, + {"_stream_sync_timeout", "stream_sync_timeout"}, + {"_event_sync_timeout", "event_sync_timeout"}, + {"_es_cluster_config", "ge.esClusterConfig"} + }; + // session options + const std::map kSessionConfigOptions = { + {"_variable_format_optimize", "ge.exec.variable_acc"}, + {"_hcom_parallel", ge::HCOM_PARALLEL}, + {"_graph_memory_max_size", ge::GRAPH_MEMORY_MAX_SIZE}, + {"_enable_dump", ge::OPTION_EXEC_ENABLE_DUMP}, + {"_variable_memory_max_size", ge::VARIABLE_MEMORY_MAX_SIZE}, + {"_enable_dump_debug", ge::OPTION_EXEC_ENABLE_DUMP_DEBUG}, + {"_input_fusion_size", "ge.exec.input_fusion_size"}, + {"_dump_path", ge::OPTION_EXEC_DUMP_PATH}, + {"_dump_step", ge::OPTION_EXEC_DUMP_STEP}, + {"_dump_mode", ge::OPTION_EXEC_DUMP_MODE}, + {"_dump_debug_mode", ge::OPTION_EXEC_DUMP_DEBUG_MODE}, + {"_stream_max_parallel_num", ge::STREAM_MAX_PARALLEL_NUM}, + {"_ac_parallel_enable", ge::AC_PARALLEL_ENABLE}, + {"_quant_dumpable", ge::QUANT_DUMPABLE}, + {"_is_tailing_optimization", "ge.exec.isTailingOptimization"}, + {"_op_select_implmode", ge::OP_SELECT_IMPL_MODE}, + {"_optypelist_for_implmode", ge::OPTYPELIST_FOR_IMPLMODE}, + {"_input_shape", "ge.inputShape"}, + {"_dynamic_dims", "ge.dynamicDims"}, + {"_buffer_optimize", "ge.bufferOptimize"}, + {"_enable_small_channel", "ge.enableSmallChannel"}, + {"_fusion_switch_file", "ge.fusionSwitchFile"}, + {"_enable_compress_weight", "ge.enableCompressWeight"}, + {"_compress_weight_conf", "compress_weight_conf"}, + {"_dynamic_node_type", "ge.dynamicNodeType"}, + {"_session_device_id", "ge.session_device_id"}, + {"_modify_mixlist", ge::MODIFY_MIXLIST}, + {"_op_precision_mode", "ge.exec.op_precision_mode"}, + {"_graph_run_mode", ge::OPTION_GRAPH_RUN_MODE}, + {"_hccl_timeout", "ge.exec.hcclExecuteTimeOut"}, + {"_HCCL_algorithm", "HCCL_algorithm"}, + 
{"_atomic_clean_policy", "ge.exec.atomicCleanPolicy"},
+    {"_memory_optimization_policy", "ge.exec.memoryOptimizationPolicy"},
+    {"_topo_sorting_mode", "ge.topoSortingMode"},
+    {"_insert_op_file", "ge.insertOpFile"},
+    {"_resource_config_path", "ge.resourceConfigPath"},
+    {"_dump_layer", "ge.exec.dumpLayer"},
+    {"_external_weight", "ge.externalWeight"},
+    {"_graph_parallel_option_path", "ge.graphParallelOptionPath"},
+    {"_enable_graph_parallel", "ge.enableGraphParallel"},
+    {"_graph_slice", "ge.graphSliceMode"},
+    {"_compile_dynamic_mode", "ge.compile_dynamic_mode"},
+    {"_jit_compile", "ge.jit_compile"},
+    {"_graph_compiler_cache_dir", "ge.graph_compiler_cache_dir"}
+  };
+
+  const std::map<std::string, std::string> kGraphConfigOptions = {
+    {"_recompute_mode", "ge.recompute"},
+    {"_max_key_num", "ge.max_key_num"},
+    {"_use_counter_filter", "ge.use_counter_filter"},
+    {"_embedding_dim", "ge.embedding_dim"},
+    {"_execute_times", "ge.execute_times"},
+    {"_padding_key", "ge.padding_key"},
+    {"_embedding_flags", "ge.embedding_flags"},
+    {"_jit_compile", "ge.jit_compile"},
+    {"_is_var_init_graph", "ge.exec.isVarInitGraph"}
+  };
+  const std::map<std::string, std::string> kTfConfigOptions = {
+    {"data_format", "data_format"},
+    {"_placeholder_index", "_placeholder_index"},
+    {"_is_dynamic_getnext", "_is_dynamic_getnext"},
+    {"job", "job"},
+    {"mix_compile_mode", "mix_compile_mode"},
+    {"accelerate_train_mode", "accelerate_train_mode"},
+    {"_dynamic_input", "ge.exec.dynamicInput"},
+    {"_dynamic_graph_execute_mode", "ge.exec.dynamicGraphExecuteMode"},
+    {"_getnext_inputs_shape_range", "_getnext_inputs_shape_range"},
+    {"_data_inputs_shape_range", "ge.exec.dataInputsShapeRange"},
+    {"iterations_per_loop", "iterations_per_loop"}
+  };
+}
+void AttrManager::ParserOptionFromConfigFile(OpKernelConstruction *ctx,
+                                             const std::map<std::string, std::string> &config_map) {
+  for (const auto &iter : config_map) {
+    std::string value;
+    (void)ctx->GetAttr(iter.first, &value);
+    if (!value.empty()) {
+      (void)all_attr_map_.emplace(std::make_pair(iter.first, value));
+    }
+  }
+}
+void AttrManager::Init(OpKernelConstruction *ctx) {
+  ParserOptionFromConfigFile(ctx, kGlobalConfigOptions);
+  ParserOptionFromConfigFile(ctx, kSessionConfigOptions);
+  ParserOptionFromConfigFile(ctx, kAoeConfigOptions);
+  ParserOptionFromConfigFile(ctx, kGraphConfigOptions);
+  ParserOptionFromConfigFile(ctx, kTfConfigOptions);
+}
+
+std::map<std::string, std::string> AttrManager::GetOptionMapFromConfig(
+    const std::map<std::string, std::string> &config_map) {
+  std::map<std::string, std::string> option_map;
+  for (const auto &iter : all_attr_map_) {
+    const auto it = config_map.find(iter.first);
+    if (it != config_map.cend()) {
+      (void)option_map.emplace(std::make_pair(it->second, iter.second));
+    }
+  }
+  return option_map;
+}
+std::map<std::string, std::string> AttrManager::GetGraphOption() {
+  auto graph_option = GetOptionMapFromConfig(kGraphConfigOptions);
+  if (all_attr_map_["_dynamic_input"] == "1") {
+    (void)graph_option.emplace(std::make_pair("ge.exec.dynamicInput", all_attr_map_["_dynamic_input"]));
+    (void)graph_option.emplace(std::make_pair("ge.exec.dynamicGraphExecuteMode",
+                                              all_attr_map_["_dynamic_graph_execute_mode"]));
+    (void)graph_option.emplace(std::make_pair("ge.exec.dataInputsShapeRange",
+                                              all_attr_map_["_data_inputs_shape_range"]));
+    if (all_attr_map_["_dynamic_graph_execute_mode"] == "dynamic_execute" &&
+        all_attr_map_["_data_inputs_shape_range"].empty() &&
+        all_attr_map_["_getnext_inputs_shape_range"].empty()) {
+      (void)graph_option.emplace(std::make_pair("ge.shape_generalized_build_mode", "shape_generalized"));
+    }
+  }
+  if (std::atoi(all_attr_map_["iterations_per_loop"].c_str()) > 1) {
+    (void)graph_option.emplace(std::make_pair("iterations_per_loop", all_attr_map_["iterations_per_loop"]));
+  }
+  const std::string graph_level_sat_value = (all_attr_map_["mix_compile_mode"] == "0") ? "1" : "0";
+  (void)graph_option.emplace(std::make_pair("ge.graphLevelSat", graph_level_sat_value));
+  return graph_option;
+}
+std::map<std::string, std::string> AttrManager::GetInitOption() {
+  return GetOptionMapFromConfig(kGlobalConfigOptions);
+}
+
+std::map<std::string, std::string> AttrManager::GetSessionOption() {
+  return GetOptionMapFromConfig(kSessionConfigOptions);
+}
+
+std::string AttrManager::GetOption(const std::string &key) {
+  const auto it = all_attr_map_.find(key);
+  if (it != all_attr_map_.cend()) {
+    ADP_LOG(INFO) << "Key: " << it->first << ", value: " << it->second;
+    return it->second;
+  }
+  return "";
+}
+void AttrManager::SetOption(const std::string &key, const std::string &value) {
+  all_attr_map_[key] = value;
+}
+}  // namespace tensorflow
\ No newline at end of file
diff --git a/tf_adapter/graph/attr_manager.h b/tf_adapter/graph/attr_manager.h
new file mode 100644
index 0000000000000000000000000000000000000000..c801e17717a167aecfaf37b5cce04ce59b439647
--- /dev/null
+++ b/tf_adapter/graph/attr_manager.h
@@ -0,0 +1,41 @@
+/*
+ * Copyright (c) Huawei Technologies Co., Ltd. 2024-2025. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef TENSORFLOW_TFADAPTER_GRAPH_ATTR_MANAGER_H_
+#define TENSORFLOW_TFADAPTER_GRAPH_ATTR_MANAGER_H_
+#include <map>
+#include "tensorflow/core/framework/op_kernel.h"
+
+namespace tensorflow {
+class AttrManager {
+ public:
+  void Init(OpKernelConstruction *ctx);
+  std::map<std::string, std::string> GetGraphOption();
+  std::map<std::string, std::string> GetInitOption();
+  std::map<std::string, std::string> GetSessionOption();
+  std::map<std::string, std::string> GetAoeOption();
+  std::map<std::string, std::string> GetTfOption();
+  std::string GetOption(const std::string &key);
+  void SetOption(const std::string &key, const std::string &value);
+ private:
+  void ParserOptionFromConfigFile(OpKernelConstruction *ctx,
+                                  const std::map<std::string, std::string> &config_map);
+  std::map<std::string, std::string> GetOptionMapFromConfig(
+      const std::map<std::string, std::string> &config_map);
+  std::map<std::string, std::string> all_attr_map_;
+};
+}  // namespace tensorflow
+#endif  // TENSORFLOW_TFADAPTER_GRAPH_ATTR_MANAGER_H_
\ No newline at end of file
diff --git a/tf_adapter/graph/event_processor.cc b/tf_adapter/graph/event_processor.cc
new file mode 100644
index 0000000000000000000000000000000000000000..82d15173cf9f8ed369d7bcde16fbb8c9c83da1ca
--- /dev/null
+++ b/tf_adapter/graph/event_processor.cc
@@ -0,0 +1,218 @@
+/*
+ * Copyright (c) Huawei Technologies Co., Ltd. 2024-2025. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "event_processor.h" + +namespace tensorflow { +Status EventProcessor::CheckAndRemoveGraph(const bool &shape_changed, const int32_t &graph_id) { + // To be compatible with old versions, we should check dynamic_input and dynamic_config + if (shape_changed || IsModelNeedRebuild(graph_id)) { + ge::Status status = ge_session_->RemoveGraph(graph_id); + if (status != ge::SUCCESS) { + return errors::Internal("Remove graph: ", graph_id, " failed"); + } + } + return Status::OK(); + } +} +Status EventProcessor::ParserGraph() { + if (!build_flag_) { + // Get Graph + // ParserGraph + OP_REQUIRES_ASYNC(ctx, ctx->function_library() != nullptr, errors::Internal("function library is nullptr"), done); + FunctionLibraryDefinition *flib_def = + const_cast(ctx->function_library()->GetFunctionLibraryDefinition()); + OP_REQUIRES_ASYNC(ctx, flib_def != nullptr, errors::Internal("flib_def is nullptr"), done); + + // Build GraphDef from FunctionDef + GraphDef ori_graph_def; + bool is_allreduce = false; + OP_REQUIRES_OK_ASYNC(ctx, BuildGraphDef(*flib_def, input_vec, ori_graph_def, is_initialized_graph_, is_allreduce), + done); + + /* if graph is init verify graph, return */ + if (this->is_initialized_graph_) { + Tensor initialized_tensor(ctx->expected_output_dtype(0), TensorShape({0})); + ctx->set_output(0, initialized_tensor); + done(); + return; + } + if (kDumpGraph) { + const std::string pbtxt_path = GetDumpPath() + "TF_" + geop_name.c_str() + ".pbtxt"; + (void) WriteTextProto(Env::Default(), pbtxt_path, ori_graph_def); + } + endTime = InferShapeUtil::GetCurrentTimestap(); + ADP_LOG(EVENT) << "[GEOP] In GEOP computeAsync, kernel_name: " << geop_name << " ,TFadapter cost time: [" + << ((endTime - startTime) / kMicrosToMillis) << " ms]."; + ADP_LOG(INFO) << "[GEOP] TFadpter process graph success, GE parser begin, kernel_name: " << geop_name + << " , tf session: " << tf_session_ << " , graph id: " << cache_graph_id; + ge::ComputeGraphPtr compute_graph = nullptr; + try { + const std::string compute_graph_name = "ge_default_" + CurrentTimeInStr(); + compute_graph = std::make_shared(compute_graph_name.c_str()); + } catch (...) 
{ + OP_REQUIRES_ASYNC(ctx, false, errors::Internal("make shared failed"), done); + } + OP_REQUIRES_ASYNC(ctx, compute_graph != nullptr, errors::InvalidArgument("create ComputeGraph failed"), done); + // parser, tensorflow graph to ge graph + OP_REQUIRES_OK_ASYNC(ctx, DoGraphParser(compute_graph, flib_def, ori_graph_def), done); + ADP_LOG(INFO) << "[GEOP] Tensorflow graph parse to ge graph success, kernel_name: " << geop_name + << ", tf session: " << tf_session_ << " , graph id: " << cache_graph_id + << ", iteration_per_loop: " << iteration_per_loop_ << ", need iteration: " << this->need_iteration_; + size_t nodes = compute_graph->GetAllNodesSize(); + if (nodes == 0) { + build_flag_ = true; + compute_graph_empty_ = true; + endTime = InferShapeUtil::GetCurrentTimestap(); + ADP_LOG(INFO) << "[GEOP] End GeOp::ComputeAsync, compute_graph is empty, kernel_name:" << geop_name + << ", ret_status:" << ToString(ge::SUCCESS) << " , tf session: " << tf_session_ + << " ,graph id: " << cache_graph_id << " [" << ((endTime - startTime) / kMicrosToMillis) << " ms]"; + done(); + return; + } + } +} +Status EventProcessor::AddGraph() { + // convert to ge::graph + if (graph_options_.count("input_format") != 0) { + ADP_LOG(INFO) << "graph_options_[\"input_format\"] = " << graph_options_["input_format"]; + } + ge::Graph ge_graph = ge::GraphUtilsEx::CreateGraphFromComputeGraph(compute_graph); + + const auto cahce_option_iter = sess_options_.find("ge.graph_compiler_cache_dir"); + if (cahce_option_iter != sess_options_.cend() && !cahce_option_iter->second.empty()) { + graph_options_["ge.graph_key"] = geop_name; + } + + if (is_host_graph_) { + ADP_LOG(INFO) << "[GEOP] set graph option."; + graph_options_["ge.exec.placement"] = "HOST"; + } + graph_options_["ge.shape_generalized_build_mode"] = "shape_precise"; + graph_options_["ge.exec.overflow"] = "1"; + OP_REQUIRES_OK_ASYNC(ctx, DoAccelerateTrain(), done); + // call ge session addGraph api + auto graph_options = graph_options_; + if (is_aoe_) { + graph_options["ge.buildMode"] = "normal"; + } + if ((is_dynamic_getnext_ != "1") && (iteration_per_loop_ <= 1)) { + SetReuseOptions("ge.exec.inputReuseMemIndexes", ctx->num_inputs(), sess_options_, init_options_, graph_options); + } + SetReuseOptions("ge.exec.outputReuseMemIndexes", ctx->num_outputs(), sess_options_, init_options_, graph_options); + graph_options["ge.exec.graphIOMemAllocMode"] = "ByGE"; + auto const graph_option_ascend_string = ChangeStringToAscendString(graph_options); + ADP_LOG(INFO) << "Graph options: "; + NpuAttrs::LogOptions(graph_options); + auto status = ge_session_->AddGraph(cache_graph_id, ge_graph, graph_option_ascend_string); + std::stringstream ss; + if (status != ge::SUCCESS) { + std::this_thread::sleep_for(std::chrono::milliseconds(kFatalSleepTime)); + ADP_LOG(FATAL) << "[GEOP] call ge session add graph failed, kernel: " << geop_name << " ,tf session: " + << tf_session_ << ", graph id: " << cache_graph_id; + + ss << "[GEOP] call ge session add graph failed, kernel: " << geop_name << ", tf session: " << tf_session_ + << ", graph id: " << cache_graph_id << std::endl + << "Error Message is : " << std::endl << ge::GEGetErrorMsgV2().GetString(); + } + OP_REQUIRES_ASYNC(ctx, status == ge::SUCCESS, errors::Internal(ss.str()), done); + add_graph_flag_ = true; + ADP_LOG(INFO) << "[GEOP] Add graph to ge session success, kernel_name: " << geop_name + << ", tf session: " << tf_session_ << ", graph id: " << cache_graph_id; +} +Status EventProcessor::BuildGraph() { + build_flag_ = true; + if 
(!is_multi_batch_ && is_lazy_recompile_mode) { + cache_graphs_.insert(std::make_pair(input_shapes, cache_graph_id)); + graph_counts_.push_back(std::make_pair(input_shapes, 1)); + } + if (need_compile_graph_first_) { + ge::Status build_graph_status = ge_session_->BuildGraph(cache_graph_id, inputs); + std::stringstream ss; + if (build_graph_status != ge::SUCCESS) { + ss << "[GEOP] GE session build graph failed, domi_ret : " << build_graph_status << std::endl + << "Error Message is : " << std::endl << ge::GEGetErrorMsgV2().GetString(); + } + OP_REQUIRES_ASYNC(ctx, build_graph_status == ge::SUCCESS, errors::Internal(ss.str()), done); + ADP_LOG(INFO) << "[GEOP] Build graph success."; + done(); + return; + } + LOG(INFO) << "The model has been compiled on the Ascend AI processor, current graph id is: " << cache_graph_id; + } else { + if (compute_graph_empty_) { + endTime = InferShapeUtil::GetCurrentTimestap(); + ADP_LOG(INFO) << "[GEOP] End GeOp::ComputeAsync, compute_graph is empty, kernel_name:" << geop_name + << ", ret_status:" << ToString(ge::SUCCESS) << " , tf session: " << tf_session_ + << " ,graph id: " << cache_graph_id << " [" << ((endTime - startTime) / kMicrosToMillis) << " ms]"; + done(); + return; + } + } +} +Status EventProcessor::RunGraph() { + int64 run_start_time = InferShapeUtil::GetCurrentTimestap(); + auto callback = [done, ctx, run_start_time](ge::Status ge_status, std::vector &outputs) { + if (ge_status == ge::SUCCESS) { + if (BuildOutputTensorInfo(ctx, outputs) != Status::OK()) { + ADP_LOG(FATAL) << ctx->op_kernel().name() << " GEOP::DoRunAsync get output failed."; + std::stringstream ss; + ss << ctx->op_kernel().name() << "GEOP::DoRunAsync get output failed." << std::endl + << "Error Message is : " << std::endl << ge::GEGetErrorMsgV2().GetString(); + OP_REQUIRES_ASYNC(ctx, false, errors::Internal(ss.str()), done); + return; + } + } else if (ge_status == ge::END_OF_SEQUENCE) { + ctx->SetStatus(errors::OutOfRange("End of sequence")); + ADP_LOG(WARNING) << "[GEOP] Out of range: End of sequence."; + LOG(WARNING) << "[GEOP] Out of range: End of sequence."; + } else if (ge_status != ge::SUCCESS) { + std::this_thread::sleep_for(std::chrono::milliseconds(kFatalSleepTime)); + ADP_LOG(FATAL) << ctx->op_kernel().name() << "GEOP::::DoRunAsync Failed"; + std::stringstream ss; + ss << ctx->op_kernel().name() << "GEOP::::DoRunAsync Failed" << std::endl + << "Error Message is : " << std::endl << ge::GEGetErrorMsgV2().GetString(); + OP_REQUIRES_ASYNC(ctx, false, errors::Internal(ss.str()), done); + return; + } + int64 run_end_time = InferShapeUtil::GetCurrentTimestap(); + ADP_LOG(INFO) << "[GEOP] RunGraphAsync callback, status:" << ge_status + << ", kernel_name:" << ctx->op_kernel().name() << "[ " << (run_end_time - run_start_time) << "us]"; + done(); + }; + + // call ge session runGraphAsync api + ADP_LOG(INFO) << "[GEOP] Call ge session RunGraphAsync, kernel_name: " << geop_name << ", tf session: " << tf_session_ + << ", graph id: " << cache_graph_id; + ge::Status run_graph_status = ge_session_->RunGraphAsync(cache_graph_id, inputs, callback); + std::stringstream ss; + if (run_graph_status != ge::SUCCESS) { + std::this_thread::sleep_for(std::chrono::milliseconds(kFatalSleepTime)); + ADP_LOG(FATAL) << "[GEOP] call ge session RunGraphAsync Failed, kernel:" << geop_name << " ,tf session: " + << tf_session_ << " ,graph id: " << cache_graph_id; + ss << "[GEOP] call ge session RunGraphAsync Failed, kernel:" << geop_name << ", tf session: " << tf_session_ + << ", graph id: " << cache_graph_id 
<< std::endl
+       << "Error Message is : " << std::endl << ge::GEGetErrorMsgV2().GetString();
+  }
+  OP_REQUIRES_ASYNC(ctx, run_graph_status == ge::SUCCESS, errors::Internal(ss.str()), done);
+
+  endTime = InferShapeUtil::GetCurrentTimestap();
+  ADP_LOG(INFO) << "[GEOP] End GeOp::ComputeAsync, kernel_name: " << geop_name
+                << ", ret_status: " << ToString(run_graph_status) << ", tf session : " << tf_session_
+                << ", graph id: " << cache_graph_id << ", cost [" << ((endTime - startTime) / kMicrosToMillis) << "ms]";
+  return Status::OK();
+}
+}  // namespace tensorflow
diff --git a/tf_adapter/graph/event_processor.h b/tf_adapter/graph/event_processor.h
new file mode 100644
index 0000000000000000000000000000000000000000..d75df7a519f04e5bb89996798b40f2ae966fe81d
--- /dev/null
+++ b/tf_adapter/graph/event_processor.h
@@ -0,0 +1,48 @@
+/*
+ * Copyright (c) Huawei Technologies Co., Ltd. 2024-2025. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef TENSORFLOW_TFADAPTER_GRAPH_EVENT_PROCESSOR_H_
+#define TENSORFLOW_TFADAPTER_GRAPH_EVENT_PROCESSOR_H_
+#include "tensorflow/core/platform/mutex.h"
+#include "ge/ge_api_types.h"
+
+namespace tensorflow {
+enum HandlerStatus {
+  Init,
+  ParserDone,
+  AddDone,
+  BuildDone,
+  RunDone
+};
+
+class EventProcessor {
+ public:
+  explicit EventProcessor(HandlerStatus status) : status_(status) {}
+  ~EventProcessor() = default;
+  Status CheckAndRemoveGraph(const bool &shape_changed, const int32_t &graph_id);
+  Status ParserGraph();
+  Status AddGraph();
+  Status BuildGraph();
+  Status RunGraph();
+ private:
+  mutex mu_;
+  mutex wait_mu_;
+  condition_variable cv_;
+  HandlerStatus status_;
+};
+}  // namespace tensorflow
+
+#endif  // TENSORFLOW_TFADAPTER_GRAPH_EVENT_PROCESSOR_H_
\ No newline at end of file
diff --git a/tf_adapter/graph/graph_builder.cc b/tf_adapter/graph/graph_builder.cc
new file mode 100644
index 0000000000000000000000000000000000000000..7ced7a6c5937647cb40134c0308718f1b15f84f8
--- /dev/null
+++ b/tf_adapter/graph/graph_builder.cc
@@ -0,0 +1,97 @@
+/*
+ * Copyright (c) Huawei Technologies Co., Ltd. 2024-2025. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "graph_builder.h"
+namespace tensorflow {
+// Build GraphDef from FunctionDef.
+Status GraphBuilder::BuildGraphDef(FunctionLibraryDefinition &flib_def, const std::vector<Tensor> &input_vec,
+                                   GraphDef &graph_def, bool &is_initialize, bool &is_allreduce) {
+  const FunctionDef *function_def = flib_def.Find(function_.name());
+  NPU_REQUIRES(function_def != nullptr, errors::Internal("Function:", function_.name(), " fdef is nullptr"));
+  // get infershape
+  Graph graph(OpRegistry::Global());
+  Status ret = InferShapeUtil::InferShape(input_vec, &flib_def, function_def, &graph);
+  if (!ret.ok()) {
+    ADP_LOG(ERROR) << "[GEOP] InferShape failed, " << ret.error_message();
+    LOG(ERROR) << "[GEOP] InferShape failed, " << ret.error_message();
+    return ret;
+  }
+
+  if (is_multi_batch_) {
+    attr_manager_.SetOption("_jit_compile", "1");
+    BuildShapeNodeAndCacheArgNodes(graph);
+  }
+
+  NPU_REQUIRES_OK(ProcessForDiffNodeTypes(graph, is_initialize, is_allreduce));
+
+  // set input_shape to dynamic nodes shape desc
+  if (is_multi_batch_) {
+    ret = ChangeInputsShapeDesc();
+    if (!ret.ok()) {
+      ADP_LOG(ERROR) << "[GEOP] ChangeInputsShapeDesc failed, " << ret.error_message();
+      LOG(ERROR) << "[GEOP] ChangeInputsShapeDesc failed, " << ret.error_message();
+      return ret;
+    }
+  }
+  HandleDpOpAndGetNextNodes(graph);
+
+  // Binary scenario (jit=0 or jit=2): if the shape has changed, update the input shapes.
+  if ((attr_manager_.GetOption("_jit_compile") != "1") || (attr_manager_.GetOption("_compile_dynamic_mode") == "1")) {
+    UpdateInputsShapeDesc(graph);
+  }
+
+  graph.ToGraphDef(&graph_def);
+  std::string enable_force_v2_control;
+  (void) ReadStringFromEnvVar("ENABLE_FORCE_V2_CONTROL", "", &enable_force_v2_control);
+  if (enable_force_v2_control == "1") {
+    Status status = FunctionalizeControlFlow(&graph, &flib_def);
+    if (status != Status::OK()) {
+      LOG(WARNING) << "[GEOP] Failed functionalize control flow: " << status.error_message();
+      return Status::OK();
+    }
+    graph.ToGraphDef(&graph_def);
+  }
+  return Status::OK();
+}
+
+Status GraphBuilder::DoGraphParser(ge::ComputeGraphPtr &compute_graph, FunctionLibraryDefinition *flib_def,
+                                   GraphDef &ori_graph_def) {
+  std::shared_ptr<domi::ModelParser> model_parser =
+      domi::ModelParserFactory::Instance()->CreateModelParser(domi::FrameworkType::TENSORFLOW);
+  REQUIRES_NOT_NULL(model_parser);
+
+  std::map const_value_map;
+  std::vector partition_graph;
+  auto tf_status = SeparateGraphDef(ori_graph_def, partition_graph, const_value_map);
+  if (!tf_status.ok()) {
+    return tf_status;
+  }
+  auto build_sub_graph = [this, flib_def](const ge::AscendString &graph) -> ge::AscendString {
+    const auto &graph_def = this->BuildSubGraph(flib_def, std::string(graph.GetString()));
+    return ge::AscendString(graph_def.c_str(), graph_def.length());
+  };
+  ge::Status status =
+      model_parser->ParseProtoWithSubgraph(partition_graph, const_value_map, build_sub_graph, compute_graph);
+  if (status != ge::SUCCESS) {
+    std::stringstream ss;
+    ss << "graph parse failed. ret : " << status << std::endl << "Error Message is : "
+       << std::endl << ge::GEGetErrorMsgV2().GetString();
+    return errors::Internal(ss.str());
+  }
+
+  domi::GetContext().format = ge::GetParserContext().format;
+  return Status::OK();
+}
+}  // namespace tensorflow
diff --git a/tf_adapter/graph/graph_builder.h b/tf_adapter/graph/graph_builder.h
new file mode 100644
index 0000000000000000000000000000000000000000..f19e5941eb0230721d4e3f3222c07d834287be24
--- /dev/null
+++ b/tf_adapter/graph/graph_builder.h
@@ -0,0 +1,32 @@
+/*
+ * Copyright (c) Huawei Technologies Co., Ltd. 2024-2025. All rights reserved.
+ * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef TENSORFLOW_TFADAPTER_GRAPH_GRAPH_BUILDER_H_ +#define TENSORFLOW_TFADAPTER_GRAPH_GRAPH_BUILDER_H_ + +namespace tensorflow { +class GraphBuilder { + public: + static Status BuildGraphDef(FunctionLibraryDefinition &flib_def, const std::vector &input_vec, + GraphDef &graph_def, bool &is_initialize, bool &is_allreduce); + static Status DoGraphParser(ge::ComputeGraphPtr &compute_graph, FunctionLibraryDefinition *flib_def, + GraphDef &ori_graph_def); + private: + +}; +} + +#endif // TENSORFLOW_TFADAPTER_GRAPH_GRAPH_BUILDER_H_ \ No newline at end of file diff --git a/tf_adapter/graph/graph_manager.cc b/tf_adapter/graph/graph_manager.cc new file mode 100644 index 0000000000000000000000000000000000000000..81095eab8011e3fa07e8c9db0d7d36dee8f74742 --- /dev/null +++ b/tf_adapter/graph/graph_manager.cc @@ -0,0 +1,44 @@ +/* + * Copyright (c) Huawei Technologies Co., Ltd. 2024-2025. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "graph_manager.h" + +namespace tensorflow { +namespace { + const int32_t kMaxCacheNum = 10; +} +GraphManager &GraphManager::GetInstance() { + static GraphManager instance; + return instance; +} +int32_t GraphManager::CreateGraphId(const std::string &session_name, const std::string &op_name) { + auto it = graph_id_map_.find(session_name); + if (it != graph_id_map_.end()) { + auto iter_graph_id = it->second.find(op_name); + if (iter_graph_id != it->second.end()) { + return iter_graph_id->second; + } + int32_t graph_id = current_graph_id_ * kMaxCacheNum + 1; + it->second.emplace(std::make_pair(op_name, graph_id)); + return graph_id; + } + int32_t graph_id = current_graph_id_ * kMaxCacheNum + 1; + std::map graph_id_op_name_map = {{op_name, graph_id}}; + graph_id_map_.emplace(std::make_pair(session_name, graph_id_op_name_map)); + return graph_id; +} + +} \ No newline at end of file diff --git a/tf_adapter/graph/graph_manager.h b/tf_adapter/graph/graph_manager.h new file mode 100644 index 0000000000000000000000000000000000000000..eba30548c9581254fc46101795933088a7d497e7 --- /dev/null +++ b/tf_adapter/graph/graph_manager.h @@ -0,0 +1,39 @@ +/* + * Copyright (c) Huawei Technologies Co., Ltd. 2024-2025. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef TENSORFLOW_TFADAPTER_GRAPH_GRAPH_MANAGER_H_ +#define TENSORFLOW_TFADAPTER_GRAPH_GRAPH_MANAGER_H_ +#include + +namespace tensorflow { +class GraphManager { + public: + GraphManager() = default; + ~GraphManager() = default; + + static GraphManager &GetInstance(); + int32_t CreateGraphId(const std::string &session_name, const std::string &op_name); + GraphManager(const GraphManager&) = delete; + GraphManager(GraphManager &&) = delete; + GraphManager& operator=(const GraphManager&) = delete; + GraphManager& operator=(GraphManager &&) = delete; + private: + std::map> graph_id_map_; + int32_t current_graph_id_ = 0; +}; +} + +#endif // TENSORFLOW_TFADAPTER_GRAPH_GRAPH_MANAGER_H_ \ No newline at end of file diff --git a/tf_adapter/graph/npu_aoe.cpp b/tf_adapter/graph/npu_aoe.cpp new file mode 100644 index 0000000000000000000000000000000000000000..89b72e1fe430c5b47de6336c025cd6df0122d4a3 --- /dev/null +++ b/tf_adapter/graph/npu_aoe.cpp @@ -0,0 +1,236 @@ +/* + * Copyright (c) Huawei Technologies Co., Ltd. 2022. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#include "npu_aoe.h" +#include +#include "tf_adapter/common/adapter_logger.h" +#include "graph/graph_builder.h" +#include "tf_adapter/util/npu_attrs.h" + +namespace tensorflow { +NpuAoe &NpuAoe::GetInstance() { + static NpuAoe instance; + return instance; +} + +Status NpuAoe::RunAoeTuning(std::vector &input_vec, + std::vector &inputs, + const int32_t &graph_id, + const OpKernelContext *const ctx) { + mutex_lock lock{aoe_mu_}; + if (ge_graphs_.find(graph_id) != ge_graphs_.end()) { + ADP_LOG(INFO) << "Graph: " << graph_id << " has been tuned."; + return Status::OK(); + } + + ADP_LOG(INFO) << "Graph: " << graph_id << " begin tune."; + + // Get Graph + OP_REQUIRES(ctx, ctx->function_library() != nullptr, errors::Internal("Function library is nullptr")); + FunctionLibraryDefinition *flib_def = + const_cast(ctx->function_library()->GetFunctionLibraryDefinition()); + OP_REQUIRES(ctx, flib_def != nullptr, errors::Internal("flib_def is nullptr")); + std::shared_ptr graph = std::make_shared(OpRegistry::Global()); + OP_REQUIRES(ctx, graph != nullptr, errors::Internal("graph is nullptr")); + // Build GraphDef from FunctionDef + bool is_allreduce = false; + bool is_initialized_graph = false; + GraphDef ori_graph_def; + OP_REQUIRES_OK(ctx, + GraphBuilder::BuildGraphDef(*flib_def, input_vec, ori_graph_def, is_initialized_graph, is_allreduce)); + + if (is_initialized_graph) { + ADP_LOG(INFO) << ctx->op_kernel().name() << " graph is initialized"; + return Status::OK(); + } + + if (kDumpGraph) { + const std::string pbtxt_path = GetDumpPath() + "TF_" + ctx->op_kernel().name() + "_AOE.pbtxt"; + (void)WriteTextProto(Env::Default(), pbtxt_path, ori_graph_def); + } + const std::string compute_graph_name = "ge_default_" + CurrentTimeInStr(); + ge::ComputeGraphPtr compute_graph = std::make_shared(compute_graph_name.c_str()); + OP_REQUIRES(ctx, compute_graph != nullptr, errors::Internal("compute_graph is nullptr")); + // parser, tensorflow graph to ge graph + OP_REQUIRES_OK(ctx, GraphBuilder::DoGraphParser(compute_graph, flib_def, ori_graph_def)); + ADP_LOG(INFO) << "[GEOP] Tensorflow graph parse to ge graph success."; + + // convert to ge::graph + ge::Graph ge_graph = ge::GraphUtilsEx::CreateGraphFromComputeGraph(compute_graph); + ge_graph.SetNeedIteration(false); + if (is_host_graph_) { + graph_options_["ge.exec.placement"] = "HOST"; + } + graph_options_["ge.exec.overflow"] = "1"; + + // run aoe tuning + bool is_mdat_tuning = (init_options_["ge.jobType"] == kMdatTuning) && + (attr_manager_.GetOption("_recompute_mode") == kAutoRecompute); + if ((init_options_["ge.jobType"] == "1") || (init_options_["ge.jobType"] == "2") || + ((init_options_["ge.jobType"] == "4") && is_allreduce) || is_mdat_tuning) { + std::function callback = [this]() { + if (aoe_destroy_session_ != nullptr) { + AoeStatus aoe_destroy_ret = (*aoe_destroy_session_)(session_id_); + if (aoe_destroy_ret != Aoe::AOE_SUCCESS) { + ADP_LOG(ERROR) << "exec aoe destroy func failed[" << aoe_destroy_ret << "]."; + return; + } + ADP_LOG(INFO) << "[GEOP] aoe destroy success[" << aoe_destroy_ret << "]."; + } + }; + ADP_LOG(INFO) << "[GEOP] in tune mode, training graph handled by tools."; + + // aoe create session + AoeStatus session_ret = (*aoe_create_session_)(session_id_); + if (session_ret != Aoe::AOE_SUCCESS) { + ADP_LOG(ERROR) << "exec aoe create session func failed[" << session_ret << "]."; + return -1; + } + { + GE_MAKE_GUARD(destroy, callback); + const auto &ge_status = CreateGeSession(); + if (!ge_status.ok()) { + ADP_LOG(ERROR) << "get ge 
session failed[" << ge_status.error_message() << "]."; + return -1; + } + // share ge_session to aoe + AoeStatus set_ret = (*aoe_set_gesession_)(session_id_, ge_session_); + if (set_ret != Aoe::AOE_SUCCESS) { + ADP_LOG(ERROR) << "exec aoe set session func failed[" << set_ret << "]."; + return -1; + } + // set tuning graph + AoeStatus tune_ret = (*aoe_set_tuninggraph_)(session_id_, ge_graph); + if (tune_ret != Aoe::AOE_SUCCESS) { + ADP_LOG(ERROR) << "exec aoe set graph func failed[" << tune_ret << "]."; + return -1; + } + // set tuning inputs + AoeStatus set_inputs_ret = (*aoe_set_tuning_graph_input_)(session_id_, inputs); + if (set_inputs_ret != Aoe::AOE_SUCCESS) { + ADP_LOG(ERROR) << "exec aoe set tuning inputs func failed[" << set_inputs_ret << "]."; + return -1; + } + // aoe tuning + std::map tuing_options; + tuing_options.insert({ge::AscendString("ge.recompute"), ge::AscendString(attr_manager_.GetOption("_recompute_mode"))}); + tuing_options.insert( + {ge::AscendString("ge.aoe_config_file"), ge::AscendString(init_options_["ge.aoe_config_file"].c_str())}); + AoeStatus aoe_tune_ret = (*aoe_tuning_graph_)(session_id_, tuing_options); + if ((aoe_tune_ret != Aoe::AOE_SUCCESS) && (aoe_tune_ret != Aoe::AOE_ERROR_NO_AICORE_GRAPH)) { + ADP_LOG(ERROR) << "exec aoe tuning func failed[" << aoe_tune_ret << "]."; + return -1; + } + ADP_LOG(INFO) << "[GEOP] aoe success[" << aoe_tune_ret << "]."; + } + } + return 0; +} + +tensorflow::Status NpuAoe::AoeTuningInitialize(OpKernelConstruction *ctx, + const std::string &work_path, + const std::string &job_type, + const std::string &resource_config_path) { + if (is_initialize_) { + return Status::OK(); + } + ADP_LOG(INFO) << "Start to run aoe initialize"; + handle_ = mmDlopen("libaoe_tuning.so", MMPA_RTLD_NOW); + OP_REQUIRES(ctx, handle_ != nullptr, errors::InvalidArgument("libaoe_tuning.so dlopen failed, ", mmDlerror())); + + OP_REQUIRES(ctx, LoadAoeFunc(ctx)); + std::map aoe_global_options; + (void)aoe_global_options.emplace(ge::AscendString("work_path"), ge::AscendString(work_path.c_str())); + (void)aoe_global_options.emplace(ge::AscendString("job_type"), + ge::AscendString(job_type.c_str())); + (void)aoe_global_options.emplace(ge::AscendString("ge.resourceConfigPath"), + ge::AscendString(resource_config_path.c_str())); + auto ret = aoe_func_.aoe_initialize(aoe_global_options); + OP_REQUIRES_ASYNC(ctx, ret == Aoe::AOE_SUCCESS, errors::Internal("exec aoe initialize func failed[", ret, "]")); + is_initialize_ = true; + ADP_LOG(INFO) << "Run aoe initialize success"; + return Status::OK(); +} + +tensorflow::Status NpuAoe::LoadAoeFunc(OpKernelConstruction *ctx) { + ADP_LOG(INFO) << "Start to load aoe function"; + // aoe init + aoe_func_.aoe_initialize_ = (AoeInitializeFunc) mmDlsym(handle_, "AoeInitialize"); + OP_REQUIRES(ctx, aoe_func_.aoe_initialize_ != nullptr, + errors::InvalidArgument("dlsym Aoe initialize API failed, ", mmDlerror())); + // aoe finalize + aoe_func_.aoe_finalize_ = (AoeFinalizeFunc) mmDlsym(handle_, "AoeFinalize"); + OP_REQUIRES(ctx, aoe_func_.aoe_initialize_ != nullptr, + errors::InvalidArgument("dlsym Aoe Finalize API failed, ", mmDlerror())); + // aoe create session + aoe_func_.aoe_create_session_ = (AoeCreateSessionFunc) mmDlsym(handle_, "AoeCreateSession"); + OP_REQUIRES(ctx, aoe_func_.aoe_create_session_ != nullptr, + errors::InvalidArgument("dlsym Aoe create session API failed, ", mmDlerror())); + // aoe destroy session + aoe_func_.aoe_destroy_session_ = (AoeDestroySessionFunc) mmDlsym(handle_, "AoeDestroySession"); + 
OP_REQUIRES(ctx, aoe_func_.aoe_destroy_session_ != nullptr, + errors::InvalidArgument("dlsym Aoe destroy session API failed, ", mmDlerror())); + // share ge_session to aoe + aoe_func_.aoe_set_gesession_ = (AoeSetGeSessionFunc) mmDlsym(handle_, "AoeSetGeSession"); + OP_REQUIRES(ctx, aoe_func_.aoe_set_gesession_ != nullptr, + errors::InvalidArgument("dlsym Aoe set session API failed, ", mmDlerror())); + // aoe set depend graphs + aoe_func_.aoe_set_dependgraphs_ = (AoeSetDependGraphFunc) mmDlsym(handle_, "AoeSetDependGraphs"); + OP_REQUIRES(ctx, aoe_func_.aoe_set_dependgraphs_ != nullptr, + errors::InvalidArgument("dlsym Aoe set depend graphs API failed, ", mmDlerror())); + // aoe set tuning graph + aoe_func_.aoe_set_tuninggraph_ = (AoeSetTuningGraphFunc) mmDlsym(handle_, "AoeSetTuningGraph"); + OP_REQUIRES(ctx, aoe_func_.aoe_set_tuninggraph_ != nullptr, + errors::InvalidArgument("dlsym Aoe aoe set tuning graph API failed, ", mmDlerror())); + // aoe tuning + aoe_func_.aoe_tuning_graph_ = (AoeTuningGraphFunc) mmDlsym(handle_, "AoeTuningGraph"); + OP_REQUIRES(ctx, aoe_func_.aoe_tuning_graph_ != nullptr, + errors::InvalidArgument("dlsym Aoe tuning graph API failed, ", mmDlerror())); + // aoe set tuning depend graphs inputs + aoe_func_.aoe_set_depend_graphs_inputs_ = + reinterpret_cast(mmDlsym(handle_, "AoeSetDependGraphsInputs")); + OP_REQUIRES(ctx, aoe_func_.aoe_set_depend_graphs_inputs_ != nullptr, + errors::InvalidArgument("dlsym Aoe set tuning depend graphs inputs API failed, ", mmDlerror())); + // aoe set tuning graph inputs + aoe_func_.aoe_set_tuning_graph_input_ = + reinterpret_cast(mmDlsym(handle_, "AoeSetTuningGraphInput")); + OP_REQUIRES(ctx, aoe_func_.aoe_set_tuning_graph_input_ != nullptr, + errors::InvalidArgument("dlsym Aoe set tuning graph inputs API failed, ", mmDlerror())); + ADP_LOG(INFO) << "Load aoe function success"; + return Status::OK(); +} + +Status NpuAoe::AoeTuningFinalize() { + if (handle_ != nullptr && is_initialize_) { + ADP_LOG(INFO) << "Start to run aoe finalize"; + auto ret = aoe_func_.aoe_finalize(); + if (tune_ret != Aoe::AOE_SUCCESS) { + return errors::InvalidArgument("Exec aoe finalize func failed."); + } + is_initialize_ = false; + ADP_LOG(INFO) << "Run aoe finalize success"; + } + return tensorflow::Status::OK(); +} + +NpuAoe::~NpuAoe() { + if (handle_ != nullptr) { + ADP_LOG(INFO) << "Close handle"; + (void)mmDlclose(handle_); + handle_ = nullptr; + } +} +} // namespace npu \ No newline at end of file diff --git a/tf_adapter/graph/npu_aoe.h b/tf_adapter/graph/npu_aoe.h new file mode 100644 index 0000000000000000000000000000000000000000..5c418caafede354aa9d74757d5de72baa78eaf77 --- /dev/null +++ b/tf_adapter/graph/npu_aoe.h @@ -0,0 +1,80 @@ +/* + * Copyright (c) Huawei Technologies Co., Ltd. 2024-2025. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#ifndef TENSORFLOW_TFADAPTER_GRAPH_NPU_AOE_H_ +#define TENSORFLOW_TFADAPTER_GRAPH_NPU_AOE_H_ + +#include +#include "aoe_tuning_api.h" +#include "ge/ge_api.h" +#include "tensorflow/core/platform/mutex.h" + +namespace tensorflow { +using SessionId = uint64_t; +using AoeStatus = int32_t; +using AoeInitializeFunc = AoeStatus (*)(const std::map &); +using AoeFinalizeFunc = AoeStatus (*)(); +using AoeCreateSessionFunc = AoeStatus (*)(SessionId &); +using AoeDestroySessionFunc = AoeStatus (*)(SessionId); +using AoeSetGeSessionFunc = AoeStatus (*)(SessionId, ge::Session *); +using AoeSetDependGraphFunc = AoeStatus (*)(SessionId, const std::vector &); +using AoeSetDependGraphsInputsFunc = AoeStatus (*)(SessionId, const std::vector> &); +using AoeSetTuningGraphInputFunc = AoeStatus (*)(SessionId, const std::vector &); +using AoeSetTuningGraphFunc = AoeStatus (*)(SessionId, const ge::Graph &); +using AoeTuningGraphFunc = AoeStatus (*)(SessionId, const std::map &); + +struct AoeFunc { + AoeInitializeFunc aoe_initialize = nullptr; + AoeFinalizeFunc aoe_finalize = nullptr; + AoeCreateSessionFunc aoe_create_session = nullptr; + AoeDestroySessionFunc aoe_destroy_session = nullptr; + AoeSetGeSessionFunc aoe_set_gesession = nullptr; + AoeSetDependGraphFunc aoe_set_dependgraphs = nullptr; + AoeSetTuningGraphFunc aoe_set_tuninggraph = nullptr; + AoeTuningGraphFunc aoe_tuning_graph = nullptr; + AoeSetDependGraphsInputsFunc aoe_set_depend_graphs_inputs = nullptr; + AoeSetTuningGraphInputFunc aoe_set_tuning_graph_input = nullptr; +}; + +class NpuAoe { + public: + NpuAoe() = default; + ~NpuAoe(); + static NpuAoe &GetInstance(); + Status AoeTuningInitialize(OpKernelConstruction *ctx, const std::string &work_path, + const std::string &job_type); + Status RunAoeTuning(NpuDevice &device, TFE_Context *context, bool need_build, uint64_t graph_id, + const std::string &name, const tensorflow::GraphDef &graph_def, + std::vector &inputs); + Status AoeTuningFinalize(); + + NpuAoe(const NpuAoe&) = delete; + NpuAoe(NpuAoe &&) = delete; + NpuAoe& operator=(const NpuAoe&) = delete; + NpuAoe& operator=(NpuAoe &&) = delete; + + private: + Status LoadAoeFunc(OpKernelConstruction *ctx); + + AoeFunc aoe_func_; + void *handle_ = nullptr; + bool is_initialize_ = false; + std::map ge_graphs_; + mutex aoe_mu_; +}; +} // namespace npu + +#endif // TENSORFLOW_TFADAPTER_GRAPH_NPU_AOE_H_ \ No newline at end of file diff --git a/tf_adapter/kernels/geop_npu.cc b/tf_adapter/kernels/geop_npu.cc index 537994554704cccd28487f9d60cb74dde2d405af..97c750d4762cab83afaf1092cf7cff817a1a54c0 100644 --- a/tf_adapter/kernels/geop_npu.cc +++ b/tf_adapter/kernels/geop_npu.cc @@ -80,6 +80,8 @@ #include "tf_adapter_2.x/npu_device/core/npu_micros.h" #include "tensorflow/core/graph/algorithm.h" #include "tensorflow/core/framework/graph_to_functiondef.h" +#include "graph/npu_aoe.h" +#include "graph/graph_manager.h" namespace tensorflow { #ifdef TF_VERSION_TF2 @@ -349,19 +351,18 @@ std::string CurrentTimeInStr() { static const int64 kMicrosToMillis = 1000; const int kInvalidGraphId = 0; -const int kMaxCacheNum = 10; const int kFatalSleepTime = 3000; const std::string kAllReduce = "HcomAllReduce"; GeOp::GeOp(OpKernelConstruction *ctx) - : AsyncOpKernel(ctx), init_flag_(false), build_flag_(false), add_graph_flag_(false), sess_init_flag_(false), - compute_graph_empty_(false), is_input_convert_(false), data_format_(""), graph_id_(0), - is_initialized_graph_(false), need_iteration_(false), tf_session_(""), ge_session_(nullptr), job_type_(""), - 
is_host_graph_(false), handle_(nullptr), need_compile_graph_first_(false), tuned_flag_(ATOMIC_FLAG_INIT), - jit_compile_("2"), is_dynamic_input_(false), session_id_(0), aoe_initialize_(nullptr), + : AsyncOpKernel(ctx), build_flag_(false), add_graph_flag_(false), + compute_graph_empty_(false), is_input_convert_(false), graph_id_(0), + is_initialized_graph_(false), need_iteration_(false), tf_session_(""), ge_session_(nullptr), + is_host_graph_(false), need_compile_graph_first_(false), tuned_flag_(ATOMIC_FLAG_INIT), + is_dynamic_input_(false), session_id_(0), aoe_initialize_(nullptr), aoe_finalize_(nullptr), aoe_create_session_(nullptr), aoe_destroy_session_(nullptr), aoe_set_gesession_(nullptr), aoe_set_dependgraphs_(nullptr), aoe_set_tuninggraph_(nullptr), aoe_tuning_graph_(nullptr), - aoe_set_depend_graphs_inputs_(nullptr), aoe_set_tuning_graph_input_(nullptr) { + aoe_set_depend_graphs_inputs_(nullptr), aoe_set_tuning_graph_input_(nullptr), is_multi_batch_(false) { Initialize(ctx); } @@ -369,133 +370,65 @@ GeOp::~GeOp() { Finalize(); } +void GeOp::GetTfSession() { + Status s = ctx->GetAttr("_session", &tf_session_); + if (s.ok()) { + ADP_LOG(INFO) << "[GEOP] get session info from attr, tf session: " << tf_session_; + } + if (attr_manager_.GetOption("job") != "localhost") { // in ps mode : ctx->session_handle() is empty + tf_session_ = "ps_worker_session"; + ADP_LOG(INFO) << "[GEOP] get tf session " << tf_session_ << " when in ps mode."; + } + if (tf_session_.empty()) { + tf_session_ = ctx->session_handle(); + ADP_LOG(INFO) << "[GEOP] get tf session " << tf_session_ << " from session handle."; + } +} + void GeOp::Initialize(OpKernelConstruction *ctx) { mutex_lock lock{mu_}; int64 startTime = InferShapeUtil::GetCurrentTimestap(); ADP_LOG(INFO) << "[GEOP] Begin GeOp initialize."; - if (init_flag_) { - ADP_LOG(WARNING) << "[GEOP] GEOP already Initialize."; - return; - } - CHECK_NOT_NULL(ctx); const NameAttrList *func = nullptr; OP_REQUIRES_OK(ctx, ctx->GetAttr("function", &func)); function_ = *func; - std::string data_format; - OP_REQUIRES_OK(ctx, ctx->GetAttr("data_format", &data_format)); - ADP_LOG(INFO) << "Attr 'data_format' of " << ctx->def().name() << " is " << data_format; - this->data_format_ = data_format; - - Status s = ctx->GetAttr("_session", &tf_session_); - if (s.ok()) { - ADP_LOG(INFO) << "[GEOP] get session info from attr, tf session: " << tf_session_; - } + GetTfSession(); + attr_manager_.Init(ctx); - (void) ctx->GetAttr("_recompute_mode", &recompute_mode_); - (void) ctx->GetAttr("_max_key_num", &max_key_num_); - (void) ctx->GetAttr("_use_counter_filter", &use_counter_filter_); - (void) ctx->GetAttr("_embedding_dim", &embedding_dim_); - (void) ctx->GetAttr("_compile_dynamic_mode", &compile_dynamic_mode_); - (void) ctx->GetAttr("_padding_key", &padding_key_); - (void) ctx->GetAttr("_embedding_flags", &embedding_flags_); - (void) ctx->GetAttr("_dynamic_input", &dynamic_input_); - (void) ctx->GetAttr("_jit_compile", &jit_compile_); - if (!dynamic_input_.empty() && dynamic_input_ == "1") { - jit_compile_ = "1"; + if (attr_manager_.GetOption("_dynamic_input") == "1") { + attr_manager_.SetOption("_jit_compile", "1"); is_dynamic_input_ = true; - OP_REQUIRES_OK(ctx, ctx->GetAttr("_dynamic_graph_execute_mode", &dynamic_graph_execute_mode_)); - (void) ctx->GetAttr("_getnext_inputs_shape_range", &getnext_inputs_shape_range_); - (void) ctx->GetAttr("_data_inputs_shape_range", &data_inputs_shape_range_); - (void) ctx->GetAttr("_is_dynamic_getnext", &is_dynamic_getnext_); - (void) 
ctx->GetAttr("_placeholder_index", &placeholder_index_); - } - (void) ctx->GetAttr("_train_graph", &is_train_graph_); - (void) ctx->GetAttr("_is_var_init_graph", &is_var_init_graph_); - ADP_LOG(INFO) << "[GEOP] dynamic_input: " << dynamic_input_ - << ", dynamic_graph_execute_mode: " << dynamic_graph_execute_mode_ - << ", jit_compile: " << jit_compile_ - << ", is_dynamic_input: " << is_dynamic_input_ - << ", getnext_inputs_shape_range: " << getnext_inputs_shape_range_ - << ", data_inputs_shape_range: " << data_inputs_shape_range_ << ", is_train_graph: " << is_train_graph_ - << ", is_dynamic_getnext: " << is_dynamic_getnext_ << ", placeholder_index: " << placeholder_index_ - << ", is_var_init_graph: " << is_var_init_graph_ << ", use_counter_filter: " << use_counter_filter_ - << ", max_key_num: " << max_key_num_ << ", embedding_dim: " << embedding_dim_ - << ", padding_key: " << padding_key_ << ", embedding_flags: " << embedding_flags_ - << ", compile_dynamic_mode: " << compile_dynamic_mode_; + } + is_multi_batch_ = IsDynamicConfig(); // global environment Initialize, invoke once for each process - std::string sess_config = ""; - OP_REQUIRES_OK(ctx, ctx->GetAttr("_NpuOptimizer", &sess_config)); - std::map pass_options = NpuAttrs::GetPassOptions(ctx); - iteration_per_loop_ = std::atoi(pass_options["iterations_per_loop"].c_str()); - job_type_ = pass_options["job"]; - mix_compile_mode_ = pass_options["mix_compile_mode"]; - accelerate_train_mode_ = pass_options["accelerate_train_mode"]; - ADP_LOG(INFO) << "accelerate train mode :" << accelerate_train_mode_; + iteration_per_loop_ = std::atoi(attr_manager_.GetOption("iterations_per_loop").c_str()); + is_accelerate_train_mode_on_ = IsAccelerateTrainOn(); + is_aoe_ = IsAoeOn(); if (GePlugin::GetInstance()->IsGlobal()) { ADP_LOG(INFO) << "[GEOP] GePlugin global, skip GePlugin init"; - InitAoeFlag(); } else { - init_options_ = NpuAttrs::GetInitOptions(ctx); - InitAoeFlag(); + init_options_ = attr_manager_.GetInitOption(); // aoe should not init ge async GePlugin::GetInstance()->Init(init_options_, false, !is_aoe_); ADP_LOG(INFO) << "[GEOP] GePlugin init success."; } - ADP_LOG(INFO) << "init options: "; + sess_options_ = attr_manager_.GetSessionOption(); + + graph_id_ = GraphManager::GetInstance().CreateGraphId(tf_session_, ctx->def().name()); + + ADP_LOG(INFO) << "[GEOP] Node name: " << ctx->def().name() << ", tf session: " << tf_session_; + + OP_REQUIRES_OK(ctx, CreateGeSession()); + if (is_aoe_) { - handle_ = mmDlopen("libaoe_tuning.so", MMPA_RTLD_NOW); - OP_REQUIRES(ctx, handle_ != nullptr, errors::InvalidArgument("libaoe_tuning.so dlopen failed, ", mmDlerror())); - // aoe init - aoe_initialize_ = (AoeInitializeFunc) mmDlsym(handle_, "AoeInitialize"); - OP_REQUIRES(ctx, aoe_initialize_ != nullptr, - errors::InvalidArgument("dlsym Aoe initialize API failed, ", mmDlerror())); - // aoe finalize - aoe_finalize_ = (AoeFinalizeFunc) mmDlsym(handle_, "AoeFinalize"); - OP_REQUIRES(ctx, aoe_initialize_ != nullptr, - errors::InvalidArgument("dlsym Aoe Finalize API failed, ", mmDlerror())); - // aoe create session - aoe_create_session_ = (AoeCreateSessionFunc) mmDlsym(handle_, "AoeCreateSession"); - OP_REQUIRES(ctx, aoe_create_session_ != nullptr, - errors::InvalidArgument("dlsym Aoe create session API failed, ", mmDlerror())); - // aoe destroy session - aoe_destroy_session_ = (AoeDestroySessionFunc) mmDlsym(handle_, "AoeDestroySession"); - OP_REQUIRES(ctx, aoe_destroy_session_ != nullptr, - errors::InvalidArgument("dlsym Aoe destroy session API failed, ", 
mmDlerror())); - // share ge_session to aoe - aoe_set_gesession_ = (AoeSetGeSessionFunc) mmDlsym(handle_, "AoeSetGeSession"); - OP_REQUIRES(ctx, aoe_set_gesession_ != nullptr, - errors::InvalidArgument("dlsym Aoe set session API failed, ", mmDlerror())); - // aoe set depend graphs - aoe_set_dependgraphs_ = (AoeSetDependGraphFunc) mmDlsym(handle_, "AoeSetDependGraphs"); - OP_REQUIRES(ctx, aoe_set_dependgraphs_ != nullptr, - errors::InvalidArgument("dlsym Aoe set depend graphs API failed, ", mmDlerror())); - // aoe set tuning graph - aoe_set_tuninggraph_ = (AoeSetTuningGraphFunc) mmDlsym(handle_, "AoeSetTuningGraph"); - OP_REQUIRES(ctx, aoe_set_tuninggraph_ != nullptr, - errors::InvalidArgument("dlsym Aoe aoe set tuning graph API failed, ", mmDlerror())); - // aoe tuning - aoe_tuning_graph_ = (AoeTuningGraphFunc) mmDlsym(handle_, "AoeTuningGraph"); - OP_REQUIRES(ctx, aoe_tuning_graph_ != nullptr, - errors::InvalidArgument("dlsym Aoe tuning graph API failed, ", mmDlerror())); - // aoe set tuning depend graphs inputs - aoe_set_depend_graphs_inputs_ = - reinterpret_cast(mmDlsym(handle_, "AoeSetDependGraphsInputs")); - OP_REQUIRES(ctx, aoe_set_depend_graphs_inputs_ != nullptr, - errors::InvalidArgument("dlsym Aoe set tuning depend graphs inputs API failed, ", mmDlerror())); - // aoe set tuning graph inputs - aoe_set_tuning_graph_input_ = - reinterpret_cast(mmDlsym(handle_, "AoeSetTuningGraphInput")); - OP_REQUIRES(ctx, aoe_set_tuning_graph_input_ != nullptr, - errors::InvalidArgument("dlsym Aoe set tuning graph inputs API failed, ", mmDlerror())); - } - - sess_options_ = NpuAttrs::GetSessOptions(ctx); + NpuAoe::GetInstance().AoeTuningInitialize(ctx, + init_options_["ge.tuningPath"], init_options_["ge.jobType"], + sess_options_["ge.resourceConfigPath"]); + } input_shapes_vec_.resize(ctx->num_inputs() + 1, absl::nullopt); - - init_flag_ = true; int64 endTime = InferShapeUtil::GetCurrentTimestap(); ADP_LOG(EVENT) << "[GEOP] GeOp Initialize success, cost: " << " [" << ((endTime - startTime) / kMicrosToMillis) << " ms]."; @@ -526,16 +459,11 @@ void GeOp::Finalize() { if (!GePlugin::GetInstance()->IsGlobal()) { GePlugin::GetInstance()->Finalize(); ADP_LOG(INFO) << "[GEOP] GePlugin Finalize success."; - if (!init_options_["ge.jobType"].empty() && !init_options_["ge.tuningPath"].empty() && - aoe_finalize_ != nullptr && tuned_initialize_flag_) { - AoeStatus tune_ret = (*aoe_finalize_)(); - if (tune_ret != Aoe::AOE_SUCCESS) { - ADP_LOG(ERROR) << "[GEOP] exec aoe finalize func failed."; - LOG(ERROR) << "[GEOP] exec aoe finalize func failed."; + if (is_aoe_) { + if (NpuAoe::GetInstance().AoeTuningFinalize() != Status::OK()) { return; } } - tuned_initialize_flag_ = false; } else { ADP_LOG(INFO) << "[GEOP] GePlugin global, skip GePlugin Finalize"; } @@ -543,13 +471,9 @@ void GeOp::Finalize() { ADP_LOG(WARNING) << "[GEOP] Save check report failed."; LOG(WARNING) << "[GEOP] Save check report failed."; } - if (handle_ != nullptr) { - (void) mmDlclose(handle_); - } } } } - init_flag_ = false; ADP_LOG(INFO) << "[GEOP] GeOp Finalize success, tf session: " << tf_session_ << ", graph_id_: " << graph_id_; return; } @@ -620,11 +544,11 @@ Status GeOp::AccelerateInfo::JudgeNeedRecompile(bool &need_recompile) { } Status GeOp::DoAccelerateTrain() { - if (!IsAccelerateTrainOn()) { + if (!is_accelerate_train_mode_on_) { return Status::OK(); } // accelerate_train_mode_ must be valid if `IsAccelerateTrainOn` is true - REQUIRES_STATUS_OK(ParserAccelerateTrain(accelerate_train_mode_)); + 
REQUIRES_STATUS_OK(ParserAccelerateTrain(attr_manager_.GetOption("accelerate_train_mode"))); // accelerate by modify precision mode if (need_recover_precision_mode_) { @@ -636,11 +560,11 @@ Status GeOp::DoAccelerateTrain() { } Status GeOp::NeedRecompileWhenAccelerateTrainOn(bool &need_recompile) { - if (!IsAccelerateTrainOn()) { + if (!is_accelerate_train_mode_on_) { need_recompile = false; return Status::OK(); } - REQUIRES_STATUS_OK(ParserAccelerateTrain(accelerate_train_mode_)); + REQUIRES_STATUS_OK(ParserAccelerateTrain(attr_manager_.GetOption("accelerate_train_mode"))); return accelerate_info_.JudgeNeedRecompile(need_recompile); } @@ -741,7 +665,7 @@ Status GeOp::ParserAccelerateTrain(const std::string &accelerate_train_mode) { } bool GeOp::IsAccelerateTrainOn() { - return !(accelerate_train_mode_.empty()); + return !(attr_manager_.GetOption("accelerate_train_mode").empty()); } Status GeOp::CheckAndModifyPrecisionMode() { @@ -928,24 +852,12 @@ void GeOp::GetExecGraphId(uint32_t &cache_graph_id, std::vector inp } bool GeOp::IsDynamicConfig() { - const bool result = !sess_options_["ge.inputShape"].empty() && !sess_options_["ge.dynamicDims"].empty() && - !sess_options_["ge.dynamicNodeType"].empty(); + const bool result = !attr_manager_.GetOption("_input_shape").empty() && !attr_manager_.GetOption("_dynamic_dims").empty() && + !attr_manager_.GetOption("_dynamic_node_type").empty(); ADP_LOG(INFO) << "[GEOP] IsDynamicConfig result is: " << result; return result; } -void GeOp::SetDynamicInput() { - if (dynamic_input_ == "1") { - graph_options_["ge.exec.dynamicInput"] = dynamic_input_; - graph_options_["ge.exec.dynamicGraphExecuteMode"] = dynamic_graph_execute_mode_; - graph_options_["ge.exec.dataInputsShapeRange"] = data_inputs_shape_range_; - if (dynamic_graph_execute_mode_ == "dynamic_execute" && data_inputs_shape_range_.empty() && - getnext_inputs_shape_range_.empty()) { - graph_options_["ge.shape_generalized_build_mode"] = "shape_generalized"; - } - } -} - PartialTensorShape GeOp::MakeCompatShape(const PartialTensorShape &a, const PartialTensorShape &b) const { const static auto kUnknownRankShape = PartialTensorShape(); if (a.dims() != b.dims()) { @@ -956,12 +868,13 @@ PartialTensorShape GeOp::MakeCompatShape(const PartialTensorShape &a, const Part bool GeOp::MaybeUpdateShape(OpKernelContext *const ctx) { bool updated = false; + std::string compile_dynamic_mode = attr_manager_.GetOption("_compile_dynamic_mode"); for (size_t i = 0UL; i < static_cast(ctx->num_inputs()); i++) { auto &shape = input_shapes_vec_[i]; auto &value_shape = ctx->input(static_cast(i)).shape(); if (!shape.has_value()) { // 第一次迭代时初始化shape - if (compile_dynamic_mode_ == "1") { + if (compile_dynamic_mode == "1") { shape = MakeUnknownShape(value_shape.dims()); } else { shape = value_shape; @@ -974,7 +887,7 @@ bool GeOp::MaybeUpdateShape(OpKernelContext *const ctx) { updated = true; ADP_LOG(INFO) << "Compat input " << i << " shape " << shape.value().DebugString() << " vs. 
" << value_shape.DebugString(); - if ((jit_compile_ == "1") && (compile_dynamic_mode_ != "1")) { + if ((attr_manager_.GetOption("_jit_compile") == "1") && (compile_dynamic_mode != "1")) { shape = value_shape; ADP_LOG(WARNING) << "Dynamic shape, recommended to configure jit_compile value to false or auto"; } else { @@ -999,19 +912,9 @@ Status GeOp::CreateGeSession() { LOG(ERROR) << ss.str(); return errors::Internal(ss.str()); } - static bool first = true; - if (first) { - ADP_LOG(INFO) << "[GePlugin] Initialize ge success."; - first = false; + if (!SessionManager::GetInstance().GetOrCreateGeSession(tf_session_, ge_session_, sess_options_)) { + return errors::Internal("Get ge session failed."); } - if (!sess_init_flag_) { - mutex_lock lock{mu_}; - if (!SessionManager::GetInstance().GetOrCreateGeSession(tf_session_, ge_session_, sess_options_) || - tf_session_.empty() || ge_session_ == nullptr) { - return errors::Internal("Get ge session failed."); - } - } - sess_init_flag_ = true; ADP_LOG(INFO) << "[GEOP] tf session: " << tf_session_ << " get ge session success."; return Status::OK(); } @@ -1056,47 +959,6 @@ PartialTensorShape GeOp::MakeUnknownShape(const int32_t &size) const { } void GeOp::ComputeAsync(OpKernelContext *ctx, DoneCallback done) { - // ctx is not nullptr - OP_REQUIRES_ASYNC(ctx, init_flag_, errors::InvalidArgument("GeOp not Initialize success."), done); - if (!sess_init_flag_) { - if (job_type_ != "localhost") { // in ps mode : ctx->session_handle() is empty - tf_session_ = "ps_worker_session"; - ADP_LOG(INFO) << "[GEOP] get tf session " << tf_session_ << " when in ps mode."; - } - if (tf_session_.empty()) { - tf_session_ = ctx->session_handle(); - ADP_LOG(INFO) << "[GEOP] get tf session " << tf_session_ << " from session handle."; - } - OP_REQUIRES_ASYNC(ctx, IncrementGraphIdCount(graph_id_), errors::Internal("Get ge session failed."), done); - - ADP_LOG(INFO) << "[GEOP] Node name: " << ctx->op_kernel().name() << " , tf session: " << tf_session_; - if (!init_options_["ge.jobType"].empty() && !init_options_["ge.tuningPath"].empty()) { - uint32_t device_id = 0; - OP_REQUIRES_OK_ASYNC(ctx, GetEnvDeviceID(device_id), done); - ADP_LOG(INFO) << "[GEOP] in tuning func, aoe_mode:" << init_options_["ge.jobType"] - << ", work_path:" << init_options_["ge.tuningPath"] - << ", distribute_config:" << init_options_["distribute_config"]; - tune_options_.insert(init_options_.cbegin(), init_options_.cend()); - tune_options_.insert({"devices", std::to_string(device_id)}); - tune_options_.insert(sess_options_.cbegin(), sess_options_.cend()); - tune_options_.insert({"work_path", init_options_["ge.tuningPath"]}); - tune_options_.insert({"job_type", init_options_["ge.jobType"]}); - // aoe ini - if (!tuned_initialize_flag_) { - std::map global_options; - global_options.insert( - {ge::AscendString("work_path"), ge::AscendString(init_options_["ge.tuningPath"].c_str())}); - global_options.insert({ge::AscendString("job_type"), ge::AscendString(init_options_["ge.jobType"].c_str())}); - global_options.insert({ge::AscendString("ge.resourceConfigPath"), - ge::AscendString(sess_options_["ge.resourceConfigPath"].c_str())}); - AoeStatus init_ret = (*aoe_initialize_)(global_options); - OP_REQUIRES_ASYNC(ctx, init_ret == Aoe::AOE_SUCCESS, - errors::Internal("[GEOP] exec aoe initialize func failed[", init_ret, "]."), done); - tuned_initialize_flag_ = true; - } - } - } - // convert input to const OP_REQUIRES_OK_ASYNC(ctx, GraphInputConvertToConst(ctx), done); std::string geop_name = ctx->op_kernel().name(); @@ 
-1107,272 +969,56 @@ void GeOp::ComputeAsync(OpKernelContext *ctx, DoneCallback done) { int64 startTime = InferShapeUtil::GetCurrentTimestap(); int64 endTime = 0; - // To be compatible with old versions, we should check dynamic_input_ and dynamic_config - bool is_set_dynamic_config = IsDynamicConfig(); - if (dynamic_input_ != "1" && !is_set_dynamic_config) { - bool shape_changed = MaybeUpdateShape(ctx); - if (build_flag_ && shape_changed) { - ge::Status status = ge_session_->RemoveGraph(graph_id_); - if (status != ge::SUCCESS) { - ADP_LOG(WARNING) << "[GEOP] GE remove graph failed, ret : " << ToString(status) << ", graph_id: " << graph_id_; - } - build_flag_ = false; - } - } - std::vector input_vec; std::vector input_shapes; std::vector inputs; OP_REQUIRES_OK_ASYNC(ctx, (BuildInputTensorInfo(ctx, input_vec, input_shapes, inputs)), done); + // To be compatible with old versions, we should check dynamic_input and dynamic_config + bool shape_changed = false; + if ((!is_dynamic_input_) && (!is_multi_batch_)) { + shape_changed = MaybeUpdateShape(ctx); + } +// CheckRebuild // if input shapes changed, cache graphs uint32_t cache_graph_id = graph_id_; - bool is_lazy_recompile_mode = (dynamic_input_ == "1") && (dynamic_graph_execute_mode_ == "lazy_recompile"); - ADP_LOG(INFO) << "is_set_dynamic_config: " << is_set_dynamic_config + bool is_lazy_recompile_mode = (is_dynamic_input_) && + (attr_manager_.GetOption("_dynamic_graph_execute_mode") == "lazy_recompile"); + ADP_LOG(INFO) << "is_multi_batch: " << is_multi_batch_ << " is_aoe_: " << is_aoe_ << " is_lazy_recompile_mode: " << is_lazy_recompile_mode; - if (is_aoe_) { - if (is_set_dynamic_config) { - ADP_LOG(ERROR) << "dynamic input config can not use with mstuning."; - OP_REQUIRES_ASYNC(ctx, false, errors::Internal("dynamic input config can not use with mstuning."), done); - return; - } + if (is_aoe_ ) { + OP_REQUIRES_ASYNC(ctx, !is_multi_batch_, + errors::Internal("dynamic input config can not use with mstuning."), done); auto input_vec_aoe = input_vec; - if (RunTuning(input_vec_aoe, inputs, ctx) != 0) { - ADP_LOG(ERROR) << "RunTuning fail."; - std::stringstream ss; - ss << std::endl << ge::GEGetErrorMsgV2().GetString(); - OP_REQUIRES_ASYNC(ctx, false, errors::Internal(ss.str()), done); - } - if (InitRebuildFlag(cache_graph_id) != 0) { - OP_REQUIRES_ASYNC(ctx, false, errors::Internal("Failed to check rebuild flag"), done); - return; - } - ADP_LOG(INFO) << geop_name << " RunTuning finish."; - } else if (is_set_dynamic_config) { - if (InitRebuildFlag(cache_graph_id) != 0) { - OP_REQUIRES_ASYNC(ctx, false, errors::Internal("Failed to check rebuild flag"), done); - return; - } - } else { - // in dynamic input mode, cache graphs. 
- if (is_lazy_recompile_mode) { - GetExecGraphId(cache_graph_id, input_shapes); - } - if (InitRebuildFlag(cache_graph_id) != 0) { - OP_REQUIRES_ASYNC(ctx, false, errors::Internal("Failed to check rebuild flag"), done); - return; - } - } - - if (!build_flag_) { - // Get Graph - OP_REQUIRES_ASYNC(ctx, ctx->function_library() != nullptr, errors::Internal("function library is nullptr"), done); - FunctionLibraryDefinition *flib_def = - const_cast(ctx->function_library()->GetFunctionLibraryDefinition()); - OP_REQUIRES_ASYNC(ctx, flib_def != nullptr, errors::Internal("flib_def is nullptr"), done); - - // Build GraphDef from FunctionDef - GraphDef ori_graph_def; - bool is_allreduce = false; - OP_REQUIRES_OK_ASYNC(ctx, BuildGraphDef(*flib_def, input_vec, ori_graph_def, is_initialized_graph_, is_allreduce), - done); - - /* if graph is init verify graph, return */ - if (this->is_initialized_graph_) { - Tensor initialized_tensor(ctx->expected_output_dtype(0), TensorShape({0})); - ctx->set_output(0, initialized_tensor); - done(); - return; - } - if (kDumpGraph) { - const std::string pbtxt_path = GetDumpPath() + "TF_" + geop_name.c_str() + ".pbtxt"; - (void) WriteTextProto(Env::Default(), pbtxt_path, ori_graph_def); - } - endTime = InferShapeUtil::GetCurrentTimestap(); - ADP_LOG(EVENT) << "[GEOP] In GEOP computeAsync, kernel_name: " << geop_name << " ,TFadapter cost time: [" - << ((endTime - startTime) / kMicrosToMillis) << " ms]."; - ADP_LOG(INFO) << "[GEOP] TFadpter process graph success, GE parser begin, kernel_name: " << geop_name - << " , tf session: " << tf_session_ << " , graph id: " << cache_graph_id; - ge::ComputeGraphPtr compute_graph = nullptr; - try { - const std::string compute_graph_name = "ge_default_" + CurrentTimeInStr(); - compute_graph = std::make_shared(compute_graph_name.c_str()); - } catch (...) 
{ - OP_REQUIRES_ASYNC(ctx, false, errors::Internal("make shared failed"), done); - } - OP_REQUIRES_ASYNC(ctx, compute_graph != nullptr, errors::InvalidArgument("create ComputeGraph failed"), done); - // parser, tensorflow graph to ge graph - OP_REQUIRES_OK_ASYNC(ctx, DoGraphParser(compute_graph, flib_def, ori_graph_def), done); - ADP_LOG(INFO) << "[GEOP] Tensorflow graph parse to ge graph success, kernel_name: " << geop_name - << ", tf session: " << tf_session_ << " , graph id: " << cache_graph_id - << ", iteration_per_loop: " << iteration_per_loop_ << ", need iteration: " << this->need_iteration_; - size_t nodes = compute_graph->GetAllNodesSize(); - if (nodes == 0) { - build_flag_ = true; - compute_graph_empty_ = true; - endTime = InferShapeUtil::GetCurrentTimestap(); - ADP_LOG(INFO) << "[GEOP] End GeOp::ComputeAsync, compute_graph is empty, kernel_name:" << geop_name - << ", ret_status:" << ToString(ge::SUCCESS) << " , tf session: " << tf_session_ - << " ,graph id: " << cache_graph_id << " [" << ((endTime - startTime) / kMicrosToMillis) << " ms]"; - done(); - return; - } - // convert to ge::graph - if (graph_options_.count("input_format") != 0) { - ADP_LOG(INFO) << "graph_options_[\"input_format\"] = " << graph_options_["input_format"]; - } - ge::Graph ge_graph = ge::GraphUtilsEx::CreateGraphFromComputeGraph(compute_graph); - if (iteration_per_loop_ > 1) { - ge_graph.SetNeedIteration(this->need_iteration_); - graph_options_["iterations_per_loop"] = std::to_string(iteration_per_loop_); - } - - const auto cahce_option_iter = sess_options_.find("ge.graph_compiler_cache_dir"); - if (cahce_option_iter != sess_options_.cend() && !cahce_option_iter->second.empty()) { - graph_options_["ge.graph_key"] = geop_name; - } - - if (is_host_graph_) { - ADP_LOG(INFO) << "[GEOP] set graph option."; - graph_options_["ge.exec.placement"] = "HOST"; - } - graph_options_["ge.shape_generalized_build_mode"] = "shape_precise"; - if (!recompute_mode_.empty()) { - graph_options_["ge.recompute"] = recompute_mode_; - } - if (!max_key_num_.empty()) { - graph_options_["ge.max_key_num"] = max_key_num_; - } - if (!embedding_dim_.empty()) { - graph_options_["ge.embedding_dim"] = embedding_dim_; - } - if (!use_counter_filter_.empty()) { - graph_options_["ge.use_counter_filter"] = use_counter_filter_; - } - if (!padding_key_.empty()) { - graph_options_["ge.padding_key"] = padding_key_; - } - if (!embedding_flags_.empty()) { - graph_options_["ge.embedding_flags"] = embedding_flags_; - } - SetDynamicInput(); - graph_options_["ge.exec.isVarInitGraph"] = is_var_init_graph_; - graph_options_["ge.jit_compile"] = jit_compile_; - graph_options_["ge.exec.overflow"] = "1"; - graph_options_["ge.graphLevelSat"] = (mix_compile_mode_ == "0") ? 
"1" : "0"; - OP_REQUIRES_OK_ASYNC(ctx, DoAccelerateTrain(), done); - // call ge session addGraph api - auto graph_options = graph_options_; - if (is_aoe_) { - graph_options["ge.buildMode"] = "normal"; - } - if ((is_dynamic_getnext_ != "1") && (iteration_per_loop_ <= 1)) { - SetReuseOptions("ge.exec.inputReuseMemIndexes", ctx->num_inputs(), sess_options_, init_options_, graph_options); - } - SetReuseOptions("ge.exec.outputReuseMemIndexes", ctx->num_outputs(), sess_options_, init_options_, graph_options); - ADP_LOG(EVENT) << "[GEOP] call ge session add graph jit_compile: " << jit_compile_; - graph_options["ge.exec.graphIOMemAllocMode"] = "ByGE"; - OP_REQUIRES_OK_ASYNC(ctx, CreateGeSession(), done); - auto const graph_option_ascend_string = ChangeStringToAscendString(graph_options); - ADP_LOG(INFO) << "Graph options: "; - NpuAttrs::LogOptions(graph_options); - auto status = ge_session_->AddGraph(cache_graph_id, ge_graph, graph_option_ascend_string); - std::stringstream ss; - if (status != ge::SUCCESS) { - std::this_thread::sleep_for(std::chrono::milliseconds(kFatalSleepTime)); - ADP_LOG(FATAL) << "[GEOP] call ge session add graph failed, kernel: " << geop_name << " ,tf session: " - << tf_session_ << ", graph id: " << cache_graph_id; - - ss << "[GEOP] call ge session add graph failed, kernel: " << geop_name << ", tf session: " << tf_session_ - << ", graph id: " << cache_graph_id << std::endl - << "Error Message is : " << std::endl << ge::GEGetErrorMsgV2().GetString(); - } - OP_REQUIRES_ASYNC(ctx, status == ge::SUCCESS, errors::Internal(ss.str()), done); - add_graph_flag_ = true; - ADP_LOG(INFO) << "[GEOP] Add graph to ge session success, kernel_name: " << geop_name - << ", tf session: " << tf_session_ << ", graph id: " << cache_graph_id; - - build_flag_ = true; - if (!is_set_dynamic_config && is_lazy_recompile_mode) { - cache_graphs_.insert(std::make_pair(input_shapes, cache_graph_id)); - graph_counts_.push_back(std::make_pair(input_shapes, 1)); - } - if (need_compile_graph_first_) { - ge::Status build_graph_status = ge_session_->BuildGraph(cache_graph_id, inputs); - std::stringstream ss; - if (build_graph_status != ge::SUCCESS) { - ss << "[GEOP] GE session build graph failed, domi_ret : " << build_graph_status << std::endl - << "Error Message is : " << std::endl << ge::GEGetErrorMsgV2().GetString(); - } - OP_REQUIRES_ASYNC(ctx, build_graph_status == ge::SUCCESS, errors::Internal(ss.str()), done); - ADP_LOG(INFO) << "[GEOP] Build graph success."; - done(); - return; - } - LOG(INFO) << "The model has been compiled on the Ascend AI processor, current graph id is: " << cache_graph_id; - } else { - if (compute_graph_empty_) { - endTime = InferShapeUtil::GetCurrentTimestap(); - ADP_LOG(INFO) << "[GEOP] End GeOp::ComputeAsync, compute_graph is empty, kernel_name:" << geop_name - << ", ret_status:" << ToString(ge::SUCCESS) << " , tf session: " << tf_session_ - << " ,graph id: " << cache_graph_id << " [" << ((endTime - startTime) / kMicrosToMillis) << " ms]"; - done(); - return; - } - } - - int64 run_start_time = InferShapeUtil::GetCurrentTimestap(); - auto callback = [done, ctx, run_start_time](ge::Status ge_status, std::vector &outputs) { - if (ge_status == ge::SUCCESS) { - if (BuildOutputTensorInfo(ctx, outputs) != Status::OK()) { - ADP_LOG(FATAL) << ctx->op_kernel().name() << " GEOP::DoRunAsync get output failed."; - std::stringstream ss; - ss << ctx->op_kernel().name() << "GEOP::DoRunAsync get output failed." 
<< std::endl - << "Error Message is : " << std::endl << ge::GEGetErrorMsgV2().GetString(); - OP_REQUIRES_ASYNC(ctx, false, errors::Internal(ss.str()), done); - return; - } - } else if (ge_status == ge::END_OF_SEQUENCE) { - ctx->SetStatus(errors::OutOfRange("End of sequence")); - ADP_LOG(WARNING) << "[GEOP] Out of range: End of sequence."; - LOG(WARNING) << "[GEOP] Out of range: End of sequence."; - } else if (ge_status != ge::SUCCESS) { - std::this_thread::sleep_for(std::chrono::milliseconds(kFatalSleepTime)); - ADP_LOG(FATAL) << ctx->op_kernel().name() << "GEOP::::DoRunAsync Failed"; - std::stringstream ss; - ss << ctx->op_kernel().name() << "GEOP::::DoRunAsync Failed" << std::endl - << "Error Message is : " << std::endl << ge::GEGetErrorMsgV2().GetString(); - OP_REQUIRES_ASYNC(ctx, false, errors::Internal(ss.str()), done); - return; - } - int64 run_end_time = InferShapeUtil::GetCurrentTimestap(); - ADP_LOG(INFO) << "[GEOP] RunGraphAsync callback, status:" << ge_status - << ", kernel_name:" << ctx->op_kernel().name() << "[ " << (run_end_time - run_start_time) << "us]"; - done(); - }; - - // call ge session runGraphAsync api - ADP_LOG(INFO) << "[GEOP] Call ge session RunGraphAsync, kernel_name: " << geop_name << ", tf session: " << tf_session_ - << ", graph id: " << cache_graph_id; - ge::Status run_graph_status = ge_session_->RunGraphAsync(cache_graph_id, inputs, callback); - std::stringstream ss; - if (run_graph_status != ge::SUCCESS) { - std::this_thread::sleep_for(std::chrono::milliseconds(kFatalSleepTime)); - ADP_LOG(FATAL) << "[GEOP] call ge session RunGraphAsync Failed, kernel:" << geop_name << " ,tf session: " - << tf_session_ << " ,graph id: " << cache_graph_id; - ss << "[GEOP] call ge session RunGraphAsync Failed, kernel:" << geop_name << ", tf session: " << tf_session_ - << ", graph id: " << cache_graph_id << std::endl - << "Error Message is : " << std::endl << ge::GEGetErrorMsgV2().GetString(); - } - OP_REQUIRES_ASYNC(ctx, run_graph_status == ge::SUCCESS, errors::Internal(ss.str()), done); - - endTime = InferShapeUtil::GetCurrentTimestap(); - ADP_LOG(INFO) << "[GEOP] End GeOp::ComputeAsync, kernel_name: " << geop_name - << ", ret_status: " << ToString(run_graph_status) << ", tf session : " << tf_session_ - << ", graph id: " << cache_graph_id << ", cost [" << ((endTime - startTime) / kMicrosToMillis) << "ms]"; + OP_REQUIRES_ASYNC(ctx, NpuAoe::GetInstance().RunAoeTuning(input_vec_aoe, inputs, cache_graph_id, ctx).ok(), + errors::Internal("Run tuning failed."), done); + } + // in dynamic input mode, cache graphs. 
+ if (is_lazy_recompile_mode) { + GetExecGraphId(cache_graph_id, input_shapes); + } + OP_REQUIRES_ASYNC(ctx, graph_handler_.CheckAndRemoveGraph(shape_changed, cache_graph_id).ok(), + errors::Internal("Check and remove graph failed."), done); + +// End CheckRebuild + OP_REQUIRES_ASYNC(ctx, graph_handler_.ParserGraph().ok(), + errors::Internal("Parse graph failed."), done); +// End ParserGraph +// AddGraph + OP_REQUIRES_ASYNC(ctx, graph_handler_.AddGraph().ok(), + errors::Internal("Add graph failed."), done); +// End AddGraph +// BuildGraph + OP_REQUIRES_ASYNC(ctx, graph_handler_.BuildGraph().ok(), + errors::Internal("Build graph failed."), done); +// End BuildGraph +// RunGraph + OP_REQUIRES_ASYNC(ctx, graph_handler_.RunGraph().ok(), + errors::Internal("Run graph failed."), done); return; +// End RunGraph } void GeOp::ChangeChannelNameAttr(NodeDef &node_def) const { @@ -1423,9 +1069,9 @@ void GeOp::AddNodeAttrs(Node *node, bool &is_initialize) { // Add dp custom kernel label if (node->type_string() == "IteratorGetNext") { node->AddAttr("_kernel", "dp"); - if (dynamic_input_ == "1") { - node->AddAttr("_dynamic_graph_execute_mode", dynamic_graph_execute_mode_); - node->AddAttr("_getnext_inputs_shape_range", getnext_inputs_shape_range_); + if (is_dynamic_input_) { + node->AddAttr("_dynamic_graph_execute_mode", attr_manager_.GetOption("_dynamic_graph_execute_mode")); + node->AddAttr("_getnext_inputs_shape_range", attr_manager_.GetOption("_getnext_inputs_shape_range")); } } if (node->type_string() == "Assert" || node->type_string() == "Print" || node->type_string() == "PrintV2") { @@ -1589,7 +1235,8 @@ void GeOp::HandleDpOpAndGetNextNodes(Graph &graph) { remove_nodes.push_back(iterator_node); } } - if (dynamic_input_ == "1" && dynamic_graph_execute_mode_ == "lazy_recompile") { + if (is_dynamic_input_ && + attr_manager_.GetOption("_dynamic_graph_execute_mode") == "lazy_recompile") { graph_options_["ge.exec.enableCopyOutputAddr"] = "1"; } } @@ -1708,16 +1355,15 @@ Status GeOp::BuildGraphDef(FunctionLibraryDefinition &flib_def, const std::vecto return ret; } - bool is_set_dynamic_config = IsDynamicConfig(); - if (is_set_dynamic_config) { - jit_compile_ = "1"; + if (is_multi_batch_) { + attr_manager_.SetOption("_jit_compile", "1"); BuildShapeNodeAndCacheArgNodes(graph); } NPU_REQUIRES_OK(ProcessForDiffNodeTypes(graph, is_initialize, is_allreduce)); // set input_shape to dynamic nodes shape desc - if (is_set_dynamic_config) { + if (is_multi_batch_) { ret = ChangeInputsShapeDesc(); if (!ret.ok()) { ADP_LOG(ERROR) << "[GEOP] ChangeInputsShapeDesc failed, " << ret.error_message(); @@ -1728,7 +1374,9 @@ Status GeOp::BuildGraphDef(FunctionLibraryDefinition &flib_def, const std::vecto HandleDpOpAndGetNextNodes(graph); // 二进制场景(jit=0 or jit=2), 如果shape变化,则更新输入shape - if ((jit_compile_ != "1") || (compile_dynamic_mode_ == "1")) { UpdateInputsShapeDesc(graph); } + if ((attr_manager_.GetOption("_jit_compile") != "1") || (attr_manager_.GetOption("_compile_dynamic_mode") == "1")) { + UpdateInputsShapeDesc(graph); + } graph.ToGraphDef(&graph_def); std::string enable_force_v2_control; @@ -2012,12 +1660,11 @@ int GeOp::RunTuning(std::vector &input_vec, std::vector &inp if (is_host_graph_) { graph_options_["ge.exec.placement"] = "HOST"; } - SetDynamicInput(); graph_options_["ge.exec.overflow"] = "1"; - graph_options_["ge.graphLevelSat"] = (mix_compile_mode_ == "0") ?
"1" : "0"; // run aoe tuning - bool is_mdat_tuning = (init_options_["ge.jobType"] == kMdatTuning) && (recompute_mode_ == kAutoRecompute); + bool is_mdat_tuning = (init_options_["ge.jobType"] == kMdatTuning) && + (attr_manager_.GetOption("_recompute_mode") == kAutoRecompute); if ((init_options_["ge.jobType"] == "1") || (init_options_["ge.jobType"] == "2") || ((init_options_["ge.jobType"] == "4") && is_allreduce) || is_mdat_tuning) { std::function callback = [this]() { @@ -2065,7 +1712,7 @@ int GeOp::RunTuning(std::vector &input_vec, std::vector &inp } // aoe tuning std::map tuing_options; - tuing_options.insert({ge::AscendString("ge.recompute"), ge::AscendString(recompute_mode_.c_str())}); + tuing_options.insert({ge::AscendString("ge.recompute"), ge::AscendString(attr_manager_.GetOption("_recompute_mode"))}); tuing_options.insert( {ge::AscendString("ge.aoe_config_file"), ge::AscendString(init_options_["ge.aoe_config_file"].c_str())}); AoeStatus aoe_tune_ret = (*aoe_tuning_graph_)(session_id_, tuing_options); @@ -2302,7 +1949,9 @@ Status GeOp::BuildInputTensorInfo(OpKernelContext *const ctx, std::vectornum_inputs(); std::string cur_input_shapes; - + std::shared_ptr model_parser = + domi::ModelParserFactory::Instance()->CreateModelParser(domi::FrameworkType::TENSORFLOW); + REQUIRES_NOT_NULL(model_parser); // populate inputs for (int i = 0; i < num_inputs; i++) { Tensor tensor(ctx->input(i)); @@ -2319,16 +1968,14 @@ Status GeOp::BuildInputTensorInfo(OpKernelContext *const ctx, std::vector model_parser = - domi::ModelParserFactory::Instance()->CreateModelParser(domi::FrameworkType::TENSORFLOW); - REQUIRES_NOT_NULL(model_parser); ge::DataType type = model_parser->ConvertToGeDataType(static_cast(data_type)); if (type == ge::DT_UNDEFINED) { ADP_LOG(ERROR) << "[GEOP] No Supported datatype : " << data_type; LOG(ERROR) << "[GEOP] No Supported datatype : " << data_type; return errors::InvalidArgument("No Supported datatype : ", data_type); } - if (is_dynamic_getnext_ == "1" && (placeholder_index_.find(std::to_string(i)) == std::string::npos)) { + if (attr_manager_.GetOption("_is_dynamic_getnext") == "1" && + (attr_manager_.GetOption("_placeholder_index").find(std::to_string(i)) == std::string::npos)) { REQUIRES_NOT_NULL(tensor_ptr); AnalyzeInputDesc(tensor_ptr, input, type, input_shapes); } else { @@ -2360,11 +2007,6 @@ Status GeOp::BuildInputTensorInfo(OpKernelContext *const ctx, std::vector(node->def()); const OpDef &op_def = node->op_def(); - std::string format = this->data_format_; + std::string format = attr_manager_.GetOption("data_format"); int32_t domi_format = domi::domiTensorFormat_t::DOMI_TENSOR_RESERVED; TF_RETURN_IF_ERROR(this->DomiFormatFromString(format, domi_format)); @@ -2544,14 +2186,14 @@ Status GeOp::DomiFormatFromString(std::string format, int32_t &domi_format) cons } return errors::Internal("DomiFormatFromString, not supported format, format = ", format); } -void GeOp::InitAoeFlag() { - is_aoe_ = (!init_options_["ge.jobType"].empty()) && (!init_options_["ge.tuningPath"].empty()); +bool GeOp::IsAoeOn() { + return (!attr_manager_.GetOption("_aoe_mode").empty()) && + (!attr_manager_.GetOption("_work_path").empty()); } } // namespace tensorflow namespace tensorflow { mutex GeOp::mu_(LINKER_INITIALIZED); -bool GeOp::tuned_initialize_flag_(false); const std::string GeOp::INPUT_DESC = "input_tensor_desc"; const std::string GeOp::OUTPUT_DESC = "output_tensor_desc"; diff --git a/tf_adapter/kernels/geop_npu.h b/tf_adapter/kernels/geop_npu.h index 
9758464632f79b89e331e60f887950b38b61c136..c80bf9be484669351fb0620c7039b32b507a1fcd 100644 --- a/tf_adapter/kernels/geop_npu.h +++ b/tf_adapter/kernels/geop_npu.h @@ -19,17 +19,15 @@ #include #include - +#include "graph/event_processor.h" #include "tensorflow/core/common_runtime/function.h" #include "tensorflow/core/framework/op_kernel.h" #include "tensorflow/core/platform/mutex.h" #include "tensorflow/core/util/env_var.h" - -#include "ge/ge_api.h" +#include "graph/attr_manager.h" #include "ge/ge_api_types.h" #include "graph/tensor.h" #include "graph/utils/graph_utils.h" -#include "aoe_tuning_api.h" namespace tensorflow { using SessionId = uint64_t; @@ -170,9 +168,9 @@ public: Status DoGraphParser(ge::ComputeGraphPtr &compute_graph, FunctionLibraryDefinition *flib_def, GraphDef &ori_graph_def); - + void GetTfSession(); Status CreateGeSession(); - void InitAoeFlag(); + bool IsAoeOn(); static const std::string INPUT_DESC; static const std::string OUTPUT_DESC; static const std::string SERIALIZE_FORMAT; @@ -181,40 +179,28 @@ public: static const std::string SubGraph; static mutex mu_; - static bool tuned_initialize_flag_; - bool init_flag_; bool build_flag_; bool add_graph_flag_; bool sess_init_flag_; bool compute_graph_empty_; bool is_input_convert_; - - std::string input_shapes_; NameAttrList function_; - std::string data_format_; uint32_t graph_id_; bool is_initialized_graph_; bool need_iteration_; std::string tf_session_; ge::Session *ge_session_; - std::string job_type_; - std::string mix_compile_mode_; - std::string accelerate_train_mode_; std::map, uint32_t> cache_graphs_; std::vector, uint32_t>> graph_counts_; std::map sess_options_; std::map init_options_; - static std::unordered_map session_and_graph_id_map_; uint32_t iteration_per_loop_; bool is_host_graph_; std::map graph_options_; std::map outputs_shape_; - std::string is_train_graph_; void *handle_; std::vector dynamic_shape_nodes_; - std::string dynamic_input_; - std::string compile_dynamic_mode_; std::string dynamic_graph_execute_mode_; std::string data_inputs_shape_range_; std::string getnext_inputs_shape_range_; @@ -224,15 +210,12 @@ public: std::string placeholder_index_; std::atomic_flag tuned_flag_; std::vector> remove_index_; - std::string is_var_init_graph_; std::string max_key_num_; std::string embedding_dim_; std::string use_counter_filter_; std::string padding_key_; std::string embedding_flags_; - std::string recompute_mode_; std::vector> input_shapes_vec_; - std::string jit_compile_; bool is_dynamic_input_; std::map is_getnext_dynamic_shape_; SessionId session_id_; @@ -250,6 +233,10 @@ public: AoeSetTuningGraphInputFunc aoe_set_tuning_graph_input_; // accelerate train AccelerateInfo accelerate_info_; + EventProcessor graph_handler_; + AttrManager attr_manager_; + bool is_multi_batch_; + bool is_accelerate_train_mode_on_; }; } // namespace tensorflow #endif // TENSORFLOW_KERNELS_GEOP_NPU_H_ diff --git a/tf_adapter/util/npu_attrs.cc b/tf_adapter/util/npu_attrs.cc index f80ca6c02124145fe20ba4a7ead8cb8402908ac0..8511e15d22cc4225a93d92d7ee75aee34176f6f8 100644 --- a/tf_adapter/util/npu_attrs.cc +++ b/tf_adapter/util/npu_attrs.cc @@ -1829,7 +1829,7 @@ Status NpuAttrs::SetNpuOptimizerAttr(const GraphOptimizationPassOptions &options std::string variable_location = "Device"; std::string es_cluster_config; std::string graph_slice_mode; - std::string jit_compile; + std::string jit_compile = "2"; int64_t input_fusion_size = 131072L; // default 128KB std::string accelerate_train_mode; int32_t execute_times = -1;
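
The change above replaces the per-attribute string members of GeOp (jit_compile_, recompute_mode_, data_format_, and so on) with lookups through the new attr_manager_ member. The stand-alone sketch below only illustrates the lookup pattern the call sites rely on; DemoAttrManager is a hypothetical stand-in, since the real AttrManager interface declared in tf_adapter/graph/attr_manager.h is not part of this hunk. The one assumed behaviour is that GetOption returns an empty string for an option that was never set, which is what IsAoeOn and IsAccelerateTrainOn test with .empty().

#include <iostream>
#include <map>
#include <string>

class DemoAttrManager {
 public:
  // Returns the stored value, or an empty string when the option was never
  // set; the call sites above rely on this when testing GetOption(...).empty().
  std::string GetOption(const std::string &name) const {
    const auto it = options_.find(name);
    return (it != options_.end()) ? it->second : std::string();
  }

  void SetOption(const std::string &name, const std::string &value) {
    options_[name] = value;
  }

 private:
  std::map<std::string, std::string> options_;
};

int main() {
  DemoAttrManager attrs;
  // Mirrors how BuildGraphDef forces jit compilation in the multi-batch case.
  attrs.SetOption("_jit_compile", "1");
  const bool is_aoe = !attrs.GetOption("_aoe_mode").empty() &&
                      !attrs.GetOption("_work_path").empty();
  std::cout << "jit_compile=" << attrs.GetOption("_jit_compile")
            << ", is_aoe=" << std::boolalpha << is_aoe << std::endl;
  return 0;
}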
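
ComputeAsync now delegates the graph lifecycle to the graph_handler_ (EventProcessor) member through the staged calls CheckAndRemoveGraph, ParserGraph, AddGraph, BuildGraph, and RunGraph, each guarded by an OP_REQUIRES_ASYNC early exit. The sketch below shows that control flow in isolation; DemoStatus and DemoGraphHandler are illustrative stand-ins, not the real EventProcessor API, whose declaration lives in graph/event_processor.h and is not shown in this diff. The intent of the structure is that ComputeAsync stays a thin orchestrator and each stage reports its own failure.

#include <functional>
#include <iostream>
#include <string>
#include <utility>
#include <vector>

struct DemoStatus {
  bool ok() const { return message.empty(); }
  std::string message;  // empty string means success
};

class DemoGraphHandler {
 public:
  DemoStatus CheckAndRemoveGraph(bool shape_changed) {
    // Stand-in: the real handler decides whether a cached graph must be
    // dropped and rebuilt when input shapes change; here it always succeeds.
    (void)shape_changed;
    return {};
  }
  DemoStatus ParserGraph() { return {}; }
  DemoStatus AddGraph() { return {}; }
  DemoStatus BuildGraph() { return {}; }
  DemoStatus RunGraph() { return {}; }
};

// Runs the stages in order and stops at the first failure, mirroring the
// OP_REQUIRES_ASYNC check that follows each graph_handler_ call above.
bool RunPipeline(DemoGraphHandler &handler, bool shape_changed) {
  const std::vector<std::pair<std::string, std::function<DemoStatus()>>> stages = {
      {"CheckAndRemoveGraph", [&handler, shape_changed] { return handler.CheckAndRemoveGraph(shape_changed); }},
      {"ParserGraph", [&handler] { return handler.ParserGraph(); }},
      {"AddGraph", [&handler] { return handler.AddGraph(); }},
      {"BuildGraph", [&handler] { return handler.BuildGraph(); }},
      {"RunGraph", [&handler] { return handler.RunGraph(); }}};
  for (const auto &stage : stages) {
    const DemoStatus status = stage.second();
    if (!status.ok()) {
      std::cout << stage.first << " failed: " << status.message << std::endl;
      return false;
    }
  }
  std::cout << "all graph stages finished" << std::endl;
  return true;
}

int main() {
  DemoGraphHandler handler;
  return RunPipeline(handler, /*shape_changed=*/false) ? 0 : 1;
}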