From cb5af54445ca049d6d4daa6ebfde570abee19756 Mon Sep 17 00:00:00 2001
From: zhaowenxuan
Date: Wed, 19 Mar 2025 10:14:42 +0800
Subject: [PATCH] tutorials modify optimizer parallel

---
 .../distributed_optimizer_parallel.py         |  63 +++---
 .../distributed_optimizer_parallel/run.sh     |   8 +-
 tutorials/source_zh_cn/index.rst              |   1 +
 .../parallel/optimizer_parallel.md            | 189 ++++++++++++++++++
 4 files changed, 236 insertions(+), 25 deletions(-)
 create mode 100644 tutorials/source_zh_cn/parallel/optimizer_parallel.md

diff --git a/docs/sample_code/distributed_optimizer_parallel/distributed_optimizer_parallel.py b/docs/sample_code/distributed_optimizer_parallel/distributed_optimizer_parallel.py
index c2af233867..f4672f08b0 100644
--- a/docs/sample_code/distributed_optimizer_parallel/distributed_optimizer_parallel.py
+++ b/docs/sample_code/distributed_optimizer_parallel/distributed_optimizer_parallel.py
@@ -20,9 +20,10 @@ import mindspore as ms
 import mindspore.dataset as ds
 from mindspore import nn
 from mindspore.communication import init
+from mindspore.nn.utils import no_init_parameters
+from mindspore.parallel.auto_parallel import AutoParallel
 
 ms.set_context(mode=ms.GRAPH_MODE)
-ms.set_auto_parallel_context(parallel_mode=ms.ParallelMode.SEMI_AUTO_PARALLEL, enable_parallel_optimizer=True)
 init()
 ms.set_seed(1)
 
@@ -45,7 +46,9 @@ class Network(nn.Cell):
         logits = self.layer3(x)
         return logits
 
-net = Network()
+with no_init_parameters():
+    net = Network()
+    optimizer = nn.SGD(net.trainable_params(), 1e-2)
 net.layer1.set_comm_fusion(0)
 net.layer2.set_comm_fusion(1)
 net.layer3.set_comm_fusion(2)
@@ -65,29 +68,41 @@ def create_dataset(batch_size):
     dataset = dataset.batch(batch_size)
     return dataset
 
-data_set = create_dataset(32)
-optimizer = nn.SGD(net.trainable_params(), 1e-2)
-loss_fn = nn.CrossEntropyLoss()
+def test_distributed_optimizer_parallel():
+    """
+    Tests the distributed optimizer parallel functionality.
 
-def forward_fn(data, target):
-    """forward propagation"""
-    logits = net(data)
-    loss = loss_fn(logits, target)
-    return loss, logits
+    This function runs a test case to verify that the distributed optimizer
+    works correctly in a parallel environment.
 
-grad_fn = ms.value_and_grad(forward_fn, None, net.trainable_params(), has_aux=True)
+    Returns:
+        None.
+    """
+    data_set = create_dataset(32)
+    loss_fn = nn.CrossEntropyLoss()
 
-@ms.jit
-def train_step(inputs, targets):
-    """train_step"""
-    (loss_value, _), grads = grad_fn(inputs, targets)
-    optimizer(grads)
-    return loss_value
+    def forward_fn(data, target):
+        """forward propagation"""
+        logits = net(data)
+        loss = loss_fn(logits, target)
+        return loss, logits
 
-for epoch in range(10):
-    i = 0
-    for image, label in data_set:
-        loss_output = train_step(image, label)
-        if i % 10 == 0:
-            print("epoch: %s, step: %s, loss is %s" % (epoch, i, loss_output))
-        i += 1
+    grad_fn = ms.value_and_grad(forward_fn, None, net.trainable_params(), has_aux=True)
+
+    @ms.jit
+    def train_step(inputs, targets):
+        """train_step"""
+        (loss_value, _), grads = grad_fn(inputs, targets)
+        optimizer(grads)
+        return loss_value
+
+    parallel_net = AutoParallel(train_step, parallel_mode="semi_auto")
+    parallel_net.hsdp()
+
+    for epoch in range(10):
+        i = 0
+        for image, label in data_set:
+            loss_output = parallel_net(image, label)
+            if i % 10 == 0:
+                print("epoch: %s, step: %s, loss is %s" % (epoch, i, loss_output))
+            i += 1
diff --git a/docs/sample_code/distributed_optimizer_parallel/run.sh b/docs/sample_code/distributed_optimizer_parallel/run.sh
index 92f98f4945..f8d1a15293 100644
--- a/docs/sample_code/distributed_optimizer_parallel/run.sh
+++ b/docs/sample_code/distributed_optimizer_parallel/run.sh
@@ -15,4 +15,10 @@ if [ ! -d "${EXEC_PATH}/MNIST_Data" ]; then
 fi
 export DATA_PATH=${EXEC_PATH}/MNIST_Data/train/
 
-mpirun -n 8 --output-filename log_output --merge-stderr-to-stdout python distributed_optimizer_parallel.py
+msrun --worker_num=8 \
+    --local_worker_num=8 \
+    --master_addr=127.0.0.1 \
+    --master_port=10969 \
+    --join=True \
+    --log_dir=./log_output \
+    pytest -s -v distributed_optimizer_parallel.py::test_distributed_optimizer_parallel
diff --git a/tutorials/source_zh_cn/index.rst b/tutorials/source_zh_cn/index.rst
index deb1815ab9..d7a310e8b2 100644
--- a/tutorials/source_zh_cn/index.rst
+++ b/tutorials/source_zh_cn/index.rst
@@ -59,6 +59,7 @@ MindSpore教程
 
    parallel/distributed_case
    parallel/optimize_technique
+   parallel/optimizer_parallel
 
 .. toctree::
    :glob:
diff --git a/tutorials/source_zh_cn/parallel/optimizer_parallel.md b/tutorials/source_zh_cn/parallel/optimizer_parallel.md
new file mode 100644
index 0000000000..1affee7ea0
--- /dev/null
+++ b/tutorials/source_zh_cn/parallel/optimizer_parallel.md
@@ -0,0 +1,189 @@
+# Optimizer Parallel
+
+In data parallel training, the parameter-update step performs the same computation redundantly on every device. Optimizer parallelism spreads the optimizer computation across the devices of the data-parallel dimension, which effectively reduces memory consumption and improves performance for large networks such as Bert and GPT.
+
+[![View Source File](https://mindspore-website.obs.cn-north-4.myhuaweicloud.com/website-images/master/resource/_static/logo_source.svg)](https://gitee.com/mindspore/docs/blob/master/tutorials/source_zh_cn/parallel/optimizer_parallel.md)
+
+The following uses a single Ascend node with 8 devices as an example to show how optimizer parallelism is used.
+
+## Sample Code Description
+
+> Download the complete sample code: [distributed_optimizer_parallel](https://gitee.com/mindspore/docs/tree/master/docs/sample_code/distributed_optimizer_parallel).
+
+The directory structure is as follows:
+
+```text
+└─ sample_code
+    ├─ distributed_optimizer_parallel
+       ├── distributed_optimizer_parallel.py
+       └── run.sh
+    ...
+```
+
+`distributed_optimizer_parallel.py` defines the network structure and the training process, and `run.sh` is the execution script.
+
+## Configuring the Distributed Environment
+
+The context interface specifies the running mode, target device, and device ID. Unlike a single-device script, a parallel script must also call `init` to initialize HCCL or NCCL communication.
+
+```python
+import mindspore as ms
+from mindspore.communication import init
+
+ms.set_context(mode=ms.GRAPH_MODE)
+init()
+ms.set_seed(1)
+```
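+After `init()` has run, each of the 8 worker processes can query its own rank and the size of the communication group. This is not part of the sample; it is a minimal sketch, assuming only the standard `mindspore.communication` helpers, that can help confirm the distributed environment came up as expected (each worker writes its own log file, so the printed rank identifies the log).
+
+```python
+# Minimal sketch: assumes init() has already been called as shown above.
+from mindspore.communication import get_rank, get_group_size
+
+rank_id = get_rank()           # index of this worker process, 0..7 on one 8-device node
+group_size = get_group_size()  # total number of devices in the communication group
+print("rank %s of %s is ready" % (rank_id, group_size))
+```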
+## Loading the Dataset
+
+In the optimizer parallel scenario, the dataset is loaded in exactly the same way as in the single-device case:
+
+```python
+import os
+import mindspore.dataset as ds
+
+def create_dataset(batch_size):
+    """create dataset"""
+    dataset_path = os.getenv("DATA_PATH")
+    dataset = ds.MnistDataset(dataset_path)
+    image_transforms = [
+        ds.vision.Rescale(1.0 / 255.0, 0),
+        ds.vision.Normalize(mean=(0.1307,), std=(0.3081,)),
+        ds.vision.HWC2CHW()
+    ]
+    label_transform = ds.transforms.TypeCast(ms.int32)
+    dataset = dataset.map(image_transforms, 'image')
+    dataset = dataset.map(label_transform, 'label')
+    dataset = dataset.batch(batch_size)
+    return dataset
+
+data_set = create_dataset(32)
+```
+
+## Defining the Network and the Optimizer
+
+The network structure is basically the same as in the single-device case. The differences are that communication-operator fusion is configured for the layers, and that the network and the optimizer are created with delayed parameter initialization:
+
+```python
+from mindspore import nn
+from mindspore.nn.utils import no_init_parameters
+
+class Network(nn.Cell):
+    def __init__(self):
+        super().__init__()
+        self.flatten = nn.Flatten()
+        self.layer1 = nn.Dense(28*28, 512)
+        self.layer2 = nn.Dense(512, 512)
+        self.layer3 = nn.Dense(512, 10)
+        self.relu = nn.ReLU()
+
+    def construct(self, x):
+        x = self.flatten(x)
+        x = self.layer1(x)
+        x = self.relu(x)
+        x = self.layer2(x)
+        x = self.relu(x)
+        logits = self.layer3(x)
+        return logits
+
+with no_init_parameters():
+    net = Network()
+    optimizer = nn.SGD(net.trainable_params(), 1e-2)
+net.layer1.set_comm_fusion(0)
+net.layer2.set_comm_fusion(1)
+net.layer3.set_comm_fusion(2)
+```
+
+> To reduce the communication cost, different layers are placed into different communication fusion groups here; for details, see [Communication Operator Fusion](https://www.mindspore.cn/docs/zh-CN/master/model_train/parallel/comm_fusion.html).
+
+## Defining the Training Network
+
+In this step, we define the loss function and the training step, written the same way as on a single device. Note that the optimizer was already created above inside the `no_init_parameters` scope, so it is not redefined here:
+
+```python
+import mindspore as ms
+from mindspore import nn
+
+loss_fn = nn.CrossEntropyLoss()
+
+def forward_fn(data, target):
+    logits = net(data)
+    loss = loss_fn(logits, target)
+    return loss, logits
+
+grad_fn = ms.value_and_grad(forward_fn, None, net.trainable_params(), has_aux=True)
+
+@ms.jit
+def train_step(inputs, targets):
+    (loss_value, _), grads = grad_fn(inputs, targets)
+    optimizer(grads)
+    return loss_value
+```
+
+## Parallel Configuration
+
+We then set the parallel-related configuration: wrap the training step with `AutoParallel`, specify the semi-automatic parallel mode `semi_auto`, and enable optimizer parallelism by calling `hsdp`.
+
+```python
+from mindspore.parallel.auto_parallel import AutoParallel
+
+parallel_net = AutoParallel(train_step, parallel_mode="semi_auto")
+parallel_net.hsdp()
+```
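+For reference, earlier versions of this sample switched the same behavior on through the global context interface; the line below is exactly what this patch removes from `distributed_optimizer_parallel.py`, and the `AutoParallel(..., parallel_mode="semi_auto")` wrapper together with `hsdp()` is its replacement:
+
+```python
+# Legacy configuration removed by this patch: semi-automatic parallelism and
+# optimizer parallelism were previously enabled through the global context.
+ms.set_auto_parallel_context(parallel_mode=ms.ParallelMode.SEMI_AUTO_PARALLEL,
+                             enable_parallel_optimizer=True)
+```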
+``` + +其中,`distributed_optimizer_parallel.py`是定义网络结构和训练过程的脚本。`run.sh`是执行脚本。 + +## 配置分布式环境 + +通过context接口指定运行模式、运行设备、运行卡号等,与单卡脚本不同,并行脚本还需init初始化HCCL或NCCL通信。 + +```python +import mindspore as ms +from mindspore.communication import init + +ms.set_context(mode=ms.GRAPH_MODE) +init() +ms.set_seed(1) +``` + +## 数据集加载 + +在优化器并行场景下,数据集加载方式与单卡加载方式一致,代码如下: + +```python +import os +import mindspore.dataset as ds + +def create_dataset(batch_size): + """create dataset""" + dataset_path = os.getenv("DATA_PATH") + dataset = ds.MnistDataset(dataset_path) + image_transforms = [ + ds.vision.Rescale(1.0 / 255.0, 0), + ds.vision.Normalize(mean=(0.1307,), std=(0.3081,)), + ds.vision.HWC2CHW() + ] + label_transform = ds.transforms.TypeCast(ms.int32) + dataset = dataset.map(image_transforms, 'image') + dataset = dataset.map(label_transform, 'label') + dataset = dataset.batch(batch_size) + return dataset + +data_set = create_dataset(32) +``` + +## 定义网络和优化器 + +优化器并行网络结构与单卡网络结构基本一致,区别在于增加了通信算子融合的配置, 以及需要对网络和优化器进行延后初始化: + +```python +from mindspore import nn +from mindspore.nn.utils import no_init_parameters + +class Network(nn.Cell): + def __init__(self): + super().__init__() + self.flatten = nn.Flatten() + self.layer1 = nn.Dense(28*28, 512) + self.layer2 = nn.Dense(512, 512) + self.layer3 = nn.Dense(512, 10) + self.relu = nn.ReLU() + + def construct(self, x): + x = self.flatten(x) + x = self.layer1(x) + x = self.relu(x) + x = self.layer2(x) + x = self.relu(x) + logits = self.layer3(x) + return logits + +with no_init_parameters: + net = Network() + optimizer = nn.SGD(net.trainable_params(), 1e-2) +net.layer1.set_comm_fusion(0) +net.layer2.set_comm_fusion(1) +net.layer3.set_comm_fusion(2) +``` + +> 这里为了减少通信成本,为不同层配置了通信融合,详细可以参考[通信算子融合](https://www.mindspore.cn/docs/zh-CN/master/model_train/parallel/comm_fusion.html)。 + +## 训练网络定义 + +在这一步,我们需要定义损失函数以及训练步骤,这部分与单卡写法一致: + +```python +import mindspore as ms +from mindspore import nn + +optimizer = nn.SGD(net.trainable_params(), 1e-2) +loss_fn = nn.CrossEntropyLoss() + +def forward_fn(data, target): + logits = net(data) + loss = loss_fn(logits, target) + return loss, logits + +grad_fn = ms.value_and_grad(forward_fn, None, net.trainable_params(), has_aux=True) + +@ms.jit +def train_step(inputs, targets): + (loss_value, _), grads = grad_fn(inputs, targets) + optimizer(grads) + return loss_value + +``` + +## 并行配置 + +我们需要进一步设置并行有关的配置,指定并行模式`semi_auto`为半自动并行模式,此外,还需开启优化器并行,配置`hsdp`。 + +```python +from mindspore.parallel.auto_parallel import AutoParallel + +parallel_net = AutoParallel(train_step, parallel_mode="semi_auto") +parallel_net.hsdp() + +``` + +## 训练循环 + +这一步进行训练循环,外层循环是训练的epoch数,内层循环遍历数据集,调用parallel_net进行训练并获得损失值。 + +```python +for epoch in range(10): + i = 0 + for image, label in data_set: + loss_output = parallel_net(image, label) + if i % 10 == 0: + print("epoch: %s, step: %s, loss is %s" % (epoch, i, loss_output)) + i += 1 +``` + +## 运行单机八卡脚本 + +接下来通过命令调用对应的脚本,以`msrun`启动方式,8卡的分布式训练脚本为例,进行分布式训练: + +```bash +bash run.sh +``` + +训练完后,日志文件保存到`log_output`目录下,其中部分文件目录结构如下: + +```text +└─ log_output + ├─ scheduler.log + ├─ worker_0.log + ├─ worker_1.log +... 
+``` + +结果保存在`log_output/worker_*.py`中,示例如下: + +```text +epoch: 0, step: 0, loss is 2.3024087 +epoch: 0, step: 10, loss is 2.2921634 +epoch: 0, step: 20, loss is 2.278274 +epoch: 0, step: 30, loss is 2.2537143 +epoch: 0, step: 40, loss is 2.1638 +epoch: 0, step: 50, loss is 1.984318 +epoch: 0, step: 60, loss is 1.6061916 +epoch: 0, step: 70, loss is 1.20966 +epoch: 0, step: 80, loss is 0.98156196 +epoch: 0, step: 90, loss is 0.77229893 +epoch: 0, step: 100, loss is 0.6854114 +... +``` + +其他启动方式如`mpirun`、`rank table`的启动可参考[启动方式](https://www.mindspore.cn/tutorials/zh-CN/master/parallel/startup_method.html)。 + -- Gitee