diff --git a/tf_adapter/python/npu_bridge/estimator/npu/npu_callbacks.py b/tf_adapter/python/npu_bridge/estimator/npu/npu_callbacks.py index 89e33519c95bb9e3dd4f99fd8a06ea6d2301f433..697945b5e75df70bd7605e6581410d1d340eb05b 100644 --- a/tf_adapter/python/npu_bridge/estimator/npu/npu_callbacks.py +++ b/tf_adapter/python/npu_bridge/estimator/npu/npu_callbacks.py @@ -22,6 +22,7 @@ from tensorflow.python.keras import backend from tensorflow.python.ops import state_ops from tensorflow.python.ops import control_flow_ops from npu_bridge.hccl import hccl_ops +from npu_bridge.estimator.npu import util as util_lib def broadcast_global_variables(root_rank): @@ -54,7 +55,7 @@ class BroadcastGlobalVariablesCallbackImpl: if self.broadcast_done: return - rank_size = os.getenv("RANK_SIZE", "1") + rank_size = util_lib.get_rank_size() if int(rank_size) > 1: bcast_op = broadcast_global_variables(self.root_rank) backend.get_session().run(bcast_op) diff --git a/tf_adapter/python/npu_bridge/estimator/npu/npu_estimator.py b/tf_adapter/python/npu_bridge/estimator/npu/npu_estimator.py index de4e7c11ffffd158553e3501ec7f67c3ad3fc87f..d4fa751cd720ed88f0fad80833b6bfb4a0fd5f3e 100644 --- a/tf_adapter/python/npu_bridge/estimator/npu/npu_estimator.py +++ b/tf_adapter/python/npu_bridge/estimator/npu/npu_estimator.py @@ -37,6 +37,7 @@ from tensorflow.python.estimator import estimator as estimator_lib from tensorflow.python.estimator import model_fn as model_fn_lib from tensorflow.python.util import function_utils from tensorflow.python.util import tf_inspect +from npu_bridge.estimator.npu import util as util_lib from npu_bridge.estimator.npu import util from npu_bridge.estimator.npu.npu_config import NPURunConfig @@ -380,7 +381,7 @@ class NPUEstimator(estimator_lib.Estimator): raise RuntimeError('estimator_spec used by NPU train must have type ' '`NPUEstimatorSpec` or `EstimatorSpec`. Got {}'.format(type(estimator_spec))) # 1. 
NPUBroadcastGlobalVariablesHook - rank_size = os.getenv('RANK_SIZE') + rank_size = util_lib.get_rank_size() if rank_size is not None and rank_size.isdigit() and int(rank_size) > 1 and not config.horovod_mode: npu_hooks.append( NPUBroadcastGlobalVariablesHook(self.__device_info._root_rank, self.__device_info._index)) diff --git a/tf_adapter/python/npu_bridge/estimator/npu/npu_hook.py b/tf_adapter/python/npu_bridge/estimator/npu/npu_hook.py index 4a6455a6ea642b6d922f04f99911fbd04e1de1ea..d60df35c83c96b61d71187efc28e8c61e0ba1765 100644 --- a/tf_adapter/python/npu_bridge/estimator/npu/npu_hook.py +++ b/tf_adapter/python/npu_bridge/estimator/npu/npu_hook.py @@ -28,6 +28,7 @@ from tensorflow.python.ops import summary_ops_v2 as contrib_summary from tensorflow.python.platform import tf_logging as logging from tensorflow.python.training import session_run_hook from tensorflow.python.training import basic_session_run_hooks +from npu_bridge.estimator.npu import util as util_lib from npu_bridge.estimator import npu_ops from npu_bridge.hccl import hccl_ops @@ -89,7 +90,7 @@ class NPUBroadcastGlobalVariablesHook(session_run_hook.SessionRunHook): self._root_rank = root_rank self._index = index self._bcast_op = None - rank_size = os.getenv('RANK_SIZE', "1") + rank_size = util_lib.get_rank_size() if rank_size.isdigit(): self._rank_size = int(rank_size) else: diff --git a/tf_adapter/python/npu_bridge/estimator/npu/util.py b/tf_adapter/python/npu_bridge/estimator/npu/util.py index 7fb62822ebb339ba2c3347770dffb4906f70d097..e22f5fa419a7a2a240365d81e0b28ea4ac0a5411 100644 --- a/tf_adapter/python/npu_bridge/estimator/npu/util.py +++ b/tf_adapter/python/npu_bridge/estimator/npu/util.py @@ -232,6 +232,14 @@ def set_iteration_per_loop(sess, train_op, iterations_per_loop=1): return group_train_op +def get_rank_size(): + if os.getenv("CM_WORK_SIZE") is not None and os.getenv("RANK_SIZE") is not None: + raise ValueError("RANK_SIZE and CM_WORK_SIZE cannot be configured at the same time") + 
rank_size = os.getenv('RANK_SIZE') if os.getenv( + "RANK_SIZE") is not None else os.getenv('CM_WORK_SIZE', '1') + return rank_size + + class IterationPerLoop(): """ An object provide two API to create and set iterations_per_loop diff --git a/tf_adapter/tests/st/util/testcase/ge_plugin_test.cc b/tf_adapter/tests/st/util/testcase/ge_plugin_test.cc index a89b4de00e53c86be2a5d37ec16ccb49bc03fd39..0f59c68d5c43c02efb0f367a56058cdf5929e58f 100644 --- a/tf_adapter/tests/st/util/testcase/ge_plugin_test.cc +++ b/tf_adapter/tests/st/util/testcase/ge_plugin_test.cc @@ -30,6 +30,28 @@ TEST_F(GePluginTest, PluginInitTest) { PluginInit(init_options); } +TEST_F(GePluginTest, PluginInitTest_hccl) { + std::map init_options; + setenv("JOB_ID", "1000", true); + setenv("CM_WORK_SIZE", "1", true); + setenv("RANK_ID", "0", true); + setenv("POD_NAME", "0", true); + setenv("CM_CHIEF_IP", "11", true); + setenv("CM_CHIEF_PORT", "22", true); + setenv("CM_CHIEF_DEVICE", "8", true); + setenv("CM_WORKER_IP", "127.0.0.1", true); + setenv("FUSION_TENSOR_SIZE", "524288000", true); + std::string tf_config = "{'task':{'type':'a'}, 'cluster':{'chief':['1']}}"; + setenv("TF_CONFIG", tf_config.c_str(), true); + init_options["ge.exec.profilingMode"] = "1"; + init_options["ge.exec.profilingOptions"] = "trace"; + init_options["ge.exec.precision_mode"] = "allow_fp32_to_fp16"; + init_options["ge.autoTuneMode"] = "GA"; + init_options["ge.opDebugLevel"] = "1"; + init_options["ge.jobType"] = "2"; + PluginInit(init_options); +} + TEST_F(GePluginTest, PluginFinalizeTest) { PluginFinalize(); } @@ -112,6 +134,5 @@ TEST_F(GePluginTest, RdmaInitAndRegisterOKTest) { int32_t ret = RdmaInitAndRegister(var_info, size); EXPECT_EQ(ret, 0); } - } } // end tensorflow \ No newline at end of file diff --git a/tf_adapter/tests/ut/util/testcase/ge_plugin_test.cc b/tf_adapter/tests/ut/util/testcase/ge_plugin_test.cc index a89b4de00e53c86be2a5d37ec16ccb49bc03fd39..ff97636c2b6d638d308077b2fe2eb9688d71b72f 100644 --- 
a/tf_adapter/tests/ut/util/testcase/ge_plugin_test.cc +++ b/tf_adapter/tests/ut/util/testcase/ge_plugin_test.cc @@ -30,6 +30,52 @@ TEST_F(GePluginTest, PluginInitTest) { PluginInit(init_options); } +TEST_F(GePluginTest, PluginInitTest_fail) { + std::map init_options; + setenv("JOB_ID", "1000", true); + setenv("CM_WORK_SIZE", "1", true); + setenv("RANK_ID", "0", true); + setenv("POD_NAME", "0", true); + setenv("CM_CHIEF_IP", "11", true); + setenv("CM_CHIEF_PORT", "22", true); + setenv("CM_CHIEF_DEVICE", "8", true); + setenv("CM_WORKER_IP", "127.0.0.1", true); + setenv("FUSION_TENSOR_SIZE", "524288000", true); + std::string tf_config = "{'task':{'type':'a'}, 'cluster':{'chief':['1']}}"; + setenv("TF_CONFIG", tf_config.c_str(), true); + init_options["ge.exec.profilingMode"] = "1"; + init_options["ge.exec.profilingOptions"] = "trace"; + init_options["ge.exec.precision_mode"] = "allow_fp32_to_fp16"; + init_options["ge.autoTuneMode"] = "GA"; + init_options["ge.opDebugLevel"] = "1"; + init_options["ge.jobType"] = "2"; + PluginInit(init_options); +} + +TEST_F(GePluginTest, PluginInitTest_hccl) { + std::map init_options; + unsetenv("RANK_SIZE"); + unsetenv("RANK_TABLE_FILE"); + setenv("JOB_ID", "1000", true); + setenv("CM_WORK_SIZE", "1", true); + setenv("RANK_ID", "0", true); + setenv("POD_NAME", "0", true); + setenv("CM_CHIEF_IP", "11", true); + setenv("CM_CHIEF_PORT", "22", true); + setenv("CM_CHIEF_DEVICE", "8", true); + setenv("CM_WORKER_IP", "127.0.0.1", true); + setenv("FUSION_TENSOR_SIZE", "524288000", true); + std::string tf_config = "{'task':{'type':'a'}, 'cluster':{'chief':['1']}}"; + setenv("TF_CONFIG", tf_config.c_str(), true); + init_options["ge.exec.profilingMode"] = "1"; + init_options["ge.exec.profilingOptions"] = "trace"; + init_options["ge.exec.precision_mode"] = "allow_fp32_to_fp16"; + init_options["ge.autoTuneMode"] = "GA"; + init_options["ge.opDebugLevel"] = "1"; + init_options["ge.jobType"] = "2"; + PluginInit(init_options); +} + TEST_F(GePluginTest, 
PluginFinalizeTest) { PluginFinalize(); } diff --git a/tf_adapter/util/ge_plugin.cc b/tf_adapter/util/ge_plugin.cc index 0743023edf4b62d7494b90dac071f88a29473cb4..40a9a5eb9b2e0f7619c99f44d6a7db9bd9f96751 100644 --- a/tf_adapter/util/ge_plugin.cc +++ b/tf_adapter/util/ge_plugin.cc @@ -36,6 +36,7 @@ using namespace tdt; using namespace tensorflow; namespace { const int kFatalSleepTime = 3000; +const int64 kDefaultRankSize = -1LL; inline string ToString(ge::Status status) { return ::ge::StatusFactory::Instance()->GetErrDesc(status); } @@ -127,9 +128,10 @@ void GePlugin::Init(std::map &init_options, const bool ADP_LOG(WARNING) << "[GePlugin] can not find Environment variable : JOB_ID"; LOG(WARNING) << "[GePlugin] can not find Environment variable : JOB_ID"; } - - int64 rankSizeNum = 1; - (void) ReadInt64FromEnvVar("RANK_SIZE", 1, &rankSizeNum); + int64_t rankSizeNum = GetRankSizeNum(); + if (rankSizeNum == kDefaultRankSize) { + return; + } if (rankSizeNum > UINT32_MAX) { rankSizeNum = UINT32_MAX; ADP_LOG(WARNING) << "[GePlugin] RANK_SIZE is larger than UINT32_MAX, set to UINT32_MAX."; @@ -138,6 +140,12 @@ void GePlugin::Init(std::map &init_options, const bool bool is_use_hcom = false; bool deploy_mode = false; + std::string cm_chief_ip; + (void) ReadStringFromEnvVar("CM_CHIEF_IP", "", &cm_chief_ip); + std::string cm_chief_port; + (void) ReadStringFromEnvVar("CM_CHIEF_PORT", "", &cm_chief_port); + std::string cm_chief_device; + (void) ReadStringFromEnvVar("CM_CHIEF_DEVICE", "", &cm_chief_device); std::string env_rank_table_file; (void) ReadStringFromEnvVar("RANK_TABLE_FILE", "", &env_rank_table_file); if (!env_rank_table_file.empty() && (rankSizeNum > 0)) { @@ -161,6 +169,16 @@ void GePlugin::Init(std::map &init_options, const bool LOG(ERROR) << "[GePlugin] Can't find rank_id or pod_name in env."; } } + } else if (!cm_chief_ip.empty() && !cm_chief_port.empty() && !cm_chief_device.empty() && (rankSizeNum > 0)) { + ADP_LOG(INFO) << "[GePlugin] env CM_CHIEF_IP:" << 
cm_chief_ip; + ADP_LOG(INFO) << "[GePlugin] env CM_CHIEF_PORT:" << cm_chief_port; + ADP_LOG(INFO) << "[GePlugin] env CM_CHIEF_DEVICE:" << cm_chief_device; + is_use_hcom = true; + init_options["ge.cmChiefIp"] = cm_chief_ip; + init_options["ge.cmChiefPort"] = cm_chief_port; + init_options["ge.cmChiefWorkerDevice"] = cm_chief_device; + } else { + // do nothing } std::string cluster_info; @@ -172,6 +190,17 @@ void GePlugin::Init(std::map &init_options, const bool init_options[ge::OPTION_EXEC_IS_USEHCOM] = std::to_string(is_use_hcom); init_options[ge::OPTION_EXEC_DEPLOY_MODE] = std::to_string(deploy_mode); + std::string cm_worker_ip; + (void) ReadStringFromEnvVar("CM_WORKER_IP", "", &cm_worker_ip); + if (!cm_worker_ip.empty()) { + init_options["ge.cmWorkerIp"] = cm_worker_ip; + } + std::string cm_worker_size; + (void) ReadStringFromEnvVar("CM_WORKER_SIZE", "", &cm_worker_size); + if (!cm_worker_size.empty()) { + init_options["ge.cmWorkerSize"] = cm_worker_size; + } + // is use hcom configuration ADP_LOG(INFO) << "[GePlugin] is_usehcom : " << init_options[ge::OPTION_EXEC_IS_USEHCOM] << ", deploy_mode :" << init_options[ge::OPTION_EXEC_DEPLOY_MODE]; @@ -266,6 +295,22 @@ void GePlugin::Init(std::map &init_options, const bool isGlobal_ = is_global; } +int64_t GePlugin::GetRankSizeNum() { + int64 cmWorkSize; + (void) ReadInt64FromEnvVar("CM_WORK_SIZE", kDefaultRankSize, &cmWorkSize); + int64 rankSize; + (void) ReadInt64FromEnvVar("RANK_SIZE", kDefaultRankSize, &rankSize); + if ((cmWorkSize != kDefaultRankSize) && (rankSize != kDefaultRankSize)) { + ADP_LOG(ERROR) << "[GePlugin] RANK_SIZE and CM_WORK_SIZE cannot be configured at the same time."; + LOG(ERROR) << "[GePlugin] RANK_SIZE and CM_WORK_SIZE cannot be configured at the same time."; + return -1; + } else if ((cmWorkSize == kDefaultRankSize) && (rankSize == kDefaultRankSize)) { + return 1; // can't get rank_size or cm_work_size from env, set to default 1 + } else { + return (rankSize != kDefaultRankSize) ? 
rankSize : cmWorkSize; + } +} + std::map GePlugin::GetInitOptions() { return init_options_; } diff --git a/tf_adapter/util/ge_plugin.h b/tf_adapter/util/ge_plugin.h index 41f2a03753a7facc090a3b74879665dd981a71a7..f59f9d31d320066c4d4c96bfa7f31094a7d55dc0 100644 --- a/tf_adapter/util/ge_plugin.h +++ b/tf_adapter/util/ge_plugin.h @@ -37,6 +37,8 @@ class GePlugin { std::map GetInitOptions(); + int64_t GetRankSizeNum(); + private: GePlugin(); diff --git a/tf_adapter_2.x/python/npu_device/distribute/npu_callbacks.py b/tf_adapter_2.x/python/npu_device/distribute/npu_callbacks.py index a5b365d4f30d38502cbbe27e549c9c9e085930b1..f7ca858463a392144ac0bdccd8a2a2896922b74d 100644 --- a/tf_adapter_2.x/python/npu_device/distribute/npu_callbacks.py +++ b/tf_adapter_2.x/python/npu_device/distribute/npu_callbacks.py @@ -51,6 +51,14 @@ def broadcast_keras_model(model, root_rank=0): return model +def get_rank_size(): + if os.getenv("CM_WORK_SIZE") is not None and os.getenv("RANK_SIZE") is not None: + raise ValueError("RANK_SIZE and CM_WORK_SIZE cannot be configured at the same time") + rank_size = os.getenv('RANK_SIZE') if os.getenv( + "RANK_SIZE") is not None else os.getenv('CM_WORK_SIZE', '1') + return rank_size + + class NPUBroadcastGlobalVariablesCallback(keras.callbacks.Callback): """ Keras Callback that will broadcast all global variables from root rank @@ -75,7 +83,7 @@ class NPUBroadcastGlobalVariablesCallback(keras.callbacks.Callback): if self.broadcast_done: return - rank_size = os.getenv("RANK_SIZE", "1") + rank_size = get_rank_size() if int(rank_size) > 1: broadcast_helper(self.model.trainable_variables, self.root_rank) diff --git a/tf_adapter_2.x/tests/stub/include/stub/defines.h b/tf_adapter_2.x/tests/stub/include/stub/defines.h index dbb14608743508d61a30c58b5e55e4a8a1d09344..1a8e4bcbf5655946dc255b6b84069deda6b6e7ca 100644 --- a/tf_adapter_2.x/tests/stub/include/stub/defines.h +++ b/tf_adapter_2.x/tests/stub/include/stub/defines.h @@ -20,7 +20,7 @@ const char *const 
OPTION_EXEC_JOB_ID = "ge.exec.jobId"; const char *const OPTION_EXEC_IS_USEHCOM = "ge.exec.isUseHcom"; const char *const OPTION_EXEC_IS_USEHVD = "ge.exec.isUseHvd"; const char *const OPTION_EXEC_RANK_ID = "ge.exec.rankId"; -const char *const OPTION_EXEC_POD_NAME = "ge.exec.podName"; +//const char *const OPTION_EXEC_POD_NAME = "ge.exec.podName"; const char *const OPTION_EXEC_DEPLOY_MODE = "ge.exec.deployMode"; const char *const OPTION_EXEC_RANK_TABLE_FILE = "ge.exec.rankTableFile"; const char *const GE_AICPU_FLAG = "ge.aicpuFlag";