From 48320367b0f5e381ccaa847844061cec1b1c86a7 Mon Sep 17 00:00:00 2001
From: CLAY-panjw <1330286576@qq.com>
Date: Wed, 9 Nov 2022 14:26:31 +0800
Subject: [PATCH] hccl env

---
 .../inc/external/ge/ge_api_types.h            |  6 +++
 .../npu_bridge/estimator/npu/npu_callbacks.py |  3 +-
 .../npu_bridge/estimator/npu/npu_estimator.py |  3 +-
 .../npu_bridge/estimator/npu/npu_hook.py      |  3 +-
 .../python/npu_bridge/estimator/npu/util.py   |  8 +++
 tf_adapter/util/ge_plugin.cc                  | 51 +++++++++++++++++--
 tf_adapter/util/ge_plugin.h                   |  2 +
 .../npu_device/distribute/npu_callbacks.py    |  9 +++-
 8 files changed, 78 insertions(+), 7 deletions(-)

diff --git a/inc/graphengine/inc/external/ge/ge_api_types.h b/inc/graphengine/inc/external/ge/ge_api_types.h
index d6bd3c457..cefdae0ce 100644
--- a/inc/graphengine/inc/external/ge/ge_api_types.h
+++ b/inc/graphengine/inc/external/ge/ge_api_types.h
@@ -75,6 +75,12 @@ const char *const OPTION_EXEC_LOGICAL_DEVICE_ID = "ge.exec.logicalDeviceId";
 const char *const OPTION_EXEC_MODEL_DEPLOY_MODE = "ge.exec.modelDeployMode";
 const char *const OPTION_EXEC_MODEL_DEPLOY_DEVICELIST = "ge.exec.modelDeployDevicelist";
 
+const char *const OPTION_EXEC_CM_CHIEF_IP = "ge.cmChiefIp";
+const char *const OPTION_EXEC_CM_CHIEF_PORT = "ge.cmChiefPort";
+const char *const OPTION_EXEC_CM_CHIEF_DEVICE = "ge.cmChiefWorkerDevice";
+const char *const OPTION_EXEC_CM_WORKER_IP = "ge.cmWorkerIp";
+const char *const OPTION_EXEC_CM_WORKER_SIZE = "ge.cmWorkerSize";
+
 // Option key: memory init
 const char *const GRAPH_MEMORY_MAX_SIZE = "ge.graphMemoryMaxSize";
 const char *const VARIABLE_MEMORY_MAX_SIZE = "ge.variableMemoryMaxSize";
diff --git a/tf_adapter/python/npu_bridge/estimator/npu/npu_callbacks.py b/tf_adapter/python/npu_bridge/estimator/npu/npu_callbacks.py
index 89e33519c..697945b5e 100644
--- a/tf_adapter/python/npu_bridge/estimator/npu/npu_callbacks.py
+++ b/tf_adapter/python/npu_bridge/estimator/npu/npu_callbacks.py
@@ -22,6 +22,7 @@ from tensorflow.python.keras import backend
 from tensorflow.python.ops import state_ops
 from tensorflow.python.ops import control_flow_ops
 from npu_bridge.hccl import hccl_ops
+from npu_bridge.estimator.npu import util as util_lib
 
 
 def broadcast_global_variables(root_rank):
@@ -54,7 +55,7 @@ class BroadcastGlobalVariablesCallbackImpl:
         if self.broadcast_done:
             return
 
-        rank_size = os.getenv("RANK_SIZE", "1")
+        rank_size = util_lib.get_rank_size()
         if int(rank_size) > 1:
             bcast_op = broadcast_global_variables(self.root_rank)
             backend.get_session().run(bcast_op)
diff --git a/tf_adapter/python/npu_bridge/estimator/npu/npu_estimator.py b/tf_adapter/python/npu_bridge/estimator/npu/npu_estimator.py
index de4e7c11f..d4fa751cd 100644
--- a/tf_adapter/python/npu_bridge/estimator/npu/npu_estimator.py
+++ b/tf_adapter/python/npu_bridge/estimator/npu/npu_estimator.py
@@ -37,6 +37,7 @@ from tensorflow.python.estimator import estimator as estimator_lib
 from tensorflow.python.estimator import model_fn as model_fn_lib
 from tensorflow.python.util import function_utils
 from tensorflow.python.util import tf_inspect
+from npu_bridge.estimator.npu import util as util_lib
 from npu_bridge.estimator.npu import util
 
 from npu_bridge.estimator.npu.npu_config import NPURunConfig
@@ -380,7 +381,7 @@
             raise RuntimeError('estimator_spec used by NPU train must have type '
                                '`NPUEstimatorSpec` or `EstimatorSpec`. Got {}'.format(type(estimator_spec)))
         # 1. NPUBroadcastGlobalVariablesHook
-        rank_size = os.getenv('RANK_SIZE')
+        rank_size = util_lib.get_rank_size()
         if rank_size is not None and rank_size.isdigit() and int(rank_size) > 1 and not config.horovod_mode:
             npu_hooks.append(
                 NPUBroadcastGlobalVariablesHook(self.__device_info._root_rank, self.__device_info._index))
diff --git a/tf_adapter/python/npu_bridge/estimator/npu/npu_hook.py b/tf_adapter/python/npu_bridge/estimator/npu/npu_hook.py
index 4a6455a6e..d60df35c8 100644
--- a/tf_adapter/python/npu_bridge/estimator/npu/npu_hook.py
+++ b/tf_adapter/python/npu_bridge/estimator/npu/npu_hook.py
@@ -28,6 +28,7 @@ from tensorflow.python.ops import summary_ops_v2 as contrib_summary
 from tensorflow.python.platform import tf_logging as logging
 from tensorflow.python.training import session_run_hook
 from tensorflow.python.training import basic_session_run_hooks
+from npu_bridge.estimator.npu import util as util_lib
 from npu_bridge.estimator import npu_ops
 from npu_bridge.hccl import hccl_ops
 
@@ -89,7 +90,7 @@ class NPUBroadcastGlobalVariablesHook(session_run_hook.SessionRunHook):
         self._root_rank = root_rank
         self._index = index
         self._bcast_op = None
-        rank_size = os.getenv('RANK_SIZE', "1")
+        rank_size = util_lib.get_rank_size()
         if rank_size.isdigit():
             self._rank_size = int(rank_size)
         else:
diff --git a/tf_adapter/python/npu_bridge/estimator/npu/util.py b/tf_adapter/python/npu_bridge/estimator/npu/util.py
index 7fb62822e..e22f5fa41 100644
--- a/tf_adapter/python/npu_bridge/estimator/npu/util.py
+++ b/tf_adapter/python/npu_bridge/estimator/npu/util.py
@@ -232,6 +232,14 @@ def set_iteration_per_loop(sess, train_op, iterations_per_loop=1):
     return group_train_op
 
 
+def get_rank_size():
+    if os.getenv("CM_WORK_SIZE") is not None and os.getenv("RANK_SIZE") is not None:
+        raise ValueError("RANK_SIZE and CM_WORK_SIZE cannot be configured at the same time")
+    rank_size = os.getenv('RANK_SIZE') if os.getenv(
+        "RANK_SIZE") is not None else os.getenv('CM_WORK_SIZE', '1')
+    return rank_size
+
+
 class IterationPerLoop():
     """
     An object provide two API to create and set iterations_per_loop
diff --git a/tf_adapter/util/ge_plugin.cc b/tf_adapter/util/ge_plugin.cc
index 0743023ed..4afbf4c1c 100644
--- a/tf_adapter/util/ge_plugin.cc
+++ b/tf_adapter/util/ge_plugin.cc
@@ -36,6 +36,7 @@ using namespace tdt;
 using namespace tensorflow;
 namespace {
 const int kFatalSleepTime = 3000;
+const int64 kDefaultRankSize = -1LL;
 inline string ToString(ge::Status status) {
   return ::ge::StatusFactory::Instance()->GetErrDesc(status);
 }
@@ -127,9 +128,10 @@ void GePlugin::Init(std::map<std::string, std::string> &init_options, const bool
     ADP_LOG(WARNING) << "[GePlugin] can not find Environment variable : JOB_ID";
     LOG(WARNING) << "[GePlugin] can not find Environment variable : JOB_ID";
   }
-
-  int64 rankSizeNum = 1;
-  (void) ReadInt64FromEnvVar("RANK_SIZE", 1, &rankSizeNum);
+  int64_t rankSizeNum = GetRankSizeNum();
+  if (rankSizeNum == kDefaultRankSize) {
+    return;
+  }
   if (rankSizeNum > UINT32_MAX) {
     rankSizeNum = UINT32_MAX;
     ADP_LOG(WARNING) << "[GePlugin] RANK_SIZE is larger than UINT32_MAX, set to UINT32_MAX.";
@@ -138,6 +140,12 @@ void GePlugin::Init(std::map<std::string, std::string> &init_options, const bool
 
   bool is_use_hcom = false;
   bool deploy_mode = false;
+  std::string cm_chief_ip;
+  (void) ReadStringFromEnvVar("CM_CHIEF_IP", "", &cm_chief_ip);
+  std::string cm_chief_port;
+  (void) ReadStringFromEnvVar("CM_CHIEF_PORT", "", &cm_chief_port);
+  std::string cm_chief_device;
+  (void) ReadStringFromEnvVar("CM_CHIEF_DEVICE", "", &cm_chief_device);
   std::string env_rank_table_file;
ReadStringFromEnvVar("RANK_TABLE_FILE", "", &env_rank_table_file); if (!env_rank_table_file.empty() && (rankSizeNum > 0)) { @@ -161,6 +169,16 @@ void GePlugin::Init(std::map &init_options, const bool LOG(ERROR) << "[GePlugin] Can't find rank_id or pod_name in env."; } } + } else if (!cm_chief_ip.empty() && !cm_chief_port.empty() && !cm_chief_device.empty() && (rankSizeNum > 0)) { + ADP_LOG(INFO) << "[GePlugin] env CM_CHIEF_IP:" << cm_chief_ip; + ADP_LOG(INFO) << "[GePlugin] env CM_CHIEF_PORT:" << cm_chief_port; + ADP_LOG(INFO) << "[GePlugin] env CM_CHIEF_DEVICE:" << cm_chief_device; + is_use_hcom = true; + init_options[ge::OPTION_EXEC_CM_CHIEF_IP] = cm_chief_ip; + init_options[ge::OPTION_EXEC_CM_CHIEF_PORT] = cm_chief_port; + init_options[ge::OPTION_EXEC_CM_CHIEF_DEVICE] = cm_chief_device; + } else { + // do nothing } std::string cluster_info; @@ -172,6 +190,17 @@ void GePlugin::Init(std::map &init_options, const bool init_options[ge::OPTION_EXEC_IS_USEHCOM] = std::to_string(is_use_hcom); init_options[ge::OPTION_EXEC_DEPLOY_MODE] = std::to_string(deploy_mode); + std::string cm_worker_ip; + (void) ReadStringFromEnvVar("CM_WORKER_IP", "", &cm_worker_ip); + if (!cm_worker_ip.empty()) { + init_options[ge::OPTION_EXEC_CM_WORKER_IP] = cm_worker_ip; + } + std::string cm_worker_size; + (void) ReadStringFromEnvVar("CM_WORKER_SIZE", "", &cm_worker_size); + if (!cm_worker_size.empty()) { + init_options[ge::OPTION_EXEC_CM_WORKER_SIZE] = cm_worker_size; + } + // is use hcom configuration ADP_LOG(INFO) << "[GePlugin] is_usehcom : " << init_options[ge::OPTION_EXEC_IS_USEHCOM] << ", deploy_mode :" << init_options[ge::OPTION_EXEC_DEPLOY_MODE]; @@ -266,6 +295,22 @@ void GePlugin::Init(std::map &init_options, const bool isGlobal_ = is_global; } +int64_t GePlugin::GetRankSizeNum() { + int64 cmWorkSize; + (void) ReadInt64FromEnvVar("CM_WORK_SIZE", kDefaultRankSize, &cmWorkSize); + int64 rankSize; + (void) ReadInt64FromEnvVar("RANK_SIZE", kDefaultRankSize, &rankSize); + if ((cmWorkSize != kDefaultRankSize) && (rankSize != kDefaultRankSize)) { + ADP_LOG(ERROR) << "[GePlugin] RANK_SIZE and CM_WORK_SIZE cannot be configured at the same time."; + LOG(ERROR) << "[GePlugin] RANK_SIZE and CM_WORK_SIZE cannot be configured at the same time."; + return -1; + } else if ((cmWorkSize == kDefaultRankSize) && (rankSize == kDefaultRankSize)) { + return 1; // cann't get rank_size or cm_work_size from env, set to default 1 + } else { + return (rankSize != kDefaultRankSize) ? 
+  }
+}
+
 std::map<std::string, std::string> GePlugin::GetInitOptions() {
   return init_options_;
 }
diff --git a/tf_adapter/util/ge_plugin.h b/tf_adapter/util/ge_plugin.h
index 41f2a0375..f59f9d31d 100644
--- a/tf_adapter/util/ge_plugin.h
+++ b/tf_adapter/util/ge_plugin.h
@@ -37,6 +37,8 @@ class GePlugin {
 
   std::map<std::string, std::string> GetInitOptions();
 
+  int64_t GetRankSizeNum();
+
  private:
   GePlugin();
 
diff --git a/tf_adapter_2.x/python/npu_device/distribute/npu_callbacks.py b/tf_adapter_2.x/python/npu_device/distribute/npu_callbacks.py
index a5b365d4f..516a2cdb1 100644
--- a/tf_adapter_2.x/python/npu_device/distribute/npu_callbacks.py
+++ b/tf_adapter_2.x/python/npu_device/distribute/npu_callbacks.py
@@ -50,6 +50,13 @@ def broadcast_keras_model(model, root_rank=0):
     broadcast_helper(model.trainable_variables, root_rank)
     return model
 
+def get_rank_size():
+    if os.getenv("CM_WORK_SIZE") is not None and os.getenv("RANK_SIZE") is not None:
+        raise ValueError("RANK_SIZE and CM_WORK_SIZE cannot be configured at the same time")
+    rank_size = os.getenv('RANK_SIZE') if os.getenv(
+        "RANK_SIZE") is not None else os.getenv('CM_WORK_SIZE', '1')
+    return rank_size
+
 
 class NPUBroadcastGlobalVariablesCallback(keras.callbacks.Callback):
     """
@@ -75,7 +82,7 @@ class NPUBroadcastGlobalVariablesCallback(keras.callbacks.Callback):
         if self.broadcast_done:
             return
 
-        rank_size = os.getenv("RANK_SIZE", "1")
+        rank_size = get_rank_size()
         if int(rank_size) > 1:
             broadcast_helper(self.model.trainable_variables, self.root_rank)
 
--
Gitee
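
Reviewer note: the behaviour this patch introduces is the mutual exclusion between RANK_SIZE and the new CM_WORK_SIZE variable, implemented twice (get_rank_size() in Python, GePlugin::GetRankSizeNum() in C++). The selection logic is small enough to check standalone. Below is a minimal sketch mirroring the helper added in util.py; the environment values are illustrative only and not part of the patch, and CM_WORK_SIZE is assumed here to be set by the cluster-management launcher in real deployments.

import os

def get_rank_size():
    # Mirrors the helper added in util.py: RANK_SIZE and CM_WORK_SIZE are mutually exclusive.
    if os.getenv("CM_WORK_SIZE") is not None and os.getenv("RANK_SIZE") is not None:
        raise ValueError("RANK_SIZE and CM_WORK_SIZE cannot be configured at the same time")
    # Prefer RANK_SIZE; otherwise fall back to CM_WORK_SIZE, defaulting to "1" when neither is set.
    return os.getenv("RANK_SIZE") if os.getenv("RANK_SIZE") is not None else os.getenv("CM_WORK_SIZE", "1")

# Illustrative values only:
os.environ.pop("RANK_SIZE", None)
os.environ["CM_WORK_SIZE"] = "8"
assert get_rank_size() == "8"   # CM_WORK_SIZE alone drives the rank size

os.environ["RANK_SIZE"] = "4"
try:
    get_rank_size()
except ValueError:
    # Both variables set -> rejected, matching the GetRankSizeNum() error path in ge_plugin.cc.
    print("conflicting RANK_SIZE / CM_WORK_SIZE rejected")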