diff --git a/tf_adapter/python/npu_bridge/estimator/npu/npu_callbacks.py b/tf_adapter/python/npu_bridge/estimator/npu/npu_callbacks.py index 89e33519c95bb9e3dd4f99fd8a06ea6d2301f433..697945b5e75df70bd7605e6581410d1d340eb05b 100644 --- a/tf_adapter/python/npu_bridge/estimator/npu/npu_callbacks.py +++ b/tf_adapter/python/npu_bridge/estimator/npu/npu_callbacks.py @@ -22,6 +22,7 @@ from tensorflow.python.keras import backend from tensorflow.python.ops import state_ops from tensorflow.python.ops import control_flow_ops from npu_bridge.hccl import hccl_ops +from npu_bridge.estimator.npu import util as util_lib def broadcast_global_variables(root_rank): @@ -54,7 +55,7 @@ class BroadcastGlobalVariablesCallbackImpl: if self.broadcast_done: return - rank_size = os.getenv("RANK_SIZE", "1") + rank_size = util_lib.get_rank_size() if int(rank_size) > 1: bcast_op = broadcast_global_variables(self.root_rank) backend.get_session().run(bcast_op) diff --git a/tf_adapter/python/npu_bridge/estimator/npu/npu_estimator.py b/tf_adapter/python/npu_bridge/estimator/npu/npu_estimator.py index de4e7c11ffffd158553e3501ec7f67c3ad3fc87f..d4fa751cd720ed88f0fad80833b6bfb4a0fd5f3e 100644 --- a/tf_adapter/python/npu_bridge/estimator/npu/npu_estimator.py +++ b/tf_adapter/python/npu_bridge/estimator/npu/npu_estimator.py @@ -37,6 +37,7 @@ from tensorflow.python.estimator import estimator as estimator_lib from tensorflow.python.estimator import model_fn as model_fn_lib from tensorflow.python.util import function_utils from tensorflow.python.util import tf_inspect +from npu_bridge.estimator.npu import util as util_lib from npu_bridge.estimator.npu import util from npu_bridge.estimator.npu.npu_config import NPURunConfig @@ -380,7 +381,7 @@ class NPUEstimator(estimator_lib.Estimator): raise RuntimeError('estimator_spec used by NPU train must have type ' '`NPUEstimatorSpec` or `EstimatorSpec`. Got {}'.format(type(estimator_spec))) # 1. NPUBroadcastGlobalVariablesHook - rank_size = os.getenv('RANK_SIZE') + rank_size = util_lib.get_rank_size() if rank_size is not None and rank_size.isdigit() and int(rank_size) > 1 and not config.horovod_mode: npu_hooks.append( NPUBroadcastGlobalVariablesHook(self.__device_info._root_rank, self.__device_info._index)) diff --git a/tf_adapter/python/npu_bridge/estimator/npu/npu_hook.py b/tf_adapter/python/npu_bridge/estimator/npu/npu_hook.py index 4a6455a6ea642b6d922f04f99911fbd04e1de1ea..d60df35c83c96b61d71187efc28e8c61e0ba1765 100644 --- a/tf_adapter/python/npu_bridge/estimator/npu/npu_hook.py +++ b/tf_adapter/python/npu_bridge/estimator/npu/npu_hook.py @@ -28,6 +28,7 @@ from tensorflow.python.ops import summary_ops_v2 as contrib_summary from tensorflow.python.platform import tf_logging as logging from tensorflow.python.training import session_run_hook from tensorflow.python.training import basic_session_run_hooks +from npu_bridge.estimator.npu import util as util_lib from npu_bridge.estimator import npu_ops from npu_bridge.hccl import hccl_ops @@ -89,7 +90,7 @@ class NPUBroadcastGlobalVariablesHook(session_run_hook.SessionRunHook): self._root_rank = root_rank self._index = index self._bcast_op = None - rank_size = os.getenv('RANK_SIZE', "1") + rank_size = util_lib.get_rank_size() if rank_size.isdigit(): self._rank_size = int(rank_size) else: diff --git a/tf_adapter/python/npu_bridge/estimator/npu/util.py b/tf_adapter/python/npu_bridge/estimator/npu/util.py index 7fb62822ebb339ba2c3347770dffb4906f70d097..e22f5fa419a7a2a240365d81e0b28ea4ac0a5411 100644 --- a/tf_adapter/python/npu_bridge/estimator/npu/util.py +++ b/tf_adapter/python/npu_bridge/estimator/npu/util.py @@ -232,6 +232,14 @@ def set_iteration_per_loop(sess, train_op, iterations_per_loop=1): return group_train_op +def get_rank_size(): + if os.getenv("CM_WORK_SIZE") is not None and os.getenv("RANK_SIZE") is not None: + raise ValueError("RANK_SIZE and CM_WORK_SIZE cannot be configured at the same time") + rank_size = os.getenv('RANK_SIZE') if os.getenv( + "RANK_SIZE") is not None else os.getenv('CM_WORK_SIZE', '1') + return rank_size + + class IterationPerLoop(): """ An object provide two API to create and set iterations_per_loop diff --git a/tf_adapter/tests/st/util/testcase/ge_plugin_test.cc b/tf_adapter/tests/st/util/testcase/ge_plugin_test.cc index a89b4de00e53c86be2a5d37ec16ccb49bc03fd39..754faf743f5a7cd95f592f1735e046b006c0b6d4 100644 --- a/tf_adapter/tests/st/util/testcase/ge_plugin_test.cc +++ b/tf_adapter/tests/st/util/testcase/ge_plugin_test.cc @@ -30,6 +30,52 @@ TEST_F(GePluginTest, PluginInitTest) { PluginInit(init_options); } +TEST_F(GePluginTest, PluginInitTest_fail) { + std::map init_options; + setenv("JOB_ID", "1000", true); + setenv("CM_WORK_SIZE", "1", true); + setenv("RANK_ID", "0", true); + setenv("POD_NAME", "0", true); + setenv("CM_CHIEF_IP", "11", true); + setenv("CM_CHIEF_PORT", "22", true); + setenv("CM_CHIEF_DEVICE", "8", true); + setenv("CM_WORKER_IP", "127.0.0.1", true); + setenv("FUSION_TENSOR_SIZE", "524288000", true); + std::string tf_config = "{'task':{'type':'a'}, 'cluster':{'chief':['1']}}"; + setenv("TF_CONFIG", tf_config.c_str(), true); + init_options["ge.exec.profilingMode"] = "1"; + init_options["ge.exec.profilingOptions"] = "trace"; + init_options["ge.exec.precision_mode"] = "allow_fp32_to_fp16"; + init_options["ge.autoTuneMode"] = "GA"; + init_options["ge.opDebugLevel"] = "1"; + init_options["ge.jobType"] = "2"; + PluginInit(init_options); +} + +TEST_F(GePluginTest, PluginInitTest_hccl) { + std::map init_options; + unsetenv("RANK_SIZE"); + unsetenv("RANK_TABLE_FILE"); + setenv("JOB_ID", "1000", true); + setenv("CM_WORK_SIZE", "1", true); + setenv("RANK_ID", "0", true); + setenv("POD_NAME", "0", true); + setenv("CM_CHIEF_IP", "11", true); + setenv("CM_CHIEF_PORT", "22", true); + setenv("CM_CHIEF_DEVICE", "8", true); + setenv("CM_WORKER_IP", "127.0.0.1", true); + setenv("FUSION_TENSOR_SIZE", "524288000", true); + std::string tf_config = "{'task':{'type':'a'}, 'cluster':{'chief':['1']}}"; + setenv("TF_CONFIG", tf_config.c_str(), true); + init_options["ge.exec.profilingMode"] = "1"; + init_options["ge.exec.profilingOptions"] = "trace"; + init_options["ge.exec.precision_mode"] = "allow_fp32_to_fp16"; + init_options["ge.autoTuneMode"] = "GA"; + init_options["ge.opDebugLevel"] = "1"; + init_options["ge.jobType"] = "2"; + PluginInit(init_options); +} + TEST_F(GePluginTest, PluginFinalizeTest) { PluginFinalize(); } @@ -112,6 +158,5 @@ TEST_F(GePluginTest, RdmaInitAndRegisterOKTest) { int32_t ret = RdmaInitAndRegister(var_info, size); EXPECT_EQ(ret, 0); } - } } // end tensorflow \ No newline at end of file diff --git a/tf_adapter/tests/ut/util/testcase/ge_plugin_test.cc b/tf_adapter/tests/ut/util/testcase/ge_plugin_test.cc index a89b4de00e53c86be2a5d37ec16ccb49bc03fd39..bb0bf3c4ee5e7079d7f5c8ba5f69520d4027df43 100644 --- a/tf_adapter/tests/ut/util/testcase/ge_plugin_test.cc +++ b/tf_adapter/tests/ut/util/testcase/ge_plugin_test.cc @@ -11,6 +11,24 @@ class GePluginTest : public testing::Test { virtual void TearDown() {} }; +TEST_F(GePluginTest, PluginInitTest_1) { + std::map init_options; + setenv("JOB_ID", "1000", true); + setenv("RANK_SIZE", "1", true); + setenv("RANK_ID", "0", true); + setenv("RANK_TABLE_FILE", "rank_table", true); + setenv("FUSION_TENSOR_SIZE", "524288000", true); + std::string tf_config = "{'task':{'type':'a'}, 'cluster':{'chief':['1']}}"; + setenv("TF_CONFIG", tf_config.c_str(), true); + init_options["ge.exec.profilingMode"] = "1"; + init_options["ge.exec.profilingOptions"] = "trace"; + init_options["ge.exec.precision_mode"] = "allow_fp32_to_fp16"; + init_options["ge.autoTuneMode"] = "GA"; + init_options["ge.opDebugLevel"] = "1"; + init_options["ge.jobType"] = "2"; + PluginInit(init_options); +} + TEST_F(GePluginTest, PluginInitTest) { std::map init_options; setenv("JOB_ID", "1000", true); @@ -30,6 +48,29 @@ TEST_F(GePluginTest, PluginInitTest) { PluginInit(init_options); } +TEST_F(GePluginTest, PluginInitTest_hccl) { + std::map init_options; + unsetenv("RANK_SIZE"); + unsetenv("RANK_TABLE_FILE"); + setenv("JOB_ID", "1000", true); + setenv("CM_WORKER_SIZE", "1", true); + setenv("RANK_ID", "0", true); + setenv("CM_CHIEF_IP", "11", true); + setenv("CM_CHIEF_PORT", "22", true); + setenv("CM_CHIEF_DEVICE", "8", true); + setenv("CM_WORKER_IP", "127.0.0.1", true); + setenv("FUSION_TENSOR_SIZE", "524288000", true); + std::string tf_config = "{'task':{'type':'a'}, 'cluster':{'chief':['1']}}"; + setenv("TF_CONFIG", tf_config.c_str(), true); + init_options["ge.exec.profilingMode"] = "1"; + init_options["ge.exec.profilingOptions"] = "trace"; + init_options["ge.exec.precision_mode"] = "allow_fp32_to_fp16"; + init_options["ge.autoTuneMode"] = "GA"; + init_options["ge.opDebugLevel"] = "1"; + init_options["ge.jobType"] = "2"; + PluginInit(init_options); +} + TEST_F(GePluginTest, PluginFinalizeTest) { PluginFinalize(); } diff --git a/tf_adapter/util/ge_plugin.cc b/tf_adapter/util/ge_plugin.cc index 0743023edf4b62d7494b90dac071f88a29473cb4..2523117c17cd99040836140527ca64d684e08889 100644 --- a/tf_adapter/util/ge_plugin.cc +++ b/tf_adapter/util/ge_plugin.cc @@ -36,6 +36,8 @@ using namespace tdt; using namespace tensorflow; namespace { const int kFatalSleepTime = 3000; +const int64 kInvalidRankSize = -1; +const int64 kDefaultRankSize = 1; inline string ToString(ge::Status status) { return ::ge::StatusFactory::Instance()->GetErrDesc(status); } @@ -128,39 +130,21 @@ void GePlugin::Init(std::map &init_options, const bool LOG(WARNING) << "[GePlugin] can not find Environment variable : JOB_ID"; } - int64 rankSizeNum = 1; - (void) ReadInt64FromEnvVar("RANK_SIZE", 1, &rankSizeNum); - if (rankSizeNum > UINT32_MAX) { - rankSizeNum = UINT32_MAX; - ADP_LOG(WARNING) << "[GePlugin] RANK_SIZE is larger than UINT32_MAX, set to UINT32_MAX."; - LOG(WARNING) << "[GePlugin] RANK_SIZE is larger than UINT32_MAX, set to UINT32_MAX."; - } - - bool is_use_hcom = false; - bool deploy_mode = false; + std::string cm_chief_ip; + (void) ReadStringFromEnvVar("CM_CHIEF_IP", "", &cm_chief_ip); + (void) ReadInt64FromEnvVar("CM_WORKER_SIZE", kInvalidRankSize, &work_size_num); std::string env_rank_table_file; (void) ReadStringFromEnvVar("RANK_TABLE_FILE", "", &env_rank_table_file); - if (!env_rank_table_file.empty() && (rankSizeNum > 0)) { - ADP_LOG(INFO) << "[GePlugin] env RANK_TABLE_FILE:" << env_rank_table_file; - is_use_hcom = true; - init_options[ge::OPTION_EXEC_RANK_TABLE_FILE] = env_rank_table_file; - std::string env_pod_name; - (void) ReadStringFromEnvVar("POD_NAME", "", &env_pod_name); - if (!env_pod_name.empty()) { - deploy_mode = true; - init_options[ge::OPTION_EXEC_POD_NAME] = env_pod_name; - } else { - std::string env_rank_id; - (void) ReadStringFromEnvVar("RANK_ID", "", &env_rank_id); - if (!env_rank_id.empty()) { - ADP_LOG(INFO) << "[GePlugin] env RANK_ID:" << env_rank_id; - deploy_mode = false; - init_options[ge::OPTION_EXEC_RANK_ID] = env_rank_id; - } else { - ADP_LOG(ERROR) << "[GePlugin] Can't find rank_id or pod_name in env."; - LOG(ERROR) << "[GePlugin] Can't find rank_id or pod_name in env."; - } - } + (void) ReadInt64FromEnvVar("RANK_SIZE", kInvalidRankSize, &rank_size_num); + if (!cm_chief_ip.empty() && !env_rank_table_file.empty()) { + ADP_LOG(ERROR) << "[GePlugin] CM_CHIEF_IP and RANK_TABLE_FILE cannot be configured at the same time."; + LOG(ERROR) << "[GePlugin] CM_CHIEF_IP and RANK_TABLE_FILE cannot be configured at the same time."; + } else if (!cm_chief_ip.empty()) { + SetCmChiefWorkSizeEnv(init_options, cm_chief_ip); + } else if (!env_rank_table_file.empty()) { + SetRankTableFileEnv(init_options, env_rank_table_file); + } else { + // do nothing; } std::string cluster_info; @@ -266,6 +250,66 @@ void GePlugin::Init(std::map &init_options, const bool isGlobal_ = is_global; } +void GePlugin::SetRankTableFileEnv(std::map &init_options, std::string &rankTableFile) { + rank_size_num = (rank_size_num == kInvalidRankSize) ? kDefaultRankSize : rank_size_num; + if (rank_size_num > UINT32_MAX) { + rank_size_num = UINT32_MAX; + ADP_LOG(WARNING) << "[GePlugin] RANK_SIZE is larger than UINT32_MAX, set to UINT32_MAX."; + LOG(WARNING) << "[GePlugin] RANK_SIZE is larger than UINT32_MAX, set to UINT32_MAX."; + } + if (!rankTableFile.empty() && (rank_size_num > 0) && (work_size_num == kInvalidRankSize)) { + ADP_LOG(INFO) << "[GePlugin] env RANK_TABLE_FILE:" << rankTableFile; + is_use_hcom = true; + init_options[ge::OPTION_EXEC_RANK_TABLE_FILE] = rankTableFile; + std::string env_pod_name; + (void) ReadStringFromEnvVar("POD_NAME", "", &env_pod_name); + if (!env_pod_name.empty()) { + deploy_mode = true; + init_options[ge::OPTION_EXEC_POD_NAME] = env_pod_name; + } else { + std::string env_rank_id; + (void) ReadStringFromEnvVar("RANK_ID", "", &env_rank_id); + if (!env_rank_id.empty()) { + ADP_LOG(INFO) << "[GePlugin] env RANK_ID:" << env_rank_id; + deploy_mode = false; + init_options[ge::OPTION_EXEC_RANK_ID] = env_rank_id; + } else { + ADP_LOG(ERROR) << "[GePlugin] Can't find rank_id or pod_name in env."; + LOG(ERROR) << "[GePlugin] Can't find rank_id or pod_name in env."; + } + } + } +} + +void GePlugin::SetCmChiefWorkSizeEnv(std::map &init_options, std::string &cmChiefIp) { + std::string cm_chief_port; + (void) ReadStringFromEnvVar("CM_CHIEF_PORT", "", &cm_chief_port); + std::string cm_chief_device; + (void) ReadStringFromEnvVar("CM_CHIEF_DEVICE", "", &cm_chief_device); + std::string cm_worker_ip; + (void) ReadStringFromEnvVar("CM_WORKER_IP", "", &cm_worker_ip); + std::string cm_worker_size; + (void) ReadStringFromEnvVar("CM_WORKER_SIZE", "", &cm_worker_size); + work_size_num = (work_size_num == kInvalidRankSize) ? kDefaultRankSize : work_size_num; + if (work_size_num > UINT32_MAX) { + work_size_num = UINT32_MAX; + ADP_LOG(WARNING) << "[GePlugin] RANK_SIZE is larger than UINT32_MAX, set to UINT32_MAX."; + LOG(WARNING) << "[GePlugin] RANK_SIZE is larger than UINT32_MAX, set to UINT32_MAX."; + } + if (!cmChiefIp.empty() && !cm_chief_port.empty() && !cm_chief_device.empty() && (work_size_num > 0) && (rank_size_num == kInvalidRankSize)) { + is_use_hcom = true; + init_options_["ge.cmChiefIp"] = cmChiefIp; + init_options["ge.cmChiefPort"] = cm_chief_port; + init_options["ge.cmChiefWorkerDevice"] = cm_chief_device; + if (!cm_worker_ip.empty()) { + init_options["ge.cmWorkerIp"] = cm_worker_ip; + } + if (!cm_worker_size.empty()) { + init_options["ge.cmWorkerSize"] = cm_worker_size; + } + } +} + std::map GePlugin::GetInitOptions() { return init_options_; } @@ -312,15 +356,12 @@ bool GePlugin::IsGlobal() { } static CancellationManager g_cancellationManager; -Status RegisterNpuCancellationCallback(std::function callback, - std::function* deregister_fn) { +Status RegisterNpuCancellationCallback(std::function callback, std::function *deregister_fn) { CancellationToken token = g_cancellationManager.get_cancellation_token(); if (!g_cancellationManager.RegisterCallback(token, std::move(callback))) { return errors::Cancelled("Operation was cancelled"); } - *deregister_fn = [token]() { - g_cancellationManager.DeregisterCallback(token); - }; + *deregister_fn = [token]() { g_cancellationManager.DeregisterCallback(token); }; return Status::OK(); } @@ -353,8 +394,8 @@ void AoeFinalizeIfNeed() { return; } - (void)aoe_finalize(); - (void)mmDlclose(handle); + (void) aoe_finalize(); + (void) mmDlclose(handle); ADP_LOG(INFO) << "Finish to call aoe finalize when npu close."; } diff --git a/tf_adapter/util/ge_plugin.h b/tf_adapter/util/ge_plugin.h index 41f2a03753a7facc090a3b74879665dd981a71a7..ac25effbb09f5b541247bf654f0a11508c8443d3 100644 --- a/tf_adapter/util/ge_plugin.h +++ b/tf_adapter/util/ge_plugin.h @@ -37,6 +37,10 @@ class GePlugin { std::map GetInitOptions(); + void SetRankTableFileEnv(std::map &init_options, std::string &rankTableFile); + + void SetCmChiefWorkSizeEnv(std::map &init_options, std::string &cmChiefIp); + private: GePlugin(); @@ -47,6 +51,10 @@ class GePlugin { uint32_t device_id_; bool isInit_; bool isGlobal_; + bool is_use_hcom = false; + bool deploy_mode = false; + tensorflow::int64 work_size_num; + tensorflow::int64 rank_size_num; std::map init_options_; std::mutex mutex_; static std::atomic_int graph_counter_; diff --git a/tf_adapter_2.x/python/npu_device/distribute/npu_callbacks.py b/tf_adapter_2.x/python/npu_device/distribute/npu_callbacks.py index a5b365d4f30d38502cbbe27e549c9c9e085930b1..f7ca858463a392144ac0bdccd8a2a2896922b74d 100644 --- a/tf_adapter_2.x/python/npu_device/distribute/npu_callbacks.py +++ b/tf_adapter_2.x/python/npu_device/distribute/npu_callbacks.py @@ -51,6 +51,14 @@ def broadcast_keras_model(model, root_rank=0): return model +def get_rank_size(): + if os.getenv("CM_WORK_SIZE") is not None and os.getenv("RANK_SIZE") is not None: + raise ValueError("RANK_SIZE and CM_WORK_SIZE cannot be configured at the same time") + rank_size = os.getenv('RANK_SIZE') if os.getenv( + "RANK_SIZE") is not None else os.getenv('CM_WORK_SIZE', '1') + return rank_size + + class NPUBroadcastGlobalVariablesCallback(keras.callbacks.Callback): """ Keras Callback that will broadcast all global variables from root rank @@ -75,7 +83,7 @@ class NPUBroadcastGlobalVariablesCallback(keras.callbacks.Callback): if self.broadcast_done: return - rank_size = os.getenv("RANK_SIZE", "1") + rank_size = get_rank_size() if int(rank_size) > 1: broadcast_helper(self.model.trainable_variables, self.root_rank)