diff --git a/docs/sample_code/distributed_optimizer_parallel/distributed_optimizer_parallel.py b/docs/sample_code/distributed_optimizer_parallel/distributed_optimizer_parallel.py
index c2af2338677ef5c9d7c81517d9cd2fe1df7cea6d..da8221555366e9d73261170f610e0f3f83394892 100644
--- a/docs/sample_code/distributed_optimizer_parallel/distributed_optimizer_parallel.py
+++ b/docs/sample_code/distributed_optimizer_parallel/distributed_optimizer_parallel.py
@@ -22,7 +22,6 @@ from mindspore import nn
 from mindspore.communication import init
 
 ms.set_context(mode=ms.GRAPH_MODE)
-ms.set_auto_parallel_context(parallel_mode=ms.ParallelMode.SEMI_AUTO_PARALLEL, enable_parallel_optimizer=True)
 init()
 ms.set_seed(1)
 
@@ -84,6 +83,9 @@ def train_step(inputs, targets):
     optimizer(grads)
     return loss_value
 
+parallel_net = AutoParallel(train_step, parallel_mode="semi_auto")
+parallel_net.hsdp()
+
 for epoch in range(10):
     i = 0
     for image, label in data_set:
diff --git a/docs/sample_code/distributed_pipeline_parallel/distributed_pipeline_parallel.py b/docs/sample_code/distributed_pipeline_parallel/distributed_pipeline_parallel.py
index 279c8d44484cfbd4f6b32a80763c236f00881c5f..e512701849f8f5d02bd67450ec815b8b4ccaac8e 100644
--- a/docs/sample_code/distributed_pipeline_parallel/distributed_pipeline_parallel.py
+++ b/docs/sample_code/distributed_pipeline_parallel/distributed_pipeline_parallel.py
@@ -25,7 +25,6 @@ from mindspore.common.initializer import initializer, HeUniform
 
 ms.set_context(mode=ms.GRAPH_MODE)
-ms.set_auto_parallel_context(parallel_mode=ms.ParallelMode.SEMI_AUTO_PARALLEL, pipeline_stages=2)
 init()
 ms.set_seed(1)
 
@@ -72,13 +71,6 @@ class Network(nn.Cell):
         logits = self.layer3(x)
         return logits
 
-net = Network()
-net.layer1.pipeline_stage = 0
-net.relu1.pipeline_stage = 0
-net.layer2.pipeline_stage = 0
-net.relu2.pipeline_stage = 1
-net.layer3.pipeline_stage = 1
-
 def create_dataset(batch_size):
     """create dataset"""
     dataset_path = os.getenv("DATA_PATH")
@@ -116,6 +108,9 @@ def train_one_step(inputs, target):
     optimizer(grads)
     return loss, grads
 
+parallel_net = AutoParallel(train_one_step, parallel_mode="semi_auto")
+parallel_net.pipeline(stages=2)
+
 for epoch in range(10):
     i = 0
     for data, label in data_set:
diff --git a/tutorials/source_zh_cn/parallel/data_parallel.md b/tutorials/source_zh_cn/parallel/data_parallel.md
new file mode 100644
index 0000000000000000000000000000000000000000..48e8fa8fc617d614c9171b14b26b3f3c1b463d78
--- /dev/null
+++ b/tutorials/source_zh_cn/parallel/data_parallel.md
@@ -0,0 +1,174 @@
+# 数据并行
+
+下面以Ascend或者GPU单机8卡为例,进行数据并行操作说明:
+
+## 样例代码说明
+
+> 您可以在这里下载完整的样例代码:
+>
+> 。
+
+目录结构如下:
+
+```text
+└─ sample_code
+    ├─ distributed_data_parallel
+       ├── distributed_data_parallel.py
+       └── run.sh
+    ...
+```
+
+其中,`distributed_data_parallel.py`是定义网络结构和训练过程的脚本。`run.sh`是执行脚本。
+
+## 配置分布式环境
+
+通过context接口可以指定运行模式、运行设备、运行卡号等。与单卡脚本不同,并行脚本还需指定并行模式`parallel_mode`为数据并行模式,并通过init根据不同的设备需求初始化HCCL、NCCL或者MCCL通信。在数据并行模式下还可以设置`gradients_mean`指定梯度聚合方式。此处未设置`device_target`,会自动指定为MindSpore包对应的后端硬件设备。
+
+```python
+import mindspore as ms
+from mindspore.communication import init
+
+ms.set_context(mode=ms.GRAPH_MODE)
+ms.set_auto_parallel_context(parallel_mode=ms.ParallelMode.DATA_PARALLEL, gradients_mean=True)
+init()
+ms.set_seed(1)
+```
+
+其中,设置`gradients_mean=True`是为了在反向计算时,让框架将分散在多台设备上的数据并行梯度进行聚合,得到全局梯度值后再传入优化器中更新。聚合时首先通过AllReduce(op=ReduceOp.SUM)对梯度做规约求和,接着根据`gradients_mean`的值判断是否求均值(设置为True则求均值,否则不求,默认为False)。
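+
+上述聚合过程可以用下面的示意代码来理解(仅为说明`gradients_mean`语义的简化示例,假设已经执行过`init()`;实际训练中该过程由框架或`DistributedGradReducer`自动完成,无需手动编写):
+
+```python
+from mindspore import ops
+from mindspore.communication import get_group_size
+
+# AllReduce(SUM):对各卡上同一份梯度做跨卡求和
+all_reduce = ops.AllReduce(op=ops.ReduceOp.SUM)
+
+def aggregate_grad(grad):
+    summed = all_reduce(grad)          # 规约求和
+    return summed / get_group_size()   # gradients_mean=True 时再除以卡数求均值
+```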
+
+## 数据并行模式加载数据集
+
+数据并行模式跟其他模式最大区别在于数据加载方式的不同,数据是以并行的方式导入的。下面我们以MNIST数据集为例,介绍以数据并行方式导入MNIST数据集的方法,`dataset_path`是指数据集的路径。
+
+```python
+import mindspore.dataset as ds
+from mindspore.communication import get_rank, get_group_size
+
+rank_id = get_rank()
+rank_size = get_group_size()
+dataset = ds.MnistDataset(dataset_path, num_shards=rank_size, shard_id=rank_id)
+```
+
+其中,与单卡不同的是,在数据集接口需要传入`num_shards`和`shard_id`参数,分别对应卡的数量和逻辑序号,建议通过`mindspore.communication`接口获取:
+
+- `get_rank`:获取当前设备在集群中的ID。
+- `get_group_size`:获取集群数量。
+
+> 数据并行场景加载数据集时,建议对每卡指定相同的数据集文件,若是各卡加载的数据集不同,可能会影响计算精度。
+
+完整的数据处理代码:
+
+```python
+import os
+import mindspore as ms
+import mindspore.dataset as ds
+from mindspore.communication import get_rank, get_group_size
+
+def create_dataset(batch_size):
+    dataset_path = os.getenv("DATA_PATH")
+    rank_id = get_rank()
+    rank_size = get_group_size()
+    dataset = ds.MnistDataset(dataset_path, num_shards=rank_size, shard_id=rank_id)
+    image_transforms = [
+        ds.vision.Rescale(1.0 / 255.0, 0),
+        ds.vision.Normalize(mean=(0.1307,), std=(0.3081,)),
+        ds.vision.HWC2CHW()
+    ]
+    label_transform = ds.transforms.TypeCast(ms.int32)
+    dataset = dataset.map(image_transforms, 'image')
+    dataset = dataset.map(label_transform, 'label')
+    dataset = dataset.batch(batch_size)
+    return dataset
+
+data_set = create_dataset(32)
+```
+
+## 定义网络
+
+数据并行模式下,网络定义方式与单卡网络写法一致,网络的主要结构如下:
+
+```python
+from mindspore import nn
+
+class Network(nn.Cell):
+    def __init__(self):
+        super().__init__()
+        self.flatten = nn.Flatten()
+        self.dense_relu_sequential = nn.SequentialCell(
+            nn.Dense(28*28, 512, weight_init="normal", bias_init="zeros"),
+            nn.ReLU(),
+            nn.Dense(512, 512, weight_init="normal", bias_init="zeros"),
+            nn.ReLU(),
+            nn.Dense(512, 10, weight_init="normal", bias_init="zeros")
+        )
+
+    def construct(self, x):
+        x = self.flatten(x)
+        logits = self.dense_relu_sequential(x)
+        return logits
+
+net = Network()
+```
+
+## 训练网络
+
+在这一步,我们需要定义损失函数、优化器以及训练过程。与单卡模型不同的地方在于,数据并行模式还需要增加`mindspore.nn.DistributedGradReducer()`接口,来对所有卡的梯度进行聚合,该接口第一个参数为需要更新的网络参数:
+
+```python
+from mindspore import nn
+import mindspore as ms
+
+loss_fn = nn.CrossEntropyLoss()
+optimizer = nn.SGD(net.trainable_params(), 1e-2)
+
+def forward_fn(data, label):
+    logits = net(data)
+    loss = loss_fn(logits, label)
+    return loss, logits
+
+grad_fn = ms.value_and_grad(forward_fn, None, net.trainable_params(), has_aux=True)
+grad_reducer = nn.DistributedGradReducer(optimizer.parameters)
+
+for epoch in range(10):
+    i = 0
+    for data, label in data_set:
+        (loss, _), grads = grad_fn(data, label)
+        grads = grad_reducer(grads)
+        optimizer(grads)
+        if i % 10 == 0:
+            print("epoch: %s, step: %s, loss is %s" % (epoch, i, loss))
+        i += 1
+```
+
+> 此处也可以用[Model.train](https://www.mindspore.cn/docs/zh-CN/master/api_python/train/mindspore.train.Model.html#mindspore.train.Model.train)的方式进行训练。
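+
+一个可能的`Model.train`写法示意如下(假设沿用上文已定义的`net`、`loss_fn`、`optimizer`和`data_set`,仅作参考):
+
+```python
+from mindspore.train import Model, LossMonitor
+
+model = Model(net, loss_fn=loss_fn, optimizer=optimizer)
+# 数据并行模式下,Model内部的训练封装会自动完成跨卡梯度聚合
+model.train(10, data_set, callbacks=[LossMonitor(10)])
+```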
+
+## 运行单机8卡脚本
+
+接下来通过命令调用对应的脚本,以8卡的分布式训练脚本为例,使用`mpirun`启动方式进行分布式训练:
+
+```bash
+bash run.sh
+```
+
+训练完后,日志文件保存到`log_output`目录下,其中部分文件目录结构如下:
+
+```text
+└─ log_output
+    └─ 1
+        ├─ rank.0
+        |   └─ stdout
+        ├─ rank.1
+        |   └─ stdout
+...
+```
+
+关于Loss部分结果保存在`log_output/1/rank.*/stdout`中,示例如下:
+
+```text
+epoch: 0 step: 0, loss is 2.3084016
+epoch: 0 step: 10, loss is 2.3107638
+epoch: 0 step: 20, loss is 2.2864391
+epoch: 0 step: 30, loss is 2.2938071
+...
+```
+
+其他启动方式如动态组网、`rank table`的启动可参考[启动方式](https://www.mindspore.cn/docs/zh-CN/master/model_train/parallel/startup_method.html)。
\ No newline at end of file
diff --git a/tutorials/source_zh_cn/parallel/optimizer_parallel.md b/tutorials/source_zh_cn/parallel/optimizer_parallel.md
new file mode 100644
index 0000000000000000000000000000000000000000..f4d6f10341215b29e5d19db8aa21b073b01d007b
--- /dev/null
+++ b/tutorials/source_zh_cn/parallel/optimizer_parallel.md
@@ -0,0 +1,182 @@
+# 优化器并行
+
+下面以Ascend或者GPU单机8卡为例,进行优化器并行操作说明:
+
+## 样例代码说明
+
+> 下载完整的样例代码:[distributed_optimizer_parallel](https://gitee.com/mindspore/docs/tree/r2.5.0/docs/sample_code/distributed_optimizer_parallel)。
+
+目录结构如下:
+
+```text
+└─ sample_code
+    ├─ distributed_optimizer_parallel
+       ├── distributed_optimizer_parallel.py
+       └── run.sh
+    ...
+```
+
+其中,`distributed_optimizer_parallel.py`是定义网络结构和训练过程的脚本。`run.sh`是执行脚本。
+
+## 配置分布式环境
+
+通过context接口指定运行模式、运行设备、运行卡号等。与单卡脚本不同,并行脚本还需通过init初始化HCCL或NCCL通信。
+
+```python
+import mindspore as ms
+from mindspore.communication import init
+
+ms.set_context(mode=ms.GRAPH_MODE)
+init()
+ms.set_seed(1)
+```
+
+## 数据集加载
+
+在优化器并行场景下,数据集加载方式与单卡加载方式一致,代码如下:
+
+```python
+import os
+import mindspore as ms
+import mindspore.dataset as ds
+
+def create_dataset(batch_size):
+    """create dataset"""
+    dataset_path = os.getenv("DATA_PATH")
+    dataset = ds.MnistDataset(dataset_path)
+    image_transforms = [
+        ds.vision.Rescale(1.0 / 255.0, 0),
+        ds.vision.Normalize(mean=(0.1307,), std=(0.3081,)),
+        ds.vision.HWC2CHW()
+    ]
+    label_transform = ds.transforms.TypeCast(ms.int32)
+    dataset = dataset.map(image_transforms, 'image')
+    dataset = dataset.map(label_transform, 'label')
+    dataset = dataset.batch(batch_size)
+    return dataset
+
+data_set = create_dataset(32)
+```
+
+## 定义网络
+
+优化器并行网络结构与单卡网络结构基本一致,区别在于增加了通信算子融合的配置:
+
+```python
+from mindspore import nn
+
+class Network(nn.Cell):
+    def __init__(self):
+        super().__init__()
+        self.flatten = nn.Flatten()
+        self.layer1 = nn.Dense(28*28, 512)
+        self.layer2 = nn.Dense(512, 512)
+        self.layer3 = nn.Dense(512, 10)
+        self.relu = nn.ReLU()
+
+    def construct(self, x):
+        x = self.flatten(x)
+        x = self.layer1(x)
+        x = self.relu(x)
+        x = self.layer2(x)
+        x = self.relu(x)
+        logits = self.layer3(x)
+        return logits
+
+net = Network()
+net.layer1.set_comm_fusion(0)
+net.layer2.set_comm_fusion(1)
+net.layer3.set_comm_fusion(2)
+```
+
+> 这里为了减少通信成本,为不同层配置了通信融合,详细可以参考[通信算子融合](https://www.mindspore.cn/docs/zh-CN/r2.5.0/model_train/parallel/comm_fusion.html)。
+
+## 训练网络定义
+
+在这一步,我们需要定义损失函数、优化器以及训练步骤,这部分与单卡写法一致:
+
+```python
+import mindspore as ms
+from mindspore import nn
+
+optimizer = nn.SGD(net.trainable_params(), 1e-2)
+loss_fn = nn.CrossEntropyLoss()
+
+def forward_fn(data, target):
+    logits = net(data)
+    loss = loss_fn(logits, target)
+    return loss, logits
+
+grad_fn = ms.value_and_grad(forward_fn, None, net.trainable_params(), has_aux=True)
+
+@ms.jit
+def train_step(inputs, targets):
+    (loss_value, _), grads = grad_fn(inputs, targets)
+    optimizer(grads)
+    return loss_value
+```
+
+## 并行配置
+
+我们需要进一步设置并行有关的配置:将并行模式`parallel_mode`指定为`semi_auto`,即半自动并行模式;此外,还需调用`hsdp()`接口开启优化器并行。
+
+```python
+from mindspore.parallel.auto_parallel import AutoParallel
+
+parallel_net = AutoParallel(train_step, parallel_mode="semi_auto")
+parallel_net.hsdp()
+```
+
+## 训练循环
+
+这一步进行训练循环,外层循环是训练的epoch数,内层循环遍历数据集,调用parallel_net进行训练并获得损失值。
+
+```python
+for epoch in range(10):
+    i = 0
+    for image, label in data_set:
+        loss_output = parallel_net(image, label)
+        if i % 10 == 0:
+            print("epoch: %s, step: %s, loss is %s" % (epoch, i, loss_output))
+        i += 1
+```
+
+## 运行单机8卡脚本
+
+接下来通过命令调用对应的脚本,以`mpirun`启动方式、8卡分布式训练脚本为例,进行分布式训练:
+
+```bash
+bash run.sh
+```
+
+训练完后,日志文件保存到`log_output`目录下,其中部分文件目录结构如下:
+
+```text
+└─ log_output
+    └─ 1
+        ├─ rank.0
+        |   └─ stdout
+        ├─ rank.1
+        |   └─ stdout
+...
+```
+
+结果保存在`log_output/1/rank.*/stdout`中,示例如下:
+
+```text
+epoch: 0, step: 0, loss is 2.3024087
+epoch: 0, step: 10, loss is 2.2921634
+epoch: 0, step: 20, loss is 2.278274
+epoch: 0, step: 30, loss is 2.2537143
+epoch: 0, step: 40, loss is 2.1638
+epoch: 0, step: 50, loss is 1.984318
+epoch: 0, step: 60, loss is 1.6061916
+epoch: 0, step: 70, loss is 1.20966
+epoch: 0, step: 80, loss is 0.98156196
+epoch: 0, step: 90, loss is 0.77229893
+epoch: 0, step: 100, loss is 0.6854114
+...
+```
+
+其他启动方式如动态组网、`rank table`的启动可参考[启动方式](https://www.mindspore.cn/docs/zh-CN/r2.5.0/model_train/parallel/startup_method.html)。
+
diff --git a/tutorials/source_zh_cn/parallel/pipeline_parallel.md b/tutorials/source_zh_cn/parallel/pipeline_parallel.md
new file mode 100644
index 0000000000000000000000000000000000000000..d973e22db861847172de4a8e7cb4cb38986bc207
--- /dev/null
+++ b/tutorials/source_zh_cn/parallel/pipeline_parallel.md
@@ -0,0 +1,429 @@
+# 流水线并行
+
+## 训练操作实践
+
+下面以Ascend或者GPU单机8卡为例,进行流水线并行操作说明:
+
+### 样例代码说明
+
+> 下载完整的样例代码:[distributed_pipeline_parallel](https://gitee.com/mindspore/docs/tree/r2.5.0/docs/sample_code/distributed_pipeline_parallel)。
+
+目录结构如下:
+
+```text
+└─ sample_code
+    ├─ distributed_pipeline_parallel
+       ├── distributed_pipeline_parallel.py
+       └── run.sh
+    ...
+```
+
+其中,`distributed_pipeline_parallel.py`是定义网络结构和训练过程的脚本。`run.sh`是执行脚本。
+
+### 配置分布式环境
+
+通过context接口指定运行模式、运行设备、运行卡号等。与单卡脚本不同,并行脚本还需通过init初始化HCCL或NCCL通信。
+
+```python
+import mindspore as ms
+from mindspore.communication import init
+
+ms.set_context(mode=ms.GRAPH_MODE)
+init()
+ms.set_seed(1)
+```
+
+### 数据集加载
+
+在流水线并行场景下,数据集加载方式与单卡加载方式一致,代码如下:
+
+```python
+import os
+import mindspore as ms
+import mindspore.dataset as ds
+
+def create_dataset(batch_size):
+    dataset_path = os.getenv("DATA_PATH")
+    dataset = ds.MnistDataset(dataset_path)
+    image_transforms = [
+        ds.vision.Rescale(1.0 / 255.0, 0),
+        ds.vision.Normalize(mean=(0.1307,), std=(0.3081,)),
+        ds.vision.HWC2CHW()
+    ]
+    label_transform = ds.transforms.TypeCast(ms.int32)
+    dataset = dataset.map(image_transforms, 'image')
+    dataset = dataset.map(label_transform, 'label')
+    dataset = dataset.batch(batch_size)
+    return dataset
+
+data_set = create_dataset(32)
+```
+
+### 定义网络
+
+流水线并行网络结构与单卡网络结构基本一致。需要注意的是:
+
+> - 在pipeline并行下,使能Print/Summary/TensorDump相关算子时,需要把该算子放到有pipeline_stage属性的Cell中使用,否则有概率由pipeline并行切分导致算子不生效。
+> - 在pipeline并行下,网络的输出不支持动态shape。
+
+```python
+from mindspore import nn, ops, Parameter
+from mindspore.common.initializer import initializer, HeUniform
+
+import math
+
+class MatMulCell(nn.Cell):
+    """
+    MatMulCell definition.
+    """
+    def __init__(self, param=None, shape=None):
+        super().__init__()
+        if shape is None:
+            shape = [28 * 28, 512]
+        weight_init = HeUniform(math.sqrt(5))
+        self.param = Parameter(initializer(weight_init, shape), name="param")
+        if param is not None:
+            self.param = param
+        self.print = ops.Print()
+        self.matmul = ops.MatMul()
+
+    def construct(self, x):
+        out = self.matmul(x, self.param)
+        self.print("out is:", out)
+        return out
+
+
+class Network(nn.Cell):
+    def __init__(self):
+        super().__init__()
+        self.flatten = nn.Flatten()
+        self.layer1 = MatMulCell()
+        self.relu1 = nn.ReLU()
+        self.layer2 = nn.Dense(512, 512)
+        self.relu2 = nn.ReLU()
+        self.layer3 = nn.Dense(512, 10)
+
+    def construct(self, x):
+        x = self.flatten(x)
+        x = self.layer1(x)
+        x = self.relu1(x)
+        x = self.layer2(x)
+        x = self.relu2(x)
+        logits = self.layer3(x)
+        return logits
+
+
+net = Network()
+# 按网络执行先后顺序的常规stage配置(非interleaved)示例:
+# net.layer1.pipeline_stage = 0
+# net.relu1.pipeline_stage = 0
+# net.layer2.pipeline_stage = 0
+# net.relu2.pipeline_stage = 1
+# net.layer3.pipeline_stage = 1
+```
+
+使能interleaved pipeline调度时,`pipeline_stage`的非连续模型层需要进行交错式配置,配置如下:
+
+```python
+net.layer1.pipeline_stage = 0
+net.relu1.pipeline_stage = 1
+net.layer2.pipeline_stage = 0
+net.relu2.pipeline_stage = 1
+net.layer3.pipeline_stage = 1
+```
+
+stage0有第0层和第2层,stage1有第1层、第3层和第4层。
+
+### 训练网络定义
+
+在这一步,我们需要定义损失函数、优化器以及训练过程,与单卡模型不同,在这部分需要调用两个接口来配置流水线并行:
+
+- 首先需要定义LossCell,本例中调用了`nn.WithLossCell`接口封装网络和损失函数。
+- 然后需要在LossCell外包一层`Pipeline`(来自`mindspore.parallel.nn`),并指定MicroBatch的size。各`Cell`的`pipeline_stage`既可以像上文一样通过属性配置,也可以通过`Pipeline`的`stage_config`参数配置。详细请参考本章概述中的相关接口。
+
+除此之外,还需要增加`PipelineGradReducer`接口,用于处理流水线并行下的梯度,该接口的第一个参数为需要更新的网络参数。
+
+```python
+import mindspore as ms
+from mindspore import nn, ops
+from mindspore.parallel.nn import Pipeline, PipelineGradReducer
+
+optimizer = nn.SGD(net.trainable_params(), 1e-2)
+loss_fn = nn.CrossEntropyLoss()
+net_with_loss = Pipeline(nn.WithLossCell(net, loss_fn), 4)
+net_with_loss.set_train()
+
+def forward_fn(inputs, target):
+    loss = net_with_loss(inputs, target)
+    return loss
+
+grad_fn = ops.value_and_grad(forward_fn, None, optimizer.parameters)
+pp_grad_reducer = PipelineGradReducer(optimizer.parameters, opt_shard=True)
+
+@ms.jit
+def train_one_step(inputs, target):
+    loss, grads = grad_fn(inputs, target)
+    grads = pp_grad_reducer(grads)
+    optimizer(grads)
+    return loss, grads
+```
+
+### 并行配置
+
+我们需要进一步设置并行有关的配置:将并行模式`parallel_mode`指定为`semi_auto`,即半自动并行模式;此外,还需调用`pipeline()`开启流水线并行,并通过`stages`参数指定stage的总数。
+
+```python
+from mindspore.parallel.auto_parallel import AutoParallel
+
+parallel_net = AutoParallel(train_one_step, parallel_mode="semi_auto")
+parallel_net.pipeline(stages=2)
+```
+
+### 训练循环
+
+这一步进行训练循环,外层循环是训练的epoch数,内层循环遍历数据集,调用parallel_net进行训练并获得损失值。
+
+```python
+for epoch in range(10):
+    i = 0
+    for data, label in data_set:
+        loss, grads = parallel_net(data, label)
+        if i % 10 == 0:
+            print("epoch: %s, step: %s, loss is %s" % (epoch, i, loss))
+        i += 1
+```
+
+> 目前流水线并行不支持自动混合精度特性。
+>
+> 流水线并行训练更适合用`model.train`的方式,这是因为流水线并行下的TrainOneStep逻辑复杂,而`model.train`内部封装了针对流水线并行的TrainOneStepCell,易用性更好。
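+
+一个可能的`model.train`写法示意如下(假设沿用上文已定义的`net_with_loss`与`optimizer`;此处仅为参考草稿,具体以[Model.train](https://www.mindspore.cn/docs/zh-CN/master/api_python/train/mindspore.train.Model.html#mindspore.train.Model.train)接口文档为准):
+
+```python
+from mindspore.train import Model, LossMonitor
+
+# net_with_loss 已包含损失函数与Pipeline封装,因此不再传入loss_fn
+model = Model(net_with_loss, optimizer=optimizer)
+model.train(10, data_set, callbacks=[LossMonitor(10)])
+```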
+
+### 运行单机8卡脚本
+
+接下来通过命令调用对应的脚本,以`mpirun`启动方式、8卡分布式训练脚本为例,进行分布式训练:
+
+```bash
+bash run.sh
+```
+
+训练完后,日志文件保存到`log_output`目录下,其中部分文件目录结构如下:
+
+```text
+└─ log_output
+    └─ 1
+        ├─ rank.0
+        |   └─ stdout
+        ├─ rank.1
+        |   └─ stdout
+...
+```
+
+结果保存在`log_output/1/rank.*/stdout`中,示例如下:
+
+```text
+epoch: 0 step: 0, loss is 9.137518
+epoch: 0 step: 10, loss is 8.826559
+epoch: 0 step: 20, loss is 8.675843
+epoch: 0 step: 30, loss is 8.307994
+epoch: 0 step: 40, loss is 7.856993
+epoch: 0 step: 50, loss is 7.0662785
+...
+```
+
+`Print`算子的结果为:
+
+```text
+out is:
+Tensor(shape=[8, 512], dtype=Float32, value=
+[[ 4.61914062e-01 5.78613281e-01 1.34995094e-01 ... 8.54492188e-02 7.91992188e-01 2.13378906e-01]
+...
+[ 4.89746094e-01 3.56689453e-01 -4.90966797e-01 ... -3.30078125e-01 -2.38525391e-01 7.33398438e-01]])
+```
+
+其他启动方式如动态组网、`rank table`的启动可参考[启动方式](https://www.mindspore.cn/docs/zh-CN/r2.5.0/model_train/parallel/startup_method.html)。
+
+## 推理操作实践
+
+下面以Ascend或者GPU单机8卡为例,进行流水线并行推理操作说明:
+
+### 样例代码说明
+
+> 下载完整的样例代码:[distributed_pipeline_parallel](https://gitee.com/mindspore/docs/tree/r2.5.0/docs/sample_code/distributed_pipeline_parallel)。
+
+目录结构如下:
+
+```text
+└─ sample_code
+    ├─ distributed_pipeline_parallel
+       ├── distributed_pipeline_parallel_inference.py
+       └── run_inference.sh
+    ...
+```
+
+其中,`distributed_pipeline_parallel_inference.py`是定义网络结构和推理过程的脚本。`run_inference.sh`是执行脚本。
+
+### 配置分布式环境
+
+通过context接口指定运行模式、运行设备、运行卡号等。与单卡脚本不同,并行脚本还需通过init初始化HCCL或NCCL通信。
+
+```python
+import mindspore as ms
+from mindspore.communication import init
+
+ms.set_context(mode=ms.GRAPH_MODE)
+# ms.set_auto_parallel_context(parallel_mode=ms.ParallelMode.SEMI_AUTO_PARALLEL, dataset_strategy="full_batch",
+#                              pipeline_stages=4, pipeline_result_broadcast=True)
+init()
+ms.set_seed(1)
+```
+
+### 定义网络
+
+流水线并行需要用户去定义并行的策略,通过调用`pipeline_stage`接口来指定每个layer要在哪个stage上去执行。`pipeline_stage`接口的粒度为`Cell`。所有包含训练参数的`Cell`都需要配置`pipeline_stage`,并且`pipeline_stage`要按照网络执行的先后顺序,从小到大进行配置。在单卡模型基础上,增加`pipeline_stage`配置后如下:
+
+```python
+import numpy as np
+import mindspore as ms
+from mindspore import lazy_inline, nn, ops, Tensor, Parameter, sync_pipeline_shared_parameters
+
+class VocabEmbedding(nn.Cell):
+    """Vocab Embedding"""
+    def __init__(self, vocab_size, embedding_size):
+        super().__init__()
+        self.embedding_table = Parameter(Tensor(np.ones([vocab_size, embedding_size]), ms.float32),
+                                         name='embedding_table')
+        self.gather = ops.Gather()
+
+    def construct(self, x):
+        output = self.gather(self.embedding_table, x, 0)
+        output = output.squeeze(1)
+        return output, self.embedding_table.value()
+
+
+class Head(nn.Cell):
+    def __init__(self):
+        super().__init__()
+        self.matmul = ops.MatMul(transpose_b=True)
+
+    def construct(self, state, embed):
+        return self.matmul(state, embed)
+
+
+class Network(nn.Cell):
+    """Network"""
+    @lazy_inline
+    def __init__(self):
+        super().__init__()
+        self.word_embedding = VocabEmbedding(vocab_size=32, embedding_size=32)
+        self.layer1 = nn.Dense(32, 32)
+        self.layer2 = nn.Dense(32, 32)
+        self.head = Head()
+
+    def construct(self, x):
+        x, embed = self.word_embedding(x)
+        x = self.layer1(x)
+        x = self.layer2(x)
+        x = self.head(x, embed)
+        return x
+
+# Define network and set pipeline stage
+net = Network()
+net.word_embedding.pipeline_stage = 0
+net.layer1.pipeline_stage = 1
+net.layer2.pipeline_stage = 2
+net.head.pipeline_stage = 3
+```
+
+### 推理网络
+
+在network外包一层`PipelineCellInference`,并指定MicroBatch的size。`PipelineCellInference`中将输入切分为若干个micro batch,执行推理网络,最后将若干个micro batch推理结果通过`ops.Concat`算子沿batch轴拼接后返回。
+
+在上一步中,`embed`被`self.word_embedding`和`self.head`两层共享,并且这两层被切分到了不同的stage上。在执行推理前,先编译计算图`inference_network.compile()`,再调用`sync_pipeline_shared_parameters(inference_network)`接口,框架自动同步stage间的共享权重。
+
+```python
+from mindspore import nn, ops
+
+class PipelineCellInference(nn.Cell):
+    """Pipeline Cell Inference wrapper"""
+    def __init__(self, network, micro_batch_num):
+        super().__init__()
+        self.network = network
+        self.micro_batch_num = micro_batch_num
+        self.concat = ops.Concat()
+
+    def construct(self, x):
+        """Apply the pipeline inference"""
+        ret = ()
+        for i in range(self.micro_batch_num):
+            micro_batch_size = x.shape[0] // self.micro_batch_num
+            start = micro_batch_size * i
+            end = micro_batch_size * (i + 1)
+
+            micro_input = x[start:end]
+            micro_output = self.network(micro_input)
+            ret = ret + (micro_output,)
+
+        ret = self.concat(ret)
+        return ret
+
+inference_network = PipelineCellInference(network=net, micro_batch_num=4)
+inference_network.set_train(False)
+
+# Compile and synchronize shared parameter.
+input_ids = Tensor(np.random.randint(low=0, high=32, size=(8, 1)), ms.int32)
+inference_network.compile(input_ids)
+sync_pipeline_shared_parameters(inference_network)
+
+# Execute the inference network
+logits = inference_network(input_ids)
+print(logits.asnumpy())
+```
+
+### 运行单机8卡脚本
+
+接下来通过命令调用对应的脚本,以`msrun`启动方式、8卡分布式推理脚本为例,进行分布式推理:
+
+```bash
+bash run_inference.sh
+```
+
+推理完成后,日志文件保存到`pipeline_inference_logs`目录下,其中部分文件目录结构如下:
+
+```text
+└─ pipeline_inference_logs
+    ├── scheduler.log
+    ├── worker_0.log
+    ├── worker_1.log
+    ├── worker_2.log
+...
+```
+
+结果保存在`pipeline_inference_logs/worker_0.log`中,示例如下:
+
+```text
+[[0.01181556 0.01181556 0.01181556 0.01181556 0.01181556 0.01181556 0.01181556
+  0.01181556 0.01181556 0.01181556 0.01181556 0.01181556 0.01181556 0.01181556
+  0.01181556 0.01181556 0.01181556 0.01181556 0.01181556 0.01181556 0.01181556
+  0.01181556 0.01181556 0.01181556 0.01181556 0.01181556 0.01181556 0.01181556
+  0.01181556 0.01181556 0.01181556 0.01181556 0.01181556 0.01181556 0.01181556
+  0.01181556 0.01181556]
+  ...]
+
+```
\ No newline at end of file
diff --git a/tutorials/source_zh_cn/parallel/startup_methods.md b/tutorials/source_zh_cn/parallel/startup_methods.md
new file mode 100644
index 0000000000000000000000000000000000000000..752bdc073ec49adf9ae04bd4bc283dc659fe42bf
--- /dev/null
+++ b/tutorials/source_zh_cn/parallel/startup_methods.md
@@ -0,0 +1,10 @@
+# 分布式并行启动方式
+
+MindSpore目前支持四种启动方式:
+
+- **msrun**:是动态组网的封装,允许用户使用单命令行指令在各节点拉起分布式任务,安装MindSpore后即可使用,不依赖外部配置或者模块,支持Ascend/GPU/CPU。
+- **动态组网**:通过MindSpore内部动态组网模块启动,不依赖外部配置或者模块,支持Ascend/GPU/CPU。
+- **mpirun**:通过多进程通信库OpenMPI启动,支持Ascend/GPU。
+- **rank table**:配置rank_table表后,通过脚本启动和卡数对应的进程,支持Ascend。
+
+详细可参考[分布式并行启动方式](https://www.mindspore.cn/docs/zh-CN/master/model_train/parallel/startup_method.html)章节。