From f0d9a8fcbb98bf2134bba9ca566d6f941d8e9cc4 Mon Sep 17 00:00:00 2001 From: dengtao Date: Mon, 14 Nov 2022 22:52:46 +0800 Subject: [PATCH] hccl env --- tf_adapter/interface_spec/api_npu_config.pyh | 3 +- .../npu_bridge/estimator/npu/npu_callbacks.py | 3 +- .../npu_bridge/estimator/npu/npu_config.py | 6 +- .../npu_bridge/estimator/npu/npu_estimator.py | 5 +- .../npu_bridge/estimator/npu/npu_hook.py | 3 +- .../python/npu_bridge/estimator/npu/util.py | 8 ++ .../tests/st/util/testcase/ge_plugin_test.cc | 47 ++++++- .../tests/ut/util/testcase/ge_plugin_test.cc | 41 ++++++ tf_adapter/util/ge_plugin.cc | 117 ++++++++++++------ tf_adapter/util/ge_plugin.h | 8 ++ tf_adapter/util/npu_attrs.cc | 28 +++++ .../npu_device/core/npu_wrapper.cpp | 2 + .../python/npu_device/configs/npu_config.py | 2 + .../npu_device/distribute/npu_callbacks.py | 10 +- 14 files changed, 238 insertions(+), 45 deletions(-) diff --git a/tf_adapter/interface_spec/api_npu_config.pyh b/tf_adapter/interface_spec/api_npu_config.pyh index 915c76ea6..5a80153d9 100644 --- a/tf_adapter/interface_spec/api_npu_config.pyh +++ b/tf_adapter/interface_spec/api_npu_config.pyh @@ -18,7 +18,8 @@ class NPURunConfig(run_config_lib.RunConfig): distribute_config=None, modify_mixlist=None, op_precision_mode=None, device_type="default_device_type", soc_config=None, hccl_timeout=None, op_wait_timeout=None, op_execute_timeout=None, HCCL_algorithm=None, customize_dtypes=None, op_debug_config=None, memory_config=None, experimental_config=None, - jit_compile=True, topo_sorting_mode=None, aoe_config_file=None, insert_op_file=None): + jit_compile=True, topo_sorting_mode=None, aoe_config_file=None, insert_op_file=None, stream_sync_timeout=-1, + event_sync_timeout=-1): class ProfilingConfig(): def __init__(self, enable_profiling=False, profiling_options=None): diff --git a/tf_adapter/python/npu_bridge/estimator/npu/npu_callbacks.py b/tf_adapter/python/npu_bridge/estimator/npu/npu_callbacks.py index 89e33519c..697945b5e 100644 --- a/tf_adapter/python/npu_bridge/estimator/npu/npu_callbacks.py +++ b/tf_adapter/python/npu_bridge/estimator/npu/npu_callbacks.py @@ -22,6 +22,7 @@ from tensorflow.python.keras import backend from tensorflow.python.ops import state_ops from tensorflow.python.ops import control_flow_ops from npu_bridge.hccl import hccl_ops +from npu_bridge.estimator.npu import util as util_lib def broadcast_global_variables(root_rank): @@ -54,7 +55,7 @@ class BroadcastGlobalVariablesCallbackImpl: if self.broadcast_done: return - rank_size = os.getenv("RANK_SIZE", "1") + rank_size = util_lib.get_rank_size() if int(rank_size) > 1: bcast_op = broadcast_global_variables(self.root_rank) backend.get_session().run(bcast_op) diff --git a/tf_adapter/python/npu_bridge/estimator/npu/npu_config.py b/tf_adapter/python/npu_bridge/estimator/npu/npu_config.py index f4d1f1277..9234612df 100644 --- a/tf_adapter/python/npu_bridge/estimator/npu/npu_config.py +++ b/tf_adapter/python/npu_bridge/estimator/npu/npu_config.py @@ -102,7 +102,9 @@ class NPURunConfig(run_config_lib.RunConfig): jit_compile=True, topo_sorting_mode=None, aoe_config_file=None, - insert_op_file=None + insert_op_file=None, + stream_sync_timeout=-1, + event_sync_timeout=-1 ): """ Constructs a NPUConfig. @@ -243,6 +245,8 @@ class NPURunConfig(run_config_lib.RunConfig): self.topo_sorting_mode = topo_sorting_mode self.aoe_config_file = aoe_config_file self.insert_op_file = insert_op_file + self.stream_sync_timeout = stream_sync_timeout + self.event_sync_timeout = event_sync_timeout super(NPURunConfig, self).__init__( diff --git a/tf_adapter/python/npu_bridge/estimator/npu/npu_estimator.py b/tf_adapter/python/npu_bridge/estimator/npu/npu_estimator.py index de4e7c11f..e7389a4b3 100644 --- a/tf_adapter/python/npu_bridge/estimator/npu/npu_estimator.py +++ b/tf_adapter/python/npu_bridge/estimator/npu/npu_estimator.py @@ -37,6 +37,7 @@ from tensorflow.python.estimator import estimator as estimator_lib from tensorflow.python.estimator import model_fn as model_fn_lib from tensorflow.python.util import function_utils from tensorflow.python.util import tf_inspect +from npu_bridge.estimator.npu import util as util_lib from npu_bridge.estimator.npu import util from npu_bridge.estimator.npu.npu_config import NPURunConfig @@ -380,7 +381,7 @@ class NPUEstimator(estimator_lib.Estimator): raise RuntimeError('estimator_spec used by NPU train must have type ' '`NPUEstimatorSpec` or `EstimatorSpec`. Got {}'.format(type(estimator_spec))) # 1. NPUBroadcastGlobalVariablesHook - rank_size = os.getenv('RANK_SIZE') + rank_size = util_lib.get_rank_size() if rank_size is not None and rank_size.isdigit() and int(rank_size) > 1 and not config.horovod_mode: npu_hooks.append( NPUBroadcastGlobalVariablesHook(self.__device_info._root_rank, self.__device_info._index)) @@ -758,6 +759,8 @@ class NPUEstimator(estimator_lib.Estimator): if config.insert_op_file is not None: custom_op.parameter_map["insert_op_file"].s = config.insert_op_file custom_op.parameter_map["jit_compile"].b = config._jit_compile + custom_op.parameter_map["stream_sync_timeout"].i = config.stream_sync_timeout + custom_op.parameter_map["event_sync_timeout"].i = config.event_sync_timeout self.__load_session_device_id(config, custom_op) self.__load_modify_mixlist(config, custom_op) diff --git a/tf_adapter/python/npu_bridge/estimator/npu/npu_hook.py b/tf_adapter/python/npu_bridge/estimator/npu/npu_hook.py index 4a6455a6e..d60df35c8 100644 --- a/tf_adapter/python/npu_bridge/estimator/npu/npu_hook.py +++ b/tf_adapter/python/npu_bridge/estimator/npu/npu_hook.py @@ -28,6 +28,7 @@ from tensorflow.python.ops import summary_ops_v2 as contrib_summary from tensorflow.python.platform import tf_logging as logging from tensorflow.python.training import session_run_hook from tensorflow.python.training import basic_session_run_hooks +from npu_bridge.estimator.npu import util as util_lib from npu_bridge.estimator import npu_ops from npu_bridge.hccl import hccl_ops @@ -89,7 +90,7 @@ class NPUBroadcastGlobalVariablesHook(session_run_hook.SessionRunHook): self._root_rank = root_rank self._index = index self._bcast_op = None - rank_size = os.getenv('RANK_SIZE', "1") + rank_size = util_lib.get_rank_size() if rank_size.isdigit(): self._rank_size = int(rank_size) else: diff --git a/tf_adapter/python/npu_bridge/estimator/npu/util.py b/tf_adapter/python/npu_bridge/estimator/npu/util.py index 7fb62822e..e22f5fa41 100644 --- a/tf_adapter/python/npu_bridge/estimator/npu/util.py +++ b/tf_adapter/python/npu_bridge/estimator/npu/util.py @@ -232,6 +232,14 @@ def set_iteration_per_loop(sess, train_op, iterations_per_loop=1): return group_train_op +def get_rank_size(): + if os.getenv("CM_WORK_SIZE") is not None and os.getenv("RANK_SIZE") is not None: + raise ValueError("RANK_SIZE and CM_WORK_SIZE cannot be configured at the same time") + rank_size = os.getenv('RANK_SIZE') if os.getenv( + "RANK_SIZE") is not None else os.getenv('CM_WORK_SIZE', '1') + return rank_size + + class IterationPerLoop(): """ An object provide two API to create and set iterations_per_loop diff --git a/tf_adapter/tests/st/util/testcase/ge_plugin_test.cc b/tf_adapter/tests/st/util/testcase/ge_plugin_test.cc index a89b4de00..754faf743 100644 --- a/tf_adapter/tests/st/util/testcase/ge_plugin_test.cc +++ b/tf_adapter/tests/st/util/testcase/ge_plugin_test.cc @@ -30,6 +30,52 @@ TEST_F(GePluginTest, PluginInitTest) { PluginInit(init_options); } +TEST_F(GePluginTest, PluginInitTest_fail) { + std::map init_options; + setenv("JOB_ID", "1000", true); + setenv("CM_WORK_SIZE", "1", true); + setenv("RANK_ID", "0", true); + setenv("POD_NAME", "0", true); + setenv("CM_CHIEF_IP", "11", true); + setenv("CM_CHIEF_PORT", "22", true); + setenv("CM_CHIEF_DEVICE", "8", true); + setenv("CM_WORKER_IP", "127.0.0.1", true); + setenv("FUSION_TENSOR_SIZE", "524288000", true); + std::string tf_config = "{'task':{'type':'a'}, 'cluster':{'chief':['1']}}"; + setenv("TF_CONFIG", tf_config.c_str(), true); + init_options["ge.exec.profilingMode"] = "1"; + init_options["ge.exec.profilingOptions"] = "trace"; + init_options["ge.exec.precision_mode"] = "allow_fp32_to_fp16"; + init_options["ge.autoTuneMode"] = "GA"; + init_options["ge.opDebugLevel"] = "1"; + init_options["ge.jobType"] = "2"; + PluginInit(init_options); +} + +TEST_F(GePluginTest, PluginInitTest_hccl) { + std::map init_options; + unsetenv("RANK_SIZE"); + unsetenv("RANK_TABLE_FILE"); + setenv("JOB_ID", "1000", true); + setenv("CM_WORK_SIZE", "1", true); + setenv("RANK_ID", "0", true); + setenv("POD_NAME", "0", true); + setenv("CM_CHIEF_IP", "11", true); + setenv("CM_CHIEF_PORT", "22", true); + setenv("CM_CHIEF_DEVICE", "8", true); + setenv("CM_WORKER_IP", "127.0.0.1", true); + setenv("FUSION_TENSOR_SIZE", "524288000", true); + std::string tf_config = "{'task':{'type':'a'}, 'cluster':{'chief':['1']}}"; + setenv("TF_CONFIG", tf_config.c_str(), true); + init_options["ge.exec.profilingMode"] = "1"; + init_options["ge.exec.profilingOptions"] = "trace"; + init_options["ge.exec.precision_mode"] = "allow_fp32_to_fp16"; + init_options["ge.autoTuneMode"] = "GA"; + init_options["ge.opDebugLevel"] = "1"; + init_options["ge.jobType"] = "2"; + PluginInit(init_options); +} + TEST_F(GePluginTest, PluginFinalizeTest) { PluginFinalize(); } @@ -112,6 +158,5 @@ TEST_F(GePluginTest, RdmaInitAndRegisterOKTest) { int32_t ret = RdmaInitAndRegister(var_info, size); EXPECT_EQ(ret, 0); } - } } // end tensorflow \ No newline at end of file diff --git a/tf_adapter/tests/ut/util/testcase/ge_plugin_test.cc b/tf_adapter/tests/ut/util/testcase/ge_plugin_test.cc index a89b4de00..bb0bf3c4e 100644 --- a/tf_adapter/tests/ut/util/testcase/ge_plugin_test.cc +++ b/tf_adapter/tests/ut/util/testcase/ge_plugin_test.cc @@ -11,6 +11,24 @@ class GePluginTest : public testing::Test { virtual void TearDown() {} }; +TEST_F(GePluginTest, PluginInitTest_1) { + std::map init_options; + setenv("JOB_ID", "1000", true); + setenv("RANK_SIZE", "1", true); + setenv("RANK_ID", "0", true); + setenv("RANK_TABLE_FILE", "rank_table", true); + setenv("FUSION_TENSOR_SIZE", "524288000", true); + std::string tf_config = "{'task':{'type':'a'}, 'cluster':{'chief':['1']}}"; + setenv("TF_CONFIG", tf_config.c_str(), true); + init_options["ge.exec.profilingMode"] = "1"; + init_options["ge.exec.profilingOptions"] = "trace"; + init_options["ge.exec.precision_mode"] = "allow_fp32_to_fp16"; + init_options["ge.autoTuneMode"] = "GA"; + init_options["ge.opDebugLevel"] = "1"; + init_options["ge.jobType"] = "2"; + PluginInit(init_options); +} + TEST_F(GePluginTest, PluginInitTest) { std::map init_options; setenv("JOB_ID", "1000", true); @@ -30,6 +48,29 @@ TEST_F(GePluginTest, PluginInitTest) { PluginInit(init_options); } +TEST_F(GePluginTest, PluginInitTest_hccl) { + std::map init_options; + unsetenv("RANK_SIZE"); + unsetenv("RANK_TABLE_FILE"); + setenv("JOB_ID", "1000", true); + setenv("CM_WORKER_SIZE", "1", true); + setenv("RANK_ID", "0", true); + setenv("CM_CHIEF_IP", "11", true); + setenv("CM_CHIEF_PORT", "22", true); + setenv("CM_CHIEF_DEVICE", "8", true); + setenv("CM_WORKER_IP", "127.0.0.1", true); + setenv("FUSION_TENSOR_SIZE", "524288000", true); + std::string tf_config = "{'task':{'type':'a'}, 'cluster':{'chief':['1']}}"; + setenv("TF_CONFIG", tf_config.c_str(), true); + init_options["ge.exec.profilingMode"] = "1"; + init_options["ge.exec.profilingOptions"] = "trace"; + init_options["ge.exec.precision_mode"] = "allow_fp32_to_fp16"; + init_options["ge.autoTuneMode"] = "GA"; + init_options["ge.opDebugLevel"] = "1"; + init_options["ge.jobType"] = "2"; + PluginInit(init_options); +} + TEST_F(GePluginTest, PluginFinalizeTest) { PluginFinalize(); } diff --git a/tf_adapter/util/ge_plugin.cc b/tf_adapter/util/ge_plugin.cc index 0743023ed..2523117c1 100644 --- a/tf_adapter/util/ge_plugin.cc +++ b/tf_adapter/util/ge_plugin.cc @@ -36,6 +36,8 @@ using namespace tdt; using namespace tensorflow; namespace { const int kFatalSleepTime = 3000; +const int64 kInvalidRankSize = -1; +const int64 kDefaultRankSize = 1; inline string ToString(ge::Status status) { return ::ge::StatusFactory::Instance()->GetErrDesc(status); } @@ -128,39 +130,21 @@ void GePlugin::Init(std::map &init_options, const bool LOG(WARNING) << "[GePlugin] can not find Environment variable : JOB_ID"; } - int64 rankSizeNum = 1; - (void) ReadInt64FromEnvVar("RANK_SIZE", 1, &rankSizeNum); - if (rankSizeNum > UINT32_MAX) { - rankSizeNum = UINT32_MAX; - ADP_LOG(WARNING) << "[GePlugin] RANK_SIZE is larger than UINT32_MAX, set to UINT32_MAX."; - LOG(WARNING) << "[GePlugin] RANK_SIZE is larger than UINT32_MAX, set to UINT32_MAX."; - } - - bool is_use_hcom = false; - bool deploy_mode = false; + std::string cm_chief_ip; + (void) ReadStringFromEnvVar("CM_CHIEF_IP", "", &cm_chief_ip); + (void) ReadInt64FromEnvVar("CM_WORKER_SIZE", kInvalidRankSize, &work_size_num); std::string env_rank_table_file; (void) ReadStringFromEnvVar("RANK_TABLE_FILE", "", &env_rank_table_file); - if (!env_rank_table_file.empty() && (rankSizeNum > 0)) { - ADP_LOG(INFO) << "[GePlugin] env RANK_TABLE_FILE:" << env_rank_table_file; - is_use_hcom = true; - init_options[ge::OPTION_EXEC_RANK_TABLE_FILE] = env_rank_table_file; - std::string env_pod_name; - (void) ReadStringFromEnvVar("POD_NAME", "", &env_pod_name); - if (!env_pod_name.empty()) { - deploy_mode = true; - init_options[ge::OPTION_EXEC_POD_NAME] = env_pod_name; - } else { - std::string env_rank_id; - (void) ReadStringFromEnvVar("RANK_ID", "", &env_rank_id); - if (!env_rank_id.empty()) { - ADP_LOG(INFO) << "[GePlugin] env RANK_ID:" << env_rank_id; - deploy_mode = false; - init_options[ge::OPTION_EXEC_RANK_ID] = env_rank_id; - } else { - ADP_LOG(ERROR) << "[GePlugin] Can't find rank_id or pod_name in env."; - LOG(ERROR) << "[GePlugin] Can't find rank_id or pod_name in env."; - } - } + (void) ReadInt64FromEnvVar("RANK_SIZE", kInvalidRankSize, &rank_size_num); + if (!cm_chief_ip.empty() && !env_rank_table_file.empty()) { + ADP_LOG(ERROR) << "[GePlugin] CM_CHIEF_IP and RANK_TABLE_FILE cannot be configured at the same time."; + LOG(ERROR) << "[GePlugin] CM_CHIEF_IP and RANK_TABLE_FILE cannot be configured at the same time."; + } else if (!cm_chief_ip.empty()) { + SetCmChiefWorkSizeEnv(init_options, cm_chief_ip); + } else if (!env_rank_table_file.empty()) { + SetRankTableFileEnv(init_options, env_rank_table_file); + } else { + // do nothing; } std::string cluster_info; @@ -266,6 +250,66 @@ void GePlugin::Init(std::map &init_options, const bool isGlobal_ = is_global; } +void GePlugin::SetRankTableFileEnv(std::map &init_options, std::string &rankTableFile) { + rank_size_num = (rank_size_num == kInvalidRankSize) ? kDefaultRankSize : rank_size_num; + if (rank_size_num > UINT32_MAX) { + rank_size_num = UINT32_MAX; + ADP_LOG(WARNING) << "[GePlugin] RANK_SIZE is larger than UINT32_MAX, set to UINT32_MAX."; + LOG(WARNING) << "[GePlugin] RANK_SIZE is larger than UINT32_MAX, set to UINT32_MAX."; + } + if (!rankTableFile.empty() && (rank_size_num > 0) && (work_size_num == kInvalidRankSize)) { + ADP_LOG(INFO) << "[GePlugin] env RANK_TABLE_FILE:" << rankTableFile; + is_use_hcom = true; + init_options[ge::OPTION_EXEC_RANK_TABLE_FILE] = rankTableFile; + std::string env_pod_name; + (void) ReadStringFromEnvVar("POD_NAME", "", &env_pod_name); + if (!env_pod_name.empty()) { + deploy_mode = true; + init_options[ge::OPTION_EXEC_POD_NAME] = env_pod_name; + } else { + std::string env_rank_id; + (void) ReadStringFromEnvVar("RANK_ID", "", &env_rank_id); + if (!env_rank_id.empty()) { + ADP_LOG(INFO) << "[GePlugin] env RANK_ID:" << env_rank_id; + deploy_mode = false; + init_options[ge::OPTION_EXEC_RANK_ID] = env_rank_id; + } else { + ADP_LOG(ERROR) << "[GePlugin] Can't find rank_id or pod_name in env."; + LOG(ERROR) << "[GePlugin] Can't find rank_id or pod_name in env."; + } + } + } +} + +void GePlugin::SetCmChiefWorkSizeEnv(std::map &init_options, std::string &cmChiefIp) { + std::string cm_chief_port; + (void) ReadStringFromEnvVar("CM_CHIEF_PORT", "", &cm_chief_port); + std::string cm_chief_device; + (void) ReadStringFromEnvVar("CM_CHIEF_DEVICE", "", &cm_chief_device); + std::string cm_worker_ip; + (void) ReadStringFromEnvVar("CM_WORKER_IP", "", &cm_worker_ip); + std::string cm_worker_size; + (void) ReadStringFromEnvVar("CM_WORKER_SIZE", "", &cm_worker_size); + work_size_num = (work_size_num == kInvalidRankSize) ? kDefaultRankSize : work_size_num; + if (work_size_num > UINT32_MAX) { + work_size_num = UINT32_MAX; + ADP_LOG(WARNING) << "[GePlugin] RANK_SIZE is larger than UINT32_MAX, set to UINT32_MAX."; + LOG(WARNING) << "[GePlugin] RANK_SIZE is larger than UINT32_MAX, set to UINT32_MAX."; + } + if (!cmChiefIp.empty() && !cm_chief_port.empty() && !cm_chief_device.empty() && (work_size_num > 0) && (rank_size_num == kInvalidRankSize)) { + is_use_hcom = true; + init_options_["ge.cmChiefIp"] = cmChiefIp; + init_options["ge.cmChiefPort"] = cm_chief_port; + init_options["ge.cmChiefWorkerDevice"] = cm_chief_device; + if (!cm_worker_ip.empty()) { + init_options["ge.cmWorkerIp"] = cm_worker_ip; + } + if (!cm_worker_size.empty()) { + init_options["ge.cmWorkerSize"] = cm_worker_size; + } + } +} + std::map GePlugin::GetInitOptions() { return init_options_; } @@ -312,15 +356,12 @@ bool GePlugin::IsGlobal() { } static CancellationManager g_cancellationManager; -Status RegisterNpuCancellationCallback(std::function callback, - std::function* deregister_fn) { +Status RegisterNpuCancellationCallback(std::function callback, std::function *deregister_fn) { CancellationToken token = g_cancellationManager.get_cancellation_token(); if (!g_cancellationManager.RegisterCallback(token, std::move(callback))) { return errors::Cancelled("Operation was cancelled"); } - *deregister_fn = [token]() { - g_cancellationManager.DeregisterCallback(token); - }; + *deregister_fn = [token]() { g_cancellationManager.DeregisterCallback(token); }; return Status::OK(); } @@ -353,8 +394,8 @@ void AoeFinalizeIfNeed() { return; } - (void)aoe_finalize(); - (void)mmDlclose(handle); + (void) aoe_finalize(); + (void) mmDlclose(handle); ADP_LOG(INFO) << "Finish to call aoe finalize when npu close."; } diff --git a/tf_adapter/util/ge_plugin.h b/tf_adapter/util/ge_plugin.h index 41f2a0375..ac25effbb 100644 --- a/tf_adapter/util/ge_plugin.h +++ b/tf_adapter/util/ge_plugin.h @@ -37,6 +37,10 @@ class GePlugin { std::map GetInitOptions(); + void SetRankTableFileEnv(std::map &init_options, std::string &rankTableFile); + + void SetCmChiefWorkSizeEnv(std::map &init_options, std::string &cmChiefIp); + private: GePlugin(); @@ -47,6 +51,10 @@ class GePlugin { uint32_t device_id_; bool isInit_; bool isGlobal_; + bool is_use_hcom = false; + bool deploy_mode = false; + tensorflow::int64 work_size_num; + tensorflow::int64 rank_size_num; std::map init_options_; std::mutex mutex_; static std::atomic_int graph_counter_; diff --git a/tf_adapter/util/npu_attrs.cc b/tf_adapter/util/npu_attrs.cc index 92060e810..90e2f01da 100644 --- a/tf_adapter/util/npu_attrs.cc +++ b/tf_adapter/util/npu_attrs.cc @@ -561,6 +561,8 @@ std::map NpuAttrs::GetInitOptions(const OpKernelConstr std::string model_deploy_devicelist; std::string dump_data = "tensor"; std::string aoe_config_file; + std::string stream_sync_timeout = "-1"; + std::string event_sync_timeout = "-1"; if (ctx != nullptr && ctx->GetAttr("_NpuOptimizer", &npuOptimizer) == Status::OK()) { (void) ctx->GetAttr("_precision_mode", &precision_mode); @@ -596,6 +598,8 @@ std::map NpuAttrs::GetInitOptions(const OpKernelConstr (void) ctx->GetAttr("_model_deploy_devicelist", &model_deploy_devicelist); (void) ctx->GetAttr("_dump_data", &dump_data); (void) ctx->GetAttr("_aoe_config_file", &aoe_config_file); + (void) ctx->GetAttr("_stream_sync_timeout", &stream_sync_timeout); + (void) ctx->GetAttr("_event_sync_timeout", &event_sync_timeout); } if (precision_mode.empty()) { @@ -643,6 +647,8 @@ std::map NpuAttrs::GetInitOptions(const OpKernelConstr init_options_["ge.exec.dumpData"] = dump_data; init_options_["aoe_config_file"] = aoe_config_file; init_options_["ge.aoe_config_file"] = aoe_config_file; + init_options_["stream_sync_timeout"] = stream_sync_timeout; + init_options_["event_sync_timeout"] = event_sync_timeout; return init_options_; } @@ -1010,6 +1016,8 @@ std::map NpuAttrs::GetAllAttrOptions(const AttrSlice & std::string insert_op_file; std::string resource_config_path; std::string aoe_config_file; + std::string stream_sync_timeout = "-1"; + std::string event_sync_timeout = "-1"; auto NpuOptimizer_value = attrs.Find("_NpuOptimizer"); auto enable_data_pre_proc_value = attrs.Find("_enable_data_pre_proc"); @@ -1086,6 +1094,8 @@ std::map NpuAttrs::GetAllAttrOptions(const AttrSlice & auto insert_op_file_value = attrs.Find("_insert_op_file"); auto resource_config_path_value = attrs.Find("_resource_config_path"); auto aoe_config_file_value = attrs.Find("_aoe_config_file"); + auto stream_sync_timeout_value = attrs.Find("_stream_sync_timeout"); + auto event_sync_timeout_value = attrs.Find("_event_sync_timeout"); if (NpuOptimizer_value != nullptr) { do_npu_optimizer = "1"; @@ -1338,6 +1348,12 @@ std::map NpuAttrs::GetAllAttrOptions(const AttrSlice & if (insert_op_file_value != nullptr) { insert_op_file = insert_op_file_value->s(); } + if (stream_sync_timeout_value != nullptr) { + stream_sync_timeout = stream_sync_timeout_value->s(); + } + if (event_sync_timeout_value != nullptr) { + event_sync_timeout = event_sync_timeout_value->s(); + } } all_options["variable_format_optimize"] = variable_format_optimize; @@ -1427,6 +1443,8 @@ std::map NpuAttrs::GetAllAttrOptions(const AttrSlice & all_options["resource_config_path"] = resource_config_path; all_options["ge.aoe_config_file"] = aoe_config_file; all_options["aoe_config_file"] = aoe_config_file; + all_options["stream_sync_timeout"] = stream_sync_timeout; + all_options["event_sync_timeout"] = event_sync_timeout; return all_options; } @@ -1523,6 +1541,8 @@ Status NpuAttrs::SetNpuOptimizerAttr(const GraphOptimizationPassOptions &options std::string model_deploy_devicelist; bool jit_compile = true; std::string aoe_config_file; + int32_t stream_sync_timeout = -1; + int32_t event_sync_timeout = -1; const RewriterConfig &rewrite_options = options.session_options->config.graph_options().rewrite_options(); @@ -1910,6 +1930,12 @@ Status NpuAttrs::SetNpuOptimizerAttr(const GraphOptimizationPassOptions &options if (params.count("aoe_config_file") > 0) { aoe_config_file = params.at("aoe_config_file").s(); } + if (params.count("stream_sync_timeout") > 0) { + stream_sync_timeout = params.at("stream_sync_timeout").i(); + } + if (params.count("event_sync_timeout") > 0) { + event_sync_timeout = params.at("event_sync_timeout").i(); + } } } @@ -2025,6 +2051,8 @@ Status NpuAttrs::SetNpuOptimizerAttr(const GraphOptimizationPassOptions &options init_options_["ge.exec.dumpData"] = dump_data; init_options_["aoe_config_file"] = aoe_config_file; init_options_["ge.aoe_config_file"] = aoe_config_file; + init_options_["stream_sync_timeout"] = std::to_string(stream_sync_timeout); + init_options_["event_sync_timeout"] = std::to_string(event_sync_timeout); pass_options["do_npu_optimizer"] = std::to_string(static_cast(do_npu_optimizer)); pass_options["enable_data_pre_proc"] = std::to_string(static_cast(enable_dp)); diff --git a/tf_adapter_2.x/npu_device/core/npu_wrapper.cpp b/tf_adapter_2.x/npu_device/core/npu_wrapper.cpp index bc97f8a6d..b650412dc 100644 --- a/tf_adapter_2.x/npu_device/core/npu_wrapper.cpp +++ b/tf_adapter_2.x/npu_device/core/npu_wrapper.cpp @@ -111,6 +111,8 @@ const std::map kConfigurableOptions = { {"dump_data", "ge.exec.dumpData"}, {"dump_layer", "ge.exec.dumpLayer"}, {"aoe_config_file", "ge.aoe_config_file"}, + {"stream_sync_timeout", "stream_sync_timeout"}, + {"event_sync_timeout", "event_sync_timeout"}, // private options {"_distribute.rank_id", ge::OPTION_EXEC_RANK_ID}, {"_distribute.rank_table", ge::OPTION_EXEC_RANK_TABLE_FILE}, diff --git a/tf_adapter_2.x/python/npu_device/configs/npu_config.py b/tf_adapter_2.x/python/npu_device/configs/npu_config.py index daa26cfdc..96d9887b1 100644 --- a/tf_adapter_2.x/python/npu_device/configs/npu_config.py +++ b/tf_adapter_2.x/python/npu_device/configs/npu_config.py @@ -58,6 +58,8 @@ class NpuConfig(NpuBaseConfig): self.topo_sorting_mode = OptionValue(None, [0, 1, None]) self.customize_dtypes = OptionValue(None, None) self.overflow_flag = OptionValue(1, [0, 1]) + self.stream_sync_timeout = OptionValue(-1, None) + self.event_sync_timeout = OptionValue(-1, None) # Configuration for experiment self.experimental = NpuExperimentalConfig() diff --git a/tf_adapter_2.x/python/npu_device/distribute/npu_callbacks.py b/tf_adapter_2.x/python/npu_device/distribute/npu_callbacks.py index a5b365d4f..f7ca85846 100644 --- a/tf_adapter_2.x/python/npu_device/distribute/npu_callbacks.py +++ b/tf_adapter_2.x/python/npu_device/distribute/npu_callbacks.py @@ -51,6 +51,14 @@ def broadcast_keras_model(model, root_rank=0): return model +def get_rank_size(): + if os.getenv("CM_WORK_SIZE") is not None and os.getenv("RANK_SIZE") is not None: + raise ValueError("RANK_SIZE and CM_WORK_SIZE cannot be configured at the same time") + rank_size = os.getenv('RANK_SIZE') if os.getenv( + "RANK_SIZE") is not None else os.getenv('CM_WORK_SIZE', '1') + return rank_size + + class NPUBroadcastGlobalVariablesCallback(keras.callbacks.Callback): """ Keras Callback that will broadcast all global variables from root rank @@ -75,7 +83,7 @@ class NPUBroadcastGlobalVariablesCallback(keras.callbacks.Callback): if self.broadcast_done: return - rank_size = os.getenv("RANK_SIZE", "1") + rank_size = get_rank_size() if int(rank_size) > 1: broadcast_helper(self.model.trainable_variables, self.root_rank) -- Gitee