diff --git a/docs/mindspore/source_zh_cn/model_train/parallel/data_parallel.md b/docs/mindspore/source_zh_cn/model_train/parallel/data_parallel.md index 3695eb358a9bd5b612efb9f58537a4f7412bc692..ff2fe2ba8290a8c14c2242afb24428a2e54a3d8f 100644 --- a/docs/mindspore/source_zh_cn/model_train/parallel/data_parallel.md +++ b/docs/mindspore/source_zh_cn/model_train/parallel/data_parallel.md @@ -36,178 +36,3 @@ 5. 参数更新(Parameter update) 因为引入了梯度聚合操作,所以各卡的模型会以相同的梯度值一起进入参数更新步骤。 - -## 操作实践 - -下面以Ascend或者GPU单机8卡为例,进行数据并行操作说明: - -### 样例代码说明 - -> 您可以在这里下载完整的样例代码: -> -> 。 - -目录结构如下: - -```text -└─ sample_code - ├─ distributed_data_parallel - ├── distributed_data_parallel.py - └── run.sh - ... -``` - -其中,`distributed_data_parallel.py`是定义网络结构和训练过程的脚本。`run.sh`是执行脚本。 - -### 配置分布式环境 - -通过context接口可以指定运行模式、运行设备、运行卡号等。与单卡脚本不同,并行脚本还需指定并行模式`parallel_mode`为数据并行模式,并通过init根据不同的设备需求初始化HCCL、NCCL或者MCCL 通信。在数据并行模式还可以设置`gradients_mean`指定梯度聚合方式。此处未设置`device_target`,会自动指定为MindSpore包对应的后端硬件设备。 - -```python -import mindspore as ms -from mindspore.communication import init - -ms.set_context(mode=ms.GRAPH_MODE) -ms.set_auto_parallel_context(parallel_mode=ms.ParallelMode.DATA_PARALLEL, gradients_mean=True) -init() -ms.set_seed(1) -``` - -其中,`gradients_mean=True`是为了在反向计算时,框架内部会将数据并行参数分散在多台机器的梯度值进行聚合,得到全局梯度值后再传入优化器中更新。首先通过AllReduce(op=ReduceOp.SUM)对梯度做规约求和,接着根据gradients_mean的值来判断是否求均值(设置为True则求均值,否则不求,默认为False)。 - -### 数据并行模式加载数据集 - -数据并行模式跟其他模式最大区别在于数据加载方式的不同,数据是以并行的方式导入的。下面我们以MNIST数据集为例,介绍以数据并行方式导入MNIST数据集的方法,`dataset_path`是指数据集的路径。 - -```python -import mindspore.dataset as ds -from mindspore.communication import get_rank, get_group_size - -rank_id = get_rank() -rank_size = get_group_size() -dataset = ds.MnistDataset(dataset_path, num_shards=rank_size, shard_id=rank_id) -``` - -其中,与单卡不同的是,在数据集接口需要传入`num_shards`和`shard_id`参数,分别对应卡的数量和逻辑序号,建议通过`mindspore.communication`接口获取: - -- `get_rank`:获取当前设备在集群中的ID。 -- `get_group_size`:获取集群数量。 - -> 数据并行场景加载数据集时,建议对每卡指定相同的数据集文件,若是各卡加载的数据集不同,可能会影响计算精度。 - -完整的数据处理代码: - -```python -import os -import mindspore.dataset as ds -from mindspore.communication import get_rank, get_group_size - -def create_dataset(batch_size): - dataset_path = os.getenv("DATA_PATH") - rank_id = get_rank() - rank_size = get_group_size() - dataset = ds.MnistDataset(dataset_path, num_shards=rank_size, shard_id=rank_id) - image_transforms = [ - ds.vision.Rescale(1.0 / 255.0, 0), - ds.vision.Normalize(mean=(0.1307,), std=(0.3081,)), - ds.vision.HWC2CHW() - ] - label_transform = ds.transforms.TypeCast(ms.int32) - dataset = dataset.map(image_transforms, 'image') - dataset = dataset.map(label_transform, 'label') - dataset = dataset.batch(batch_size) - return dataset - -data_set = create_dataset(32) -``` - -### 定义网络 - -数据并行模式下,网络定义方式与单卡网络写法一致,网络的主要结构如下: - -```python -from mindspore import nn - -class Network(nn.Cell): - def __init__(self): - super().__init__() - self.flatten = nn.Flatten() - self.dense_relu_sequential = nn.SequentialCell( - nn.Dense(28*28, 512, weight_init="normal", bias_init="zeros"), - nn.ReLU(), - nn.Dense(512, 512, weight_init="normal", bias_init="zeros"), - nn.ReLU(), - nn.Dense(512, 10, weight_init="normal", bias_init="zeros") - ) - - def construct(self, x): - x = self.flatten(x) - logits = self.dense_relu_sequential(x) - return logits - -net = Network() -``` - -### 训练网络 - -在这一步,我们需要定义损失函数、优化器以及训练过程。与单卡模型不同的地方在于,数据并行模式还需要增加`mindspore.nn.DistributedGradReducer()`接口,来对所有卡的梯度进行聚合,该接口第一个参数为需要更新的网络参数: - -```python -from mindspore import nn -import mindspore as ms - -loss_fn = nn.CrossEntropyLoss() -optimizer = nn.SGD(net.trainable_params(), 1e-2) - -def forward_fn(data, label): - logits = net(data) - loss = loss_fn(logits, label) - return loss, logits - -grad_fn = ms.value_and_grad(forward_fn, None, net.trainable_params(), has_aux=True) -grad_reducer = nn.DistributedGradReducer(optimizer.parameters) - -for epoch in range(10): - i = 0 - for data, label in data_set: - (loss, _), grads = grad_fn(data, label) - grads = grad_reducer(grads) - optimizer(grads) - if i % 10 == 0: - print("epoch: %s, step: %s, loss is %s" % (epoch, i, loss)) - i += 1 -``` - -> 此处也可以用[Model.train](https://www.mindspore.cn/docs/zh-CN/master/api_python/train/mindspore.train.Model.html#mindspore.train.Model.train)的方式进行训练。 - -### 运行单机8卡脚本 - -接下来通过命令调用对应的脚本,以8卡的分布式训练脚本为例,使用`mpirun`启动方式进行分布式训练: - -```bash -bash run.sh -``` - -训练完后,日志文件保存到`log_output`目录下,其中部分文件目录结构如下: - -```text -└─ log_output - └─ 1 - ├─ rank.0 - | └─ stdout - ├─ rank.1 - | └─ stdout -... -``` - -关于Loss部分结果保存在`log_output/1/rank.*/stdout`中,示例如下: - -```text -epoch: 0 step: 0, loss is 2.3084016 -epoch: 0 step: 10, loss is 2.3107638 -epoch: 0 step: 20, loss is 2.2864391 -epoch: 0 step: 30, loss is 2.2938071 -... -``` - -其他启动方式如动态组网、`rank table`的启动可参考[启动方式](https://www.mindspore.cn/docs/zh-CN/master/model_train/parallel/startup_method.html)。 diff --git a/docs/mindspore/source_zh_cn/model_train/parallel/distributed_case.rst b/docs/mindspore/source_zh_cn/model_train/parallel/distributed_case.rst deleted file mode 100644 index baa0d4a5cee90c80e3b2bdbebfc659f74746095a..0000000000000000000000000000000000000000 --- a/docs/mindspore/source_zh_cn/model_train/parallel/distributed_case.rst +++ /dev/null @@ -1,13 +0,0 @@ -分布式高阶配置案例 -======================== - -.. image:: https://mindspore-website.obs.cn-north-4.myhuaweicloud.com/website-images/master/resource/_static/logo_source.svg - :target: https://gitee.com/mindspore/docs/blob/master/docs/mindspore/source_zh_cn/model_train/parallel/distributed_case.rst - :alt: 查看源文件 - -.. toctree:: - :maxdepth: 1 - - pangu_alpha - multiple_mix - ms_operator diff --git a/docs/mindspore/source_zh_cn/model_train/parallel/fault_recover.md b/docs/mindspore/source_zh_cn/model_train/parallel/fault_recover.md deleted file mode 100644 index 69b4055317f60f6414b894b56afae554516f53cf..0000000000000000000000000000000000000000 --- a/docs/mindspore/source_zh_cn/model_train/parallel/fault_recover.md +++ /dev/null @@ -1,219 +0,0 @@ -# 基于冗余信息的故障恢复 - -[![查看源文件](https://mindspore-website.obs.cn-north-4.myhuaweicloud.com/website-images/master/resource/_static/logo_source.svg)](https://gitee.com/mindspore/docs/blob/master/docs/mindspore/source_zh_cn/model_train/parallel/fault_recover.md) - -## 概述 - -在进行分布式训练时,遇到故障非常普遍,与单卡训练类似,通过加载训练过程中保存的权重信息,可以继续训练。区别于纯数据并行训练,应用了模型并行后,权重被切分,各卡之间保存的权重信息可能不一致。 - -为解决上述问题,可以在保存权重checkpoint文件之前,将权重通过[AllGather](https://www.mindspore.cn/docs/zh-CN/master/api_python/samples/ops/communicate_ops.html#allgather)算子进行汇聚,每张卡均存储一个完整的权重信息。此功能即`mindspore.train.CheckpointConfig(integrated_save=True)`接口中的合并保存。 - -但是,在大模型场景下,使用合并保存时各种资源的开销过于巨大。因此,本文档介绍了一种每张卡仅保存自身权重信息的恢复方案。大模型训练往往会同时应用数据并行与模型并行,基于数据并行的维度,其划分的设备持有的权重信息完全一致,这也为大模型提供了冗余备份。本文档将介绍如何获取上述冗余信息。 - -并行策略与权重切片划分的映射关系如下: - -- 数据并行 + 不开启优化器并行:并行通信域内的rank持有相同权重切片。 -- 模型并行:并行通信域内的rank持有不同权重切片。 - -> - 关于数据并行,模型并行的概念,请参考[算子级并行](https://www.mindspore.cn/docs/zh-CN/master/model_train/parallel/operator_parallel.html)。 -> - 关于优化器并行,请参考[优化器并行](https://www.mindspore.cn/docs/zh-CN/master/model_train/parallel/optimizer_parallel.html)。 - -另外,需要注意的是,本文档介绍的分布式故障恢复方案,需要在[下沉模式](https://www.mindspore.cn/docs/zh-CN/master/model_train/train_process/optimize/sink_mode.html)下使用。 - -相关环境变量: - -`GROUP_INFO_FILE=./group_info.pb`:保存切片的权重信息。解析该文件将得到一个列表,该列表中的值为rank_id,表示这些rank_id中的权重是相同的。 - -## 操作实践 - -下面以单机8卡为例,进行分布式训练下故障恢复的操作说明: - -### 样例代码说明 - ->下载完整的样例代码:[fault_recover](https://gitee.com/mindspore/docs/tree/master/docs/sample_code/fault_recover) - -目录结构如下: - -```text -└─ sample_code - ├─ fault_recover - ├── train.py - ├── run.sh - └── recover.sh -``` - -其中,`train.py`是定义网络结构和训练过程的脚本。`run.sh`是执行脚本,`recover.sh`是节点故障后的恢复脚本。 - -### 配置分布式环境 - -通过context接口指定运行模式、运行设备、运行卡号等。与单卡脚本不同,并行脚本还需指定并行模式`parallel_mode`,并通过init初始化HCCL或NCCL通信。此处未设置`device_target`,会自动指定为MindSpore包对应的后端硬件设备。 - -```python -import mindspore as ms -from mindspore.communication import init, get_rank - -ms.set_context(mode=ms.GRAPH_MODE) -ms.set_auto_parallel_context(parallel_mode=ms.ParallelMode.SEMI_AUTO_PARALLEL) -init() -os.environ['GROUP_INFO_FILE'] = "./checkpoints/rank_{}/group_info.pb".format(get_rank()) -ms.set_seed(1) -``` - -> 此处配置环境变量GROUP_INFO_FILE存储权重的冗余信息。 - -### 数据集加载 - -在当前样例中,数据集加载方式与单卡加载方式一致,代码如下: - -```python -import os -import mindspore.dataset as ds - -def create_dataset(batch_size): - dataset_path = os.getenv("DATA_PATH") - dataset = ds.MnistDataset(dataset_path) - image_transforms = [ - ds.vision.Rescale(1.0 / 255.0, 0), - ds.vision.Normalize(mean=(0.1307,), std=(0.3081,)), - ds.vision.HWC2CHW() - ] - label_transform = ds.transforms.TypeCast(ms.int32) - dataset = dataset.map(image_transforms, 'image') - dataset = dataset.map(label_transform, 'label') - dataset = dataset.batch(batch_size) - return dataset - -data_set = create_dataset(32) -``` - -### 定义网络 - -此处对算子配置一些切分策略,配置策略后的网络结构为: - -```python -import mindspore as ms -from mindspore import nn, ops - -class Network(nn.Cell): - def __init__(self): - super().__init__() - self.flatten = ops.Flatten() - self.fc1_weight = ms.Parameter(initializer("normal", [28*28, 512], ms.float32)) - self.fc2_weight = ms.Parameter(initializer("normal", [512, 512], ms.float32)) - self.fc3_weight = ms.Parameter(initializer("normal", [512, 10], ms.float32)) - self.matmul1 = ops.MatMul() - self.relu1 = ops.ReLU() - self.matmul2 = ops.MatMul() - self.relu2 = ops.ReLU() - self.matmul3 = ops.MatMul() - - def construct(self, x): - x = self.flatten(x) - x = self.matmul1(x, self.fc1_weight) - x = self.relu1(x) - x = self.matmul2(x, self.fc2_weight) - x = self.relu2(x) - logits = self.matmul3(x, self.fc3_weight) - return logits - -net = Network() -net.matmul1.shard(((2, 4), (4, 1))) -net.relu1.shard(((4, 1),)) -``` - -### 训练网络 - -在这一步,我们需要定义损失函数、优化器以及训练过程: - -```python -import mindspore as ms -from mindspore import nn, train -from mindspore.communication import get_rank - -optimizer = nn.SGD(net.trainable_params(), 1e-2) -loss_fn = nn.CrossEntropyLoss() -loss_cb = train.LossMonitor() -ckpt_config = train.CheckpointConfig(save_checkpoint_steps=1000, keep_checkpoint_max=4, integrated_save=False) -ckpoint_cb = train.ModelCheckpoint(prefix="checkpoint", directory="./checkpoints/rank_{}".format(get_rank()), config=ckpt_config) -model = ms.Model(net, loss_fn=loss_fn, optimizer=optimizer) -model.train(2, data_set, callbacks=[loss_cb, ckpoint_cb], dataset_sink_mode=True) -``` - -> 训练时通过指定dataset_sink_mode为True以配置为下沉模式,CheckpointConfig中需配置`integrated_save`为`False`。 - -### 故障恢复 - -分布式的故障恢复,需要事先获取切分的信息,因此需要先调用`model.infer_train_layout`得到切分策略信息,然后再执行训练。 - -```python -import mindspore as ms -from mindspore.communication import get_rank - -# model create -# checkpoint load -if bool(args_opt.is_recover): - param_dict = ms.load_checkpoint("./checkpoints/rank_{}/checkpoint-2_1875.ckpt".format(get_rank())) - model.infer_train_layout(data_set) - ms.load_param_into_net(net, param_dict) -model.train(2, data_set, callbacks=[loss_cb, ckpoint_cb], dataset_sink_mode=True) -``` - -### 运行单机8卡脚本 - -接下来通过命令调用对应的脚本,以8卡的分布式脚本为例,使用`mpirun`启动方式运行并行训练脚本: - -```bash -bash run.sh -``` - -训练完成后,可以看到以下文件: - -```text -├─ log_output -| └─ 1 -| ├─ rank.0 -| | └─ stdout -| ├─ rank.1 -| | └─ stdout -| ... -├─ checkpoints -| ├─ rank_0 -| | ├─ checkpoint-1_1875.ckpt -| | ├─ checkpoint-2_1875.ckpt -| | ├─ checkpoint-graph.meta -| | └─ group_info.pb -| ├─ rank_1 -| | ├─ checkpoint-1_1875.ckpt -| | ... -| ... -... -``` - -在`log_output/1/rank.*/stdout`中,可以看到当前训练后的loss值,类似如下: - -```text -epoch: 1 step: 1875, loss is 0.71328689217567444 -epoch: 2 step: 1875, loss is 0.32782320742607117 -``` - -读取group_info.pb,获取权重的冗余信息。解析该文件得到一个列表,该列表中的值为rank_id,表示这些列表中的rank_id对应的权重切片都是相同的,可以相互替换。 -如下面的例子,0卡的group_info.pb解析出来后,发现0卡和4卡的权重切分是完全一致的。当0卡的checkpoint丢失时,可以直接复制4卡checkpoint作为0卡的checkpoint,进行恢复。 - -```python -import mindspore as ms -rank_list = ms.restore_group_info_list("./checkpoints/rank_0/group_info.pb") -print(rank_list) // [0, 4] -``` - -然后,执行故障恢复训练脚本。 - -```bash -bash recover.sh -``` - -恢复训练结束后,查看loss如下,说明加载成功了。 - -```text -epoch: 1 step: 1875, loss is 0.598689079284668 -epoch: 2 step: 1875, loss is 0.266701698332226 -``` diff --git a/docs/mindspore/source_zh_cn/model_train/parallel/manual_parallel.md b/docs/mindspore/source_zh_cn/model_train/parallel/manual_parallel.md deleted file mode 100644 index 1d449d9ee6a8add7ccefdaf88b099d8d7f3faf7c..0000000000000000000000000000000000000000 --- a/docs/mindspore/source_zh_cn/model_train/parallel/manual_parallel.md +++ /dev/null @@ -1,178 +0,0 @@ -# 手动并行 - -[![查看源文件](https://mindspore-website.obs.cn-north-4.myhuaweicloud.com/website-images/master/resource/_static/logo_source.svg)](https://gitee.com/mindspore/docs/blob/master/docs/mindspore/source_zh_cn/model_train/parallel/manual_parallel.md) - -## 概述 - -除了MindSpore提供的自动并行和半自动并行,用户还可以基于通信原语来编码并行过程,手动把模型切分到多个节点上并行。在这种手动并行模式中,用户需要感知图切分、算子切分和集群拓扑,才能实现最优性能。 - -## 基本原理 - -MindSpore的集合通信算子包括`AllReduce`、`AllGather`、`ReduceScatter`、`Broadcast`、`NeighborExchange`、`NeighborExchangeV2`、`AlltoAll`,这些算子是分布式训练中集合通信的基本组成单元。所谓集合通信是指模型切分后,通过集合通信算子来实现不同模型切片之间的数据交互。用户可以手动调用这些算子进行数据传输,实现分布式训练。 - -集合通信算子的详细介绍参见[分布式集合通信原语](https://www.mindspore.cn/docs/zh-CN/master/api_python/samples/ops/communicate_ops.html)。 - -## 操作实践 - -下面以Ascend或者GPU单机8卡为例,进行手动数据并行操作说明: - -### 样例代码说明 - -> 下载完整的样例代码:[manual_parallel](https://gitee.com/mindspore/docs/tree/master/docs/sample_code/manual_parallel)。 - -目录结构如下: - -```text -└─ sample_code - ├─ manual_parallel - ├── train.py - └── run.sh - ... -``` - -其中,`train.py`是定义网络结构和训练过程的脚本。`run.sh`是执行脚本。 - -### 配置分布式环境 - -通过init初始化HCCL或NCCL通信,并设置随机种子。由于是手动并行,此处不指定任何并行模式。`get_rank()`接口可以获取当前设备在通信组中的rank_id,`get_group_size()`接口获取当前通信组的设备数量,通信组默认为全局通信组,包含所有设备。 - -```python -import mindspore as ms -from mindspore.communication import init, get_rank, get_group_size - -ms.set_context(mode=ms.GRAPH_MODE) -init() -cur_rank = get_rank() -batch_size = 32 -device_num = get_group_size() -shard_size = batch_size // device_num -``` - -### 网络定义 - -在单卡网络的基础上,增加了对输入数据的切分: - -```python -from mindspore import nn -from mindspore.communication import get_rank, get_group_size - -class Network(nn.Cell): - def __init__(self): - super().__init__() - self.flatten = nn.Flatten() - self.layer1 = nn.Dense(28*28, 512) - self.relu1 = nn.ReLU() - self.layer2 = nn.Dense(512, 512) - self.relu2 = nn.ReLU() - self.layer3 = nn.Dense(512, 10) - - def construct(self, x): - x = x[cur_rank*shard_size:cur_rank*shard_size + shard_size] - x = self.flatten(x) - x = self.layer1(x) - x = self.relu1(x) - x = self.layer2(x) - x = self.relu2(x) - logits = self.layer3(x) - return logits - -net = Network() -``` - -### 数据集加载 - -数据集加载方式与单卡网络一致: - -```python -import os -import mindspore.dataset as ds - -def create_dataset(): - dataset_path = os.getenv("DATA_PATH") - dataset = ds.MnistDataset(dataset_path) - image_transforms = [ - ds.vision.Rescale(1.0 / 255.0, 0), - ds.vision.Normalize(mean=(0.1307,), std=(0.3081,)), - ds.vision.HWC2CHW() - ] - label_transform = ds.transforms.TypeCast(ms.int32) - dataset = dataset.map(image_transforms, 'image') - dataset = dataset.map(label_transform, 'label') - dataset = dataset.batch(batch_size) - return dataset - -data_set = create_dataset() -``` - -### 损失函数定义 - -在损失函数中,需要增加对label的切分,以及通信原语算子`ops.AllReduce`来聚合各卡的损失: - -```python -from mindspore import nn, ops -from mindspore.communication import get_rank, get_group_size - -class ReduceLoss(nn.Cell): - def __init__(self): - super().__init__() - self.loss = nn.CrossEntropyLoss() - self.all_reduce = ops.AllReduce() - - def construct(self, data, label): - label = label[cur_rank*shard_size:cur_rank*shard_size + shard_size] - loss_value = self.loss(data, label) - loss_value = self.all_reduce(loss_value) / device_num - return loss_value - -loss_fn = ReduceLoss() -``` - -### 训练过程定义 - -优化器、训练过程与单卡网络一致: - -```python -import mindspore as ms -from mindspore import nn, train - -optimizer = nn.SGD(net.trainable_params(), 1e-2) -loss_cb = train.LossMonitor(20) -model = ms.Model(net, loss_fn=loss_fn, optimizer=optimizer) -model.train(10, data_set, callbacks=[loss_cb]) -``` - -### 运行单机8卡脚本 - -接下来通过命令调用对应的脚本,以8卡的分布式训练脚本为例,使用`mpirun`启动方式进行分布式训练: - -```bash -bash run.sh -``` - -训练完后,日志文件保存到`log_output`目录下,通过设置环境变量`MS_DEV_SAVE_GRAPHS`的值为2,可以打印出编译过程中的IR图,其中部分文件目录结构如下: - -```text -└─ log_output - └─ 1 - ├─ rank.0 - | └─ stdout - ├─ rank.1 - | └─ stdout - ... -``` - -关于Loss部分结果保存在`log_output/1/rank.*/stdout`中,示例如下: - -```text -epoch: 1 step: 20, loss is 2.241283893585205 -epoch: 1 step: 40, loss is 2.1842331886291504 -epoch: 1 step: 60, loss is 2.0627782344818115 -epoch: 1 step: 80, loss is 1.9561686515808105 -epoch: 1 step: 100, loss is 1.8991656303405762 -epoch: 1 step: 120, loss is 1.6239635944366455 -epoch: 1 step: 140, loss is 1.465965747833252 -epoch: 1 step: 160, loss is 1.3662006855010986 -epoch: 1 step: 180, loss is 1.1562917232513428 -epoch: 1 step: 200, loss is 1.116426944732666 -... -``` diff --git a/docs/mindspore/source_zh_cn/model_train/parallel/optimize_technique.rst b/docs/mindspore/source_zh_cn/model_train/parallel/optimize_technique.rst deleted file mode 100644 index aef354d491ff85ee421bcfdc07446e0a1259fd89..0000000000000000000000000000000000000000 --- a/docs/mindspore/source_zh_cn/model_train/parallel/optimize_technique.rst +++ /dev/null @@ -1,44 +0,0 @@ -优化方法 -======================== - -.. image:: https://mindspore-website.obs.cn-north-4.myhuaweicloud.com/website-images/master/resource/_static/logo_source.svg - :target: https://gitee.com/mindspore/docs/blob/master/docs/mindspore/source_zh_cn/model_train/parallel/optimize_technique.rst - :alt: 查看源文件 - -.. toctree:: - :maxdepth: 1 - :hidden: - - strategy_select - split_technique - multiple_copy - high_dimension_tensor_parallel - distributed_gradient_accumulation - recompute - dataset_slice - host_device_training - memory_offload - comm_fusion - comm_subgraph - -考虑到实际并行训练中,可能会对训练性能、吞吐量或规模有要求,可以从三个方面考虑优化:并行策略优化、内存优化和通信优化 - -- 并行策略优化:并行策略优化主要包括并行策略的选择、算子级并行下的切分技巧以及多副本技巧。 - - - `策略选择 `_:根据模型规模和数据量大小,可以选择不同的并行策略,以提高训练效率和资源利用率。 - - `切分技巧 `_:切分技巧是指通过手动配置某些关键算子的切分策略,减少张量重排布来提升训练效率。 - - `多副本 `_:多副本是指在一个迭代步骤中,将一个训练batch拆分成多个,将模型并行通信与计算进行并发,提升资源利用率。 - - `高维张量并行 `_:高维张量并行是指对于模型并行中的MatMul计算中的激活、权重张量进行多维度切分,通过优化切分策略降低通信量,提高训练效率。 - -- 内存优化:内存优化包括梯度累加、重计算、数据集切分、Host&Device异构和异构存储,主要目标是节省内存空间。 - - - `梯度累加 `_:梯度累加通过在多个MicroBatch上计算梯度并将它们累加起来,然后一次性应用这个累加梯度来更新神经网络的参数。通过这种方法少量设备也能训练大Batch,有效减低内存峰值。 - - `重计算 `_:重计算是一种以时间换空间的技术,通过不保存某些正向算子的计算结果,以节省内存空间,在计算反向算子时,需要用到正向结果再重新计算正向算子。 - - `数据集切分 `_:数据集单个数据过大甚至无法加载到单个设备的时候,可以对数据进行切分,进行分布式训练。数据集切分配合模型并行是有效降低显存占用的方式。 - - `Host&Device异构 `_:在遇到参数量超过Device内存上限的时候,可以把一些内存占用量大且计算量少的算子放在Host端,这样能同时利用Host端内存大,Device端计算快的特性,提升了设备的利用率。 - - `异构存储 `_:异构存储可以将暂时不需要用到的参数或中间结果拷贝到Host端内存或者硬盘,在需要时再恢复至Device端,从而减少显存占用。 - -- 通信优化:通信优化包括通信融合和通信子图提取与复用,主要目标是减少通信延时,提升性能。 - - - `通信融合 `_:通信融合可以将相同源节点和目标节点的通信算子合并到一次通信过程,避免多次通信带来额外开销。 - - `通信子图提取与复用 `_:通过对通信算子提取通信子图,替换原本的通信算子,可以减少通信耗时,同时减少模型编译时间。 diff --git a/docs/mindspore/source_zh_cn/model_train/parallel/others.rst b/docs/mindspore/source_zh_cn/model_train/parallel/others.rst deleted file mode 100644 index 69a20be91fe3ecba26457f3a6898b3bc117e322e..0000000000000000000000000000000000000000 --- a/docs/mindspore/source_zh_cn/model_train/parallel/others.rst +++ /dev/null @@ -1,13 +0,0 @@ -实验特性 -======================== - -.. image:: https://mindspore-website.obs.cn-north-4.myhuaweicloud.com/website-images/master/resource/_static/logo_source.svg - :target: https://gitee.com/mindspore/docs/blob/master/docs/mindspore/source_zh_cn/model_train/parallel/others.rst - :alt: 查看源文件 - -.. toctree:: - :maxdepth: 1 - - distributed_graph_partition - shard_function_parallel - support_dynamic_shape_in_parallel diff --git a/docs/mindspore/source_zh_cn/model_train/parallel/overview.md b/docs/mindspore/source_zh_cn/model_train/parallel/overview.md index 34d59a71503cc2ac1723fc374c5ce95d87d8ce2b..59df4de1ff5f507501d36feaf15e8fc47b250fad 100644 --- a/docs/mindspore/source_zh_cn/model_train/parallel/overview.md +++ b/docs/mindspore/source_zh_cn/model_train/parallel/overview.md @@ -6,17 +6,6 @@ 要实现分布式并行训练和推理,您可以参考以下指引: -## 启动方式 - -MindSpore目前支持四种启动方式: - -- **msrun**:是动态组网的封装,允许用户使用单命令行指令在各节点拉起分布式任务,安装MindSpore后即可使用,不依赖外部配置或者模块,支持Ascend/GPU/CPU。 -- **动态组网**:通过MindSpore内部动态组网模块启动,不依赖外部配置或者模块,支持Ascend/GPU/CPU。 -- **mpirun**:通过多进程通信库OpenMPI启动,支持Ascend/GPU。 -- **rank table**:配置rank_table表后,通过脚本启动和卡数对应的进程,支持Ascend。 - -详细可参考[分布式并行启动方式](https://www.mindspore.cn/docs/zh-CN/master/model_train/parallel/startup_method.html)章节。 - ## 并行模式 目前MindSpore可以采取下述的几种并行模式,您可以按需求选择: diff --git a/docs/mindspore/source_zh_cn/model_train/parallel/parameter_server_training.md b/docs/mindspore/source_zh_cn/model_train/parallel/parameter_server_training.md deleted file mode 100644 index e134230e506917da663dd31e20bcfec87bc58509..0000000000000000000000000000000000000000 --- a/docs/mindspore/source_zh_cn/model_train/parallel/parameter_server_training.md +++ /dev/null @@ -1,260 +0,0 @@ -# 参数服务器 - -[![查看源文件](https://mindspore-website.obs.cn-north-4.myhuaweicloud.com/website-images/master/resource/_static/logo_source.svg)](https://gitee.com/mindspore/docs/blob/master/docs/mindspore/source_zh_cn/model_train/parallel/parameter_server_training.md) - -## 概述 - -Parameter Server(参数服务器)是一种在分布式训练中广泛使用的架构。该架构一共包含三个独立的组件,分别是Server、Worker和Scheduler,其作用分别是: - -- Server:保存模型的权重和反向计算的梯度值,并使用优化器通过Worker上传的梯度值对模型进行更新。 - -- Worker:执行网络的正反向计算,通过Push接口将反向计算的梯度值上传至Server中,通过Pull接口将Server更新好的模型下载到Worker本地。 - -- Scheduler:用于建立Server和Worker的通信关系。 - -相较于同步的AllReduce训练方法,参数服务器具有更好的灵活性、可扩展性。原因有三点: - -1. 参数服务器既支持同步SGD(Stochastic Gradient Descent,随机梯度下降),也支持异步SGD的训练算法。 -2. 在扩展性上,参数服务器将模型的计算与更新分别部署在Worker和Server两类进程中,使得两者的资源可以独立地横向扩缩(新增或者删除Worker和Server资源)。 -3. 在大规模数据中心的环境下,计算设备、网络以及存储经常会出现各种故障,导致部分节点异常。而在参数服务器的架构下,处理此类故障更容易,且不会对训练中的任务产生影响。 - -MindSpore的参数服务器采用了自研的通信框架作为基础架构。基于该框架提供了远程通信能力以及抽象的Send/Broadcast等原语,实现了同步SGD的分布式训练算法。另外,结合Ascend和GPU中的高性能集合通信库(HCCL 和 NCCL),MindSpore还提供了参数服务器和AllReduce的混合训练模式,支持将部分权重通过参数服务器进行存储和更新,其余权重仍然通过AllReduce算法进行训练。 - -> 参数服务器支持Ascend、GPU硬件平台,不支持`PyNative`模式。 - -相关接口: - -1. `mindspore.set_ps_context(enable_ps=True)`开启参数服务器训练模式。 - - - 此接口需在`mindspore.communication.init()`之前调用。 - - 若没有调用此接口,下面的环境变量设置不会生效。 - - 调用`mindspore.reset_ps_context()`可以关闭参数服务器训练模式。 - -2. 在该训练模式下,有以下两种调用接口方式,可以控制训练参数是否通过参数服务器进行更新,以及控制参数初始化位置: - - - 通过`mindspore.nn.Cell.set_param_ps()`对`nn.Cell`中所有权重进行递归设置。 - - 通过`mindspore.Parameter.set_param_ps()`对`mindspore.Parameter`权重进行设置。 - - 注意: - - - 对于通过参数服务器更新的训练参数,其单个权重大小不得超过INT_MAX(2^31 - 1)字节。 - - 接口`set_param_ps`可接收一个`bool`型参数:`init_in_server`,表示该训练参数是否在Server端初始化,其默认值为`False`,表示在Worker上初始化该训练参数。 - - 当前仅支持`EmbeddingLookup`算子的训练参数`embedding_table`在Server端初始化,以解决超大shape的`embedding_table`在Worker上初始化导致内存不足的问题。该算子的`target`属性需要设置为'CPU'。在Server端初始化的训练参数将不再同步到Worker上,如果涉及到多Server训练并保存CheckPoint,则训练结束后每个Server均会保存一个 CheckPoint。上述`embedding_table`表示一个二维表,用于储存和管理学习模型中使用到的嵌入向量。 - -3. (可选配置)针对超大shape的`embedding_table`,由于设备上无法全量存放,可以配置[EmbeddingLookup 算子](https://www.mindspore.cn/docs/zh-CN/master/api_python/nn/mindspore.nn.EmbeddingLookup.html)的`vocab_cache_size`参数,开启参数服务器训练模式下的**分布式特征缓存功能**。该功能将在设备上使用一块`vocab_cache_size`大小的独占空间作为缓存 (Embedding Cache),供部分`embedding_table`在设备上训练,以达到提升训练性能的目的。而全量`embedding_table`仍旧存储在Server上。在训练过程中,将下批次训练用到的`embedding_table`提前放入Embedding Cache,当Embedding Cache已满,过期的`embedding_table`将会被放回至Server。训练结束后,可在Server上导出CheckPoint,保存训练后的全量`embedding_table`。Embedding Cache支持sparse模式。针对配置了`vocab_cache_size`的`EmbeddingLookup`算子,通过将其`sparse`参数都设为True,sparse模式会对该算子输入的特征id去重,以降低计算与通信量。 - -相关环境变量配置: - -MindSpore通过读取环境变量,控制参数服务器训练。环境变量包括以下选项(所有脚本中的`MS_SCHED_HOST`及`MS_SCHED_PORT`值需保持一致): - -```text -export MS_SERVER_NUM=1 # Server number -export MS_WORKER_NUM=1 # Worker number -export MS_SCHED_HOST=XXX.XXX.XXX.XXX # Scheduler IP address -export MS_SCHED_PORT=XXXX # Scheduler port -export MS_ROLE=MS_SCHED # The role of this process: MS_SCHED represents the scheduler, MS_WORKER represents the worker, MS_PSERVER represents the Server -``` - -更多详细说明请查看[动态组网环境变量](https://www.mindspore.cn/docs/zh-CN/master/model_train/parallel/dynamic_cluster.html)。 - -## 操作实践 - -参数服务器支持GPU和Ascend,下面以Ascend为例进行操作说明: - -### 样例代码说明 - -> 下载完整的样例代码:[parameter_server](https://gitee.com/mindspore/docs/tree/master/docs/sample_code/parameter_server)。 - -目录结构如下: - -```text -└─ sample_code - ├─ parameter_server - ├── train.py - └── run.sh - ... -``` - -其中,`train.py`是定义网络结构和训练过程的脚本。`run.sh`是执行脚本。 - -### 配置分布式环境 - -通过context接口指定运行模式、运行设备、运行卡号等。与单卡脚本不同,并行脚本还需指定并行模式`parallel_mode`,使能`enable_ps`开启参数服务器训练模式,并通过init初始化HCCL或NCCL通信。此处未设置`device_target`,会自动指定为MindSpore包对应的后端硬件设备。 - -```python -import mindspore as ms -from mindspore.communication import init - -ms.set_context(mode=ms.GRAPH_MODE) -ms.set_auto_parallel_context(full_batch=True, parallel_mode=ms.ParallelMode.AUTO_PARALLEL) -ms.set_ps_context(enable_ps=True) -init() -ms.set_seed(1) -``` - -- `full_batch`:是否全量导入数据集。为`True`时表示全量导入,每卡的数据相同,在多Worker场景中必须设置为`True`。 -- `parallel_mode`:并行模式。多Worker场景下,需要开启自动并行模式,通过设置`parallel_mode=ParallelMode.AUTO_PARALLEL`实现。 - -### 网络定义 - -参数服务器模式的网络定义是在单卡模式的基础上配置 net.set_param_ps(): - -```python -from mindspore import nn - -class Network(nn.Cell): - def __init__(self): - super().__init__() - self.flatten = nn.Flatten() - self.fc1 = nn.Dense(28*28, 10, weight_init="normal", bias_init="zeros") - self.relu = nn.ReLU() - self.fc2 = nn.Dense(10, 1, weight_init="normal", bias_init="zeros") - - def construct(self, x): - x = self.flatten(x) - logits = self.fc2(self.relu(self.fc1(x))) - return logits - -net = Network() -net.set_param_ps() -``` - -### 数据集加载 - -数据集加载方式与单卡模型一致,代码如下: - -```python -import os -import mindspore.dataset as ds - -def create_dataset(batch_size): - dataset_path = os.getenv("DATA_PATH") - dataset = ds.MnistDataset(dataset_path) - image_transforms = [ - ds.vision.Rescale(1.0 / 255.0, 0), - ds.vision.Normalize(mean=(0.1307,), std=(0.3081,)), - ds.vision.HWC2CHW() - ] - label_transform = ds.transforms.TypeCast(ms.int32) - dataset = dataset.map(image_transforms, 'image') - dataset = dataset.map(label_transform, 'label') - dataset = dataset.batch(batch_size) - return dataset - -data_set = create_dataset(32) -``` - -### 训练网络 - -在这一部分,定义优化器,损失函数和训练网络。此处采用函数式写法来定义网络,代码与单卡模式一致: - -```python -import mindspore as ms -from mindspore import nn - -optimizer = nn.SGD(net.trainable_params(), 1e-2) -loss_fn = nn.MSELoss() - -def forward_fn(data, target): - logits = net(data) - loss = loss_fn(logits, target) - return loss, logits - -grad_fn = ms.value_and_grad(forward_fn, None, net.trainable_params(), has_aux=True) - -@ms.jit -def train_step(inputs, targets): - (loss_value, _), grads = grad_fn(inputs, targets) - optimizer(grads) - return loss_value - -for epoch in range(10): - i = 0 - for image, label in data_set: - loss_output = train_step(image, label) - if i % 10 == 0: - print("epoch: %s, step: %s, loss is %s" % (epoch, i, loss_output)) - i += 1 -``` - -### 运行单机 8 卡脚本 - -接下来通过命令调用对应的脚本,以8卡的分布式训练脚本为例,进行分布式训练。Scheduler、Server和Worker三个角色分别启动对应数量的进程。命令如下: - -```bash -EXEC_PATH=$(pwd) - -if [ ! -d "${EXEC_PATH}/MNIST_Data" ]; then - if [ ! -f "${EXEC_PATH}/MNIST_Data.zip" ]; then - wget http://mindspore-website.obs.cn-north-4.myhuaweicloud.com/notebook/datasets/MNIST_Data.zip - fi - unzip MNIST_Data.zip -fi -export DATA_PATH=${EXEC_PATH}/MNIST_Data/train/ - -rm -rf output -mkdir output - -# run Scheduler process -export MS_SERVER_NUM=8 -export MS_WORKER_NUM=8 -export MS_SCHED_HOST=127.0.0.1 -export MS_SCHED_PORT=8118 -export MS_ROLE=MS_SCHED -python train.py > output/scheduler.log 2>&1 & - -# run Server processes -export MS_SERVER_NUM=8 -export MS_WORKER_NUM=8 -export MS_SCHED_HOST=127.0.0.1 -export MS_SCHED_PORT=8118 -export MS_ROLE=MS_PSERVER -for((server_id=0;server_id<${MS_SERVER_NUM};server_id++)) -do - python train.py > output/server_${server_id}.log 2>&1 & -done - -# run Wroker processes -export MS_SERVER_NUM=8 -export MS_WORKER_NUM=8 -export MS_SCHED_HOST=127.0.0.1 -export MS_SCHED_PORT=8118 -export MS_ROLE=MS_WORKER -for((worker_id=0;worker_id<${MS_WORKER_NUM};worker_id++)) -do - python train.py > output/worker_${worker_id}.log 2>&1 & -done -``` - -或者直接执行: - -```bash -bash run.sh -``` - -每个进程的输出结果保存在`output`文件夹中,可以在`output/scheduler.log`中查看Server与Worker通信日志: - -```text -... -Assign rank id of node id: 2fa9d1ab-10b8-4a61-9acf-217a04439287, role: MS_WORKER, with host ip: 127.0.0.1, old rank id: 6, new rank id: 0 -... -Assign rank id of node id: 02fb1169-edc3-465e-b307-ccaf62d1f0b3, role: MS_PSERVER, with host ip: 127.0.0.1, old rank id: 4, new rank id: 0 -... -Cluster is successfully initialized. -``` - -训练结果保存在`output/worker_0.log`中,示例如下: - -```text -epoch: 0, step: 0, loss is 26.743706 -epoch: 0, step: 10, loss is 17.507723 -epoch: 0, step: 20, loss is 9.616591 -epoch: 0, step: 30, loss is 8.589715 -epoch: 0, step: 40, loss is 8.23479 -epoch: 0, step: 50, loss is 10.431321 -epoch: 0, step: 60, loss is 7.7080607 -epoch: 0, step: 70, loss is 8.599786 -epoch: 0, step: 80, loss is 7.669814 -epoch: 0, step: 90, loss is 8.584343 -epoch: 0, step: 100, loss is 8.803712 -```