diff --git a/tutorials/experts/source_en/index.rst b/tutorials/experts/source_en/index.rst index 7f7a01d2b709851fbdd684cbbfe40ef4ee213ee2..4be570fbe5a3ac83950afa134a59e64774409f9c 100644 --- a/tutorials/experts/source_en/index.rst +++ b/tutorials/experts/source_en/index.rst @@ -79,6 +79,7 @@ For Experts parallel/save_load parallel/fault_recover parallel/multi_dimensional + parallel/resilience_train_and_predict parallel/other_features .. toctree:: diff --git a/tutorials/experts/source_en/parallel/fault_recover.md b/tutorials/experts/source_en/parallel/fault_recover.md index 820b4dbd1287b1de69f552fabcfc19573c8dda41..435c4262b2b687b65c91f49560d958f8ac8ae463 100644 --- a/tutorials/experts/source_en/parallel/fault_recover.md +++ b/tutorials/experts/source_en/parallel/fault_recover.md @@ -85,7 +85,7 @@ rank_list = ms.restore_group_info_list("./ckpt_dir0/group_info.pb") print(rank_list) // [0, 4] ``` -Distributed fault recovery requires prior access to the slicing scores, thus, it is necessary to first call [model.build](https://www.mindspore.cn/docs/zh-CN/master/api_python/train/mindspore.train.Model.html# mindspore.train.model.build) to compile and then perform the training. +Distributed fault recovery requires prior access to the slicing scores, thus, it is necessary to first call [model.build](https://www.mindspore.cn/docs/zh-CN/master/api_python/train/mindspore.train.Model.html#mindspore.train.model.build) to compile and then perform the training. ```python import os @@ -102,7 +102,7 @@ def recover_train(): ## Running the Code -First, please refer to the [Preparation Session](https://www.mindspore.cn/tutorials/experts/en/master/parallel/transformer.html#Preparation) in the Distributed Parallel Training Transformer Model tutorial to prepare the dataset. +First, please refer to the [Preparation Session](https://www.mindspore.cn/tutorials/experts/en/master/parallel/transformer.html#preparation) in the Distributed Parallel Training Transformer Model tutorial to prepare the dataset. After entering the code directory, execute the training script that saves the slice weights. ```bash diff --git a/tutorials/experts/source_en/parallel/introduction.md b/tutorials/experts/source_en/parallel/introduction.md index 1fa1908513fd2e35d8f1ac17c434f899090e67ec..e1741c9f654b67848db12dcd47e169a532c91c94 100644 --- a/tutorials/experts/source_en/parallel/introduction.md +++ b/tutorials/experts/source_en/parallel/introduction.md @@ -58,7 +58,7 @@ The following involves automatic parallel interfaces, such as the interface conf | Parallel modes | Configuration | Dynamic graph | Static graph | Supported devices | | ---------- | ------ | ------ | ---------- | ---------- | -| Data parallel | DATA_PARALLEL | Support | Not support | GPU, Ascend 910 | +| Data parallel | DATA_PARALLEL | Support | Support | CPU, GPU, Ascend 910 | | Semi-automatic parallel | SEMI_AUTO_PARALLEL | Not support | Support | GPU, Ascend 910 | | Automatic parallel | AUTO_PARALLEL | Not support | Support | GPU, Ascend 910 | | Hybrid parallel | HYBRID_PARALLEL | Not support | Support | GPU, Ascend 910 | @@ -270,10 +270,10 @@ Currently GPU and Ascend support multiple startup methods respectively. The two - Multi-process startup method. The user needs to start the processes corresponding to the number of cards, as well as configure the rank_table table. You can visit [Running Script](https://www.mindspore.cn/tutorials/experts/en/master/parallel/train_ascend.html#running-the-script) to learn how to start multi-card tasks by multi-processing. - OpenMPI. 
The user can start running the script with the mpirun command, at which point the user needs to provide the host file. Users can visit [Run Scripts via OpenMPI](https://www.mindspore.cn/tutorials/experts/en/master/parallel/train_ascend.html#running-the-script-through-openmpi) to learn how to use OpenMPI to start multi-card tasks. -| | GPU | Ascend| -| ------------ | ---- | ----- | -| OpenMPI | support | not support | -| Multi-process startup | not support | support | +| | GPU | Ascend| CPU | +| ------------ | ---- | ----- | ------------ | +| OpenMPI | Support | Support | Not support | +| Multi-process startup | Not support | Support | Support | ## Data Import Method diff --git a/tutorials/experts/source_en/parallel/multi_dimensional.rst b/tutorials/experts/source_en/parallel/multi_dimensional.rst index 7f8a08c5bbf15998574d1edef07afb48fb5a2020..3e7e07a052c2220478a908a1f43edbca89d978f4 100644 --- a/tutorials/experts/source_en/parallel/multi_dimensional.rst +++ b/tutorials/experts/source_en/parallel/multi_dimensional.rst @@ -8,6 +8,7 @@ Multi Dimensional :maxdepth: 1 :hidden: + operator_parallel pipeline_parallel optimizer_parallel host_device_training @@ -44,8 +45,8 @@ MindSpore provides the following advanced features to support distributed training of large models, and users can flexibly combine them according to their own needs. -Operator Parallel ------------------ +`Operator Parallel `__ +--------------------------------------------------------------------------------------------------------------- Operator-level parallelism is a distributed computation of operators by splitting their input tensors into multiple devices in units. On the one diff --git a/tutorials/experts/source_en/parallel/operator_parallel.md b/tutorials/experts/source_en/parallel/operator_parallel.md new file mode 100644 index 0000000000000000000000000000000000000000..2b1d660f5311c569498206ef20a95761499744ea --- /dev/null +++ b/tutorials/experts/source_en/parallel/operator_parallel.md @@ -0,0 +1,123 @@ +# Operator-level Parallelism + + + +## Overview + +With the development of deep learning, network models are becoming larger and larger, such as trillions of parametric models have emerged in the field of NLP, and the model capacity far exceeds the memory capacity of a single device, making it impossible to train on a single card or data parallel. Operator-level parallelism is used to reduce the memory consumption of individual devices by sharding the tensor involved in each operator in the network model, thus making the training of large models possible. + +## Basic Principle + +MindSpore models each operator independently, and the user can set the shard strategy for each operator in the forward network (the unset operators are sharded by data parallelism by default). + +In the graph construction phase, the framework will traverse the forward graph, and shard and model each operator and its input tensor according to the shard strategy of the operator, such that the compute logic of that operator remains mathematically equivalent before and after the sharding. The framework internally uses Tensor Layout to express the distribution of the input and output tensors in the cluster. The Tensor Layout contains the mapping relationship between the tensor and the device, and the user does not need to perceive how each slice of the model is distributed in the cluster. The framework will automatically schedule the distribution. The framework will also traverse the Tensor Layout of the tensor between adjacent operators. 
If the output tensor of the previous operator is used as the input tensor of the next operator, and the Tensor Layout of that tensor differs between the two operators, tensor redistribution is required between them. For a training network, once the framework has processed the distributed sharding of the forward operators, it can automatically complete the distributed sharding of the backward operators by relying on its automatic differentiation capability.
+
+Tensor Layout is used to describe the distribution information of a tensor in the cluster. A tensor can be sliced across the cluster along certain dimensions, and it can also be replicated on the cluster. In the following example, a two-dimensional matrix is sliced onto two nodes in three ways: row slicing, column slicing, and replication (each way corresponds to one Tensor Layout), as shown in the following figure:
+
+If the two-dimensional matrix is sliced onto four nodes, there are four ways: slicing both rows and columns, replication, row slicing + replication, and column slicing + replication, as shown below:
+
+Tensor Redistribution is used to handle the conversion between different Tensor Layouts; it converts a tensor from one layout to another within the cluster. All redistribution operations are decomposed into combinations of operators such as "collective communication + split + concat". The following two figures illustrate several Tensor Redistribution operations.
+
+*Figure: Redistribution of a tensor sliced onto two nodes*
+
+*Figure: Redistribution of a tensor sliced onto four nodes*
+
+Users set the sharding strategy of an operator through the shard() interface, which describes how each dimension of each input tensor of the operator is sliced. For example, MatMul.shard(((a, b), (b, c))) means that MatMul has two input tensors: the rows of the first input tensor are uniformly sliced into a parts and its columns into b parts, while the rows of the second input tensor are uniformly sliced into b parts and its columns into c parts.
+
+```python
+import mindspore.nn as nn
+from mindspore import ops
+import mindspore as ms
+
+ms.set_auto_parallel_context(parallel_mode="semi_auto_parallel", device_num=4)
+
+class DenseMatMulNet(nn.Cell):
+    def __init__(self):
+        super(DenseMatMulNet, self).__init__()
+        self.matmul1 = ops.MatMul().shard(((4, 1), (1, 1)))
+        self.matmul2 = ops.MatMul().shard(((1, 1), (1, 4)))
+    def construct(self, x, w, v):
+        y = self.matmul1(x, w)
+        z = self.matmul2(y, v)
+        return z
+```
+
+In the above example, the user computes two consecutive two-dimensional matrix multiplications on 4 cards: `Z = (X * W) * V`. For the first matrix multiplication `Y = X * W`, the user wants to slice X by rows into 4 parts (i.e. data parallelism), while for the second matrix multiplication `Z = Y * V`, the user wants to slice V by columns into 4 parts (i.e. model parallelism):
+
+Since the Tensor Layout output by the first operator is sliced along dimension 0 across the cluster, while the second operator requires its first input tensor to be replicated on the cluster, the graph compilation stage automatically recognizes the difference between the Tensor Layouts of the two operators' output/input and automatically derives the tensor redistribution algorithm.
The Tensor redistribution required for this example is an AllGather operator (note: MindSpore AllGather operator automatically merges multiple input Tensors in dimension 0) + +## Special Instructions + +In operator-level parallelism, to meet the requirements of different scenarios, some operators can configure their distributed implementations through the add_prim_attr() interface, and these configurations are only available for the `SEMI_AUTO_PARALLEL` and `AUTO_PARALLEL` modes: + +- Gather operator: add_prim_attr("manual_split", split_tuple). This interface configures non-uniform slicing to the first input of the Gather operator, which is only valid for axis=0. `split_tuple` is a tuple of type int. The sum of the elements must be equal to the length of the 0th dimension in the first input to the Gather operator, and the number of tuples must be equal to the number of slices of the 0th dimension in the first input to the Gather operator. +- Gather operator: add_prim_attr("primitive_target", "CPU"). This interface configures Gather operator to execute on the CPU for heterogeneous scenarios. + +## Operation Practices + +The following is an illustration of operator-level parallelism by taking an Ascend single 8-card as an example. + +### Sample Code Description + +> Download the complete sample code here: [operator_parallel](https://gitee.com/mindspore/docs/tree/master/docs/sample_code/operator_parallel). + +The directory structure is as follows: + +```text +└─sample_code + ├─operator_parallel + │ rank_table_8pcs.json + │ train.py + │ run.sh + ... +``` + +The `rank_table_8pcs.json` is the networking information file to configure the Ascend 8 card environment, the `train.py` file is the script to define the network structure, and `run.sh` is the execution script. + +### Configuring the Distributed Environment + +The configuration of the distributed environment can be found in: [Configuring Distributed Environment Variables](https://www.mindspore.cn/tutorials/experts/en/master/parallel/train_ascend.html#configuring-distributed-environment-variables) tutorial. + +### Defining the Network + +```python +from mindspore.nn import Cell +from mindspore.ops import operations as ops +import mindspore as ms +from mindspore.common.initializer import initializer + + +class Net(Cell): + def __init__(self): + super().__init__() + self.matmul = ops.MatMul().shard(((2, 4), (4, 1))) + self.weight = ms.Parameter(initializer("normal", [32, 16]), "w1") + + self.relu = ops.ReLU().shard(((8, 1),)) + + def construct(self, x): + out = self.matmul(x, self.weight) + out = self.relu(out) + return out +``` + +The above network has two operators, MatMul and ReLU. + +The sharding strategy for MatMul is: The rows of the first input are sliced to 2 copies and the columns to 4 copies, while the rows of the second input are sliced to 4 copies and the columns are not sliced. The sharding strategy for ReLU is: the rows of the first input are sliced to 8 copies and the columns are not sliced. 
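+
+To make these strategies concrete, the following framework-free sketch (not part of the sample code) computes the per-card slice shape implied by a shard strategy; the `[32, 32]` shape assumed for the input `x` is a hypothetical batch chosen only for illustration, while `[32, 16]` is the weight shape defined in the network above:
+
+```python
+def slice_shape(shape, strategy):
+    """Per-card slice shape: each dimension is divided by the corresponding strategy element."""
+    assert len(shape) == len(strategy)
+    assert all(dim % cut == 0 for dim, cut in zip(shape, strategy))
+    return [dim // cut for dim, cut in zip(shape, strategy)]
+
+# MatMul().shard(((2, 4), (4, 1))): slices of x and of the weight w1 held by each card
+print(slice_shape([32, 32], (2, 4)))   # [16, 8]
+print(slice_shape([32, 16], (4, 1)))   # [8, 16]
+
+# ReLU().shard(((8, 1),)): slice of the MatMul output held by each card
+print(slice_shape([32, 16], (8, 1)))   # [4, 16]
+```
+
+Note that 2 x 4 = 8 and 8 x 1 = 8, so both strategies use exactly the 8 cards of this example.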
+
+### Running the Code
+
+Using the sample code, an 8-card operator-level parallel training script can be run with the following command:
+
+```bash
+sh run.sh 8
+```
+
+After execution, the following results can be seen in the log file corresponding to device0:
+
+```text
+epoch: 1 step:1, loss is 23.02248764038086
+epoch: 1 step:2, loss is 23.00420570373535
+epoch: 1 step:3, loss is 22.97960090637207
+epoch: 1 step:4, loss is 22.96306419372558
+```
diff --git a/tutorials/experts/source_en/parallel/resilience_train_and_predict.md b/tutorials/experts/source_en/parallel/resilience_train_and_predict.md
new file mode 100644
index 0000000000000000000000000000000000000000..6b3d2bfa49e94575d9356ac2c6a33b7818d75952
--- /dev/null
+++ b/tutorials/experts/source_en/parallel/resilience_train_and_predict.md
@@ -0,0 +1,358 @@
+# Distributed Resilience Training and Inference
+
+
+
+## Overview
+
+### Background
+
+When using MindSpore for distributed training, it is often necessary to convert the distributed Checkpoint obtained from training for a subsequent step such as inference, fine-tuning, or multi-stage training. This tutorial introduces how to convert the Checkpoint obtained from distributed training so that training and inference can continue resiliently when the distributed strategy or the number of cards in the cluster changes.
+This function only supports the semi_auto_parallel/auto_parallel mode and does not yet support conversion of the pipeline parallel dimension.
+
+### Usage Scenarios
+
+If you encounter any of the following scenarios, refer to this tutorial for resilience training and inference:
+
+- Scenario 1: Training on M cards and fine-tuning on N cards, where M and N need not be multiples of each other.
+- Scenario 2: Training divided into multiple phases, each with a different cluster size.
+- Scenario 3: Training on M cards and inference on N cards, where M and N need not be multiples of each other.
+- Scenario 4: The network's sharding strategy needs to be changed.
+
+Using the example of training on 8 cards and fine-tuning on 4 cards, the overall procedure is as follows:
+
+1. Execute training, configure the storage location of the model parameter sharding strategy file, and automatically generate the Checkpoint files and the model parameter sharding strategy file.
+
+2. Compile the fine-tuning network, configure the storage location of its distributed strategy file, and automatically generate its model parameter sharding strategy file.
+
+3. Convert the saved Checkpoint files based on the strategy files involved in training and inference.
+
+4. After compiling the fine-tuning network, load the distributed Checkpoint files obtained from the conversion.
+
+5. Perform fine-tuning of the network.
+
+Note that the network must be compiled before a distributed Checkpoint can be loaded.
+
+> For dataset download, please refer to the [Preparation](https://www.mindspore.cn/tutorials/experts/en/master/parallel/transformer.html#preparation) section in the Distributed Parallel Training Transformer Model tutorial.
+>
+> Download the complete sample code: [Distributed Resilience Training](https://gitee.com/mindspore/docs/tree/master/docs/sample_code/distributed_resilience_training).
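+
+The five steps above map onto a handful of interfaces that the following sections walk through in detail. The outline below is only a compressed roadmap of that mapping; `Net`, `dataset` and the file paths are placeholders taken from the sample code, and the elided parts are filled in by the concrete snippets later in this tutorial:
+
+```python
+import mindspore as ms
+
+# Step 1: 8-card training, saving per-rank Checkpoints and the source sharding strategy
+ms.set_auto_parallel_context(parallel_mode=ms.ParallelMode.SEMI_AUTO_PARALLEL,
+                             strategy_ckpt_save_file="./src_strategy.ckpt")
+# ... model.train(...) with a ModelCheckpoint callback configured with integrated_save=False ...
+
+# Step 2: compile the 4-card fine-tuning network to generate the target sharding strategy
+ms.set_auto_parallel_context(strategy_ckpt_save_file="./dst_strategy.ckpt")
+# ... model.build(...) for training, or model.infer_predict_layout(...) for inference ...
+
+# Step 3: convert the saved distributed Checkpoints from the source to the target strategy
+ms.transform_checkpoints("./src_checkpoints", "./dst_checkpoints", "transformed",
+                         "./src_strategy.ckpt", "./dst_strategy.ckpt")
+
+# Steps 4-5: compile the fine-tuning network again, load the converted Checkpoint, then train
+# ... ms.load_checkpoint(...), ms.load_param_into_net(...), model.train(...) ...
+```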
+
+## Converting Distributed Checkpoint Files
+
+### Overall Process
+
+First, perform distributed training with the parallel mode set to `semi_auto_parallel`/`auto_parallel`, and configure the storage path of the model sharding strategy file through the `strategy_ckpt_save_file` parameter of the `set_auto_parallel_context` interface.
+After training for a period of time, call the callback function that stores Checkpoints to save the distributed Checkpoint. Then compile the network under the new number of cards/sharding strategy to generate the model sharding strategy file of the target network, and call the distributed Checkpoint conversion interface to perform the conversion.
+
+### Executing the Distributed Training
+
+Define the network, perform distributed initialization, and get the number of devices and the rank number of the card. For the non-pipeline-parallel case, the content of the sharding strategy file is the same on every card, so it is sufficient to call `set_auto_parallel_context(strategy_ckpt_save_file="./src_strategy.ckpt")` on card 0 to save the strategy file.
+
+To add the callback function that saves Checkpoints, first define the Checkpoint storage configuration object `CheckpointConfig`. Note that `integrated_save` is configured to `False`, which means the distributed training weights are not aggregated before saving, so as to accommodate the memory overhead of large models.
+Then define the callback function `ModelCheckpoint` that saves the Checkpoint. Finally, call `model.train` to perform the training.
+For the basic usage of distributed training, please refer to [Distributed Training Ascend](https://www.mindspore.cn/tutorials/experts/en/master/parallel/train_ascend.html) or [Distributed Training GPU](https://www.mindspore.cn/tutorials/experts/en/master/parallel/train_gpu.html).
+
+```python
+import mindspore as ms
+from mindspore.nn import Momentum
+from mindspore.train import Model, ModelCheckpoint, CheckpointConfig, TimeMonitor, LossMonitor
+import mindspore.communication as D
+D.init()
+device_num = D.get_group_size()
+rank_id = D.get_rank()
+net = Net()  # Net, dataset and callback_size are defined in the complete sample code
+ms.set_auto_parallel_context(parallel_mode=ms.ParallelMode.SEMI_AUTO_PARALLEL)
+if rank_id == 0:
+    ms.set_auto_parallel_context(strategy_ckpt_save_file="../src_strategy.ckpt")
+opt = Momentum(learning_rate=0.01, momentum=0.9, params=net.get_parameters())
+model = Model(net, optimizer=opt)
+ckpt_config = CheckpointConfig(save_checkpoint_steps=callback_size, keep_checkpoint_max=1,
+                               integrated_save=False)
+ckpoint_cb = ModelCheckpoint(prefix="src_checkpoint",
+                             directory="../src_checkpoints/rank_{}".format(rank_id),
+                             config=ckpt_config)
+callback = [TimeMonitor(callback_size), LossMonitor(callback_size), ckpoint_cb]
+model.train(2, dataset, callbacks=callback, dataset_sink_mode=True)
+```
+
+- `dataset`: A MindData object that needs to be constructed in advance and fed into `model.train`.
+
+In the example, the 8-card training script is executed with the following command:
+
+```bash
+bash run_train_8p.sh ../output/wmt14.en_fr.txt
+```
+
+After execution, the source Checkpoint file directory and the source sharding strategy file will be generated:
+
+```text
+src_checkpoints/
+src_strategy.ckpt
+```
+
+### Converting the Distributed Checkpoint
+
+#### Executing Compilation on the Target Network
+
+The distributed Checkpoint conversion depends on the original distributed strategy file and the target distributed strategy file.
The strategy file of the original network is already saved when training is executed under the original strategy, so only the strategy file of the target network needs to be obtained separately. It can be generated by compiling the network of the target strategy on its own, which is done through the `model.build` interface.
+
+```python
+import mindspore as ms
+from mindspore.nn import Momentum
+from mindspore.train import Model
+import mindspore.communication as D
+D.init()
+device_num = D.get_group_size()
+rank_id = D.get_rank()
+net = Net()
+ms.set_auto_parallel_context(parallel_mode=ms.ParallelMode.SEMI_AUTO_PARALLEL)
+if rank_id == 0:
+    ms.set_auto_parallel_context(strategy_ckpt_save_file="../dst_strategy.ckpt")
+opt = Momentum(learning_rate=0.01, momentum=0.9, params=net.get_parameters())
+model = Model(net, optimizer=opt)
+model.build(train_dataset=dataset, epoch=1)
+```
+
+- `dataset`: A MindData object that needs to be constructed in advance and fed into `model.train`.
+
+When the target network is used for inference, `model.build` is replaced with `model.infer_predict_layout` to perform the compilation.
+
+```python
+import numpy as np
+import mindspore as ms
+from mindspore.nn import Momentum
+from mindspore.train import Model
+import mindspore.communication as D
+D.init()
+device_num = D.get_group_size()
+rank_id = D.get_rank()
+net = Net()
+ms.set_auto_parallel_context(parallel_mode=ms.ParallelMode.SEMI_AUTO_PARALLEL)
+if rank_id == 0:
+    ms.set_auto_parallel_context(strategy_ckpt_save_file="../dst_strategy.ckpt")
+opt = Momentum(learning_rate=0.01, momentum=0.9, params=net.get_parameters())
+model = Model(net, optimizer=opt)
+model.infer_predict_layout(ms.Tensor(np.ones(shape=data_shape)))
+```
+
+- `data_shape`: The shape of the inference data.
+
+In the example, the 4-card target network compilation is executed with the following command:
+
+```bash
+bash run_compile_4p.sh ../output/wmt14.en_fr.txt
+```
+
+After execution, the target sharding strategy file will be generated:
+
+```text
+dst_strategy.ckpt
+```
+
+#### Performing Distributed Checkpoint Conversions
+
+In the example, the original strategy trains on 8 cards with 4-card model parallelism and 2-card data parallelism, and optimizer parallelism turned on; its strategy file is named `src_strategy.ckpt`.
+The target strategy trains on 4 cards with 4-card model parallelism and 1-card data parallelism, and optimizer parallelism turned off; its strategy file is named `dst_strategy.ckpt`.
+
+MindSpore provides two interfaces for converting distributed Checkpoints. The first interface, `transform_checkpoints`, requires the user to place all Checkpoints in one directory whose subdirectories are named in the format "rank_0, rank_1, rank_2, ...".
+The user calls this interface to convert the entire directory directly. This approach is easier to use, but the memory overhead required for the conversion is slightly higher. The second interface, `transform_checkpoint_by_rank`, converts the Checkpoint of one specific rank, which is more flexible and has a lower memory overhead.
+It needs to be used together with the `rank_list_for_transform` interface, which tells which original Checkpoints are needed to produce the target Checkpoint of a given rank.
+
+1. Use the `transform_checkpoints` interface.
+ + ```python + import mindspore as ms + ms.transform_checkpoints(src_checkpoints_dir, dst_checkpoints_dir, + "transformed", src_strategy_file, dst_strategy_file) + ``` + + > The subdirectories in src_checkpoints_dir are required to be stored in the format "rank_x/checkpoint_x.ckpt". + + In the example, the script execution command for the conversion of the entire Checkpoint directory is: + + ```bash + python transform_checkpoint_dir.py --src_strategy_file=./src_strategy.ckpt --dst_strategy_file=./dst_strategy.ckpt --src_checkpoints_dir=./src_checkpoints --dst_checkpoints_dir=./dst_checkpoints + ``` + +2. Call `transform_checkpoint_by_rank` interface to merge the parameters of `transform_rank`. + + ```python + import os + import mindspore as ms + rank_list = ms.rank_list_for_transform(transform_rank, src_strategy_file, dst_strategy_file) + checkpoint_file_map = {} + for rank_id in rank_list: + checkpoint_file_map[rank_id] = os.path.join(src_checkpoints_dir, "rank_{}".format(rank_id), "src_checkpoint{}.ckpt".format(rank_id)) + save_checkpoint_path = os.path.join(dst_checkpoints_dir, "rank_{}".format(transform_rank), + "dst_checkpoint{}.ckpt".format(transform_rank)) + ms.transform_checkpoint_by_rank(transform_rank, checkpoint_file_map, save_checkpoint_path, + src_strategy_file, dst_strategy_file) + ``` + + In the example, the script execution command to convert Checkpoint by rank one by one is: + + ```bash + bash transform_by_rank.sh ./src_strategy.ckpt ./dst_strategy.ckpt ./src_checkpoints ./dst_checkpoints + ``` + +After execution, the following directory of converted target Checkpoint files will be generated: + +```text +dst_checkpoints/ +``` + +## Loading the Checkpoint Files Obtained from Conversion + +### Overall Process + +Compile the network for the target strategy and call the `load_checkpoint` interface to load the model parameter data from the converted Checkpoint file. + +### Compiling and Executing the Target Network + +Compile the network by using the `model.build` (for training) or `model.infer_predict_layout` (for inference) interfaces. At this time, the weight Shape is sliced in the compilation process. Call the `load_checkpoint` interface to load the model parameter data of each card from the Checkpoint file. + +The target network is the training scenario: + +```python +import mindspore as ms +from mindspore.train import Model +import mindspore.communication as D +D.init() +device_num = D.get_group_size() +rank_id = D.get_rank() +net = Net() +ms.set_auto_parallel_context(parallel_mode=ms.ParallelMode.SEMI_AUTO_PARALLEL) +if rank_id == 0: + ms.set_auto_parallel_context(strategy_ckpt_save_file="../dst_strategy.ckpt") +opt = Momentum(learning_rate=0.01, momentum=0.9, params=net.get_parameters()) +model = Model(net, optimizer=opt) +param_dict = ms.load_checkpoint(ckpt_file) +model.build(train_dataset=dataset, epoch=2) +ms.load_param_into_net(net, param_dict) +model.train(2, dataset, callbacks=callback, dataset_sink_mode=True) +``` + +- `ckpt_file`: The name of the Checkpoint model parameter file to be loaded. 
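+
+Each rank loads its own converted file rather than a shared one. A minimal sketch of how `ckpt_file` can be resolved per rank, assuming the `rank_x` subdirectory layout written by `transform_checkpoints` in the previous step and using a glob so the exact converted file name does not have to be assumed:
+
+```python
+import glob
+import os
+import mindspore.communication as D
+
+# rank_id is obtained exactly as in the snippets above (D.init() has already been called)
+rank_id = D.get_rank()
+
+# ../dst_checkpoints is the output directory of the conversion step; each rank loads the
+# single converted Checkpoint found in its own rank_{id} subdirectory.
+rank_dir = os.path.join("../dst_checkpoints", "rank_{}".format(rank_id))
+ckpt_file = sorted(glob.glob(os.path.join(rank_dir, "*.ckpt")))[-1]
+```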
+
+The target network is the inference scenario:
+
+```python
+import mindspore as ms
+from mindspore.nn import Momentum
+from mindspore.train import Model
+import mindspore.communication as D
+D.init()
+device_num = D.get_group_size()
+rank_id = D.get_rank()
+net = Net()
+ms.set_auto_parallel_context(parallel_mode=ms.ParallelMode.SEMI_AUTO_PARALLEL)
+if rank_id == 0:
+    ms.set_auto_parallel_context(strategy_ckpt_save_file="../dst_strategy.ckpt")
+opt = Momentum(learning_rate=0.01, momentum=0.9, params=net.get_parameters())
+model = Model(net, optimizer=opt)
+param_dict = ms.load_checkpoint(ckpt_file)
+model.infer_predict_layout(predict_data)
+ms.load_param_into_net(net, param_dict)
+model.predict(predict_data)
+```
+
+- `predict_data`: Tensor data used for inference.
+
+In the example, the script execution command that loads the converted Checkpoint and performs the second-stage fine-tuning training is:
+
+```bash
+bash run_train_4p.sh ../output/wmt14.en_fr.txt
+```
+
+After the execution is completed, the loss can be seen to decrease from 6.45.
+
+```text
+epoch: 1 step: 73, loss is 6.45995
+epoch: 1 step: 73, loss is 6.13733
+```
+
+## Pipeline Parallel Dimension Conversion
+
+[Pipeline Parallelism](https://www.mindspore.cn/tutorials/experts/en/master/parallel/pipeline_parallel.html) slices a linear network into multiple sub-networks that are pipelined across multiple cards. The sharding strategy file saved by each subgraph (pipeline stage) is therefore different, and all sharding strategies must be aggregated to obtain the complete sharding information of the network.
+Thus, compared with conversion in other dimensions, conversion in the pipeline parallel dimension requires aggregating the sharding strategy files in advance and using the aggregated file as the strategy file that the distributed Checkpoint conversion depends on. Apart from this, there is no difference from the previous section [Sharding Strategy Conversion](https://www.mindspore.cn/tutorials/experts/en/master/parallel/resilience_train_and_predict.html#converting-the-distributed-checkpoint).
+
+First, execute an 8-card pipeline parallel training, where the pipeline parallel dimension is 2, the operator-level model parallel dimension is 4, and the data parallel dimension is 1.
+
+```python
+import mindspore as ms
+from mindspore.nn import Momentum, PipelineCell
+from mindspore.train import Model, ModelCheckpoint, CheckpointConfig, TimeMonitor, LossMonitor
+import mindspore.communication as D
+D.init()
+device_num = D.get_group_size()
+rank_id = D.get_rank()
+net = Net()
+net = PipelineCell(net, 4)  # micro_batch=4
+ms.set_auto_parallel_context(parallel_mode=ms.ParallelMode.SEMI_AUTO_PARALLEL, pipeline_stages=2)
+# each stage saves its own strategy file into the src_pipeline_strategys directory
+ms.set_auto_parallel_context(strategy_ckpt_save_file="../src_pipeline_strategys/src_strategy{}.ckpt".format(rank_id))
+opt = Momentum(learning_rate=0.01, momentum=0.9, params=net.get_parameters())
+model = Model(net, optimizer=opt)
+ckpt_config = CheckpointConfig(save_checkpoint_steps=callback_size, keep_checkpoint_max=1,
+                               integrated_save=False)
+ckpoint_cb = ModelCheckpoint(prefix="src_checkpoint",
+                             directory="../src_checkpoints/rank_{}".format(rank_id),
+                             config=ckpt_config)
+callback = [TimeMonitor(callback_size), LossMonitor(callback_size), ckpoint_cb]
+model.train(2, dataset, callbacks=callback, dataset_sink_mode=True)
+```
+
+- `dataset`: A MindData object that needs to be constructed in advance and fed into `model.train`.
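+
+The `dataset` object above is whatever MindData pipeline the sample code builds for the WMT14 data; any `mindspore.dataset` pipeline with matching shapes and column names works. A minimal, hypothetical stand-in built from random NumPy data (the shapes and column names are illustrative placeholders, not those of the Transformer sample):
+
+```python
+import numpy as np
+import mindspore.dataset as ds
+
+def generator():
+    # 64 random samples; the feature/label shapes here are placeholders only
+    for _ in range(64):
+        yield np.random.randn(32).astype(np.float32), np.array(0, dtype=np.int32)
+
+dataset = ds.GeneratorDataset(generator, column_names=["data", "label"])
+dataset = dataset.batch(8)
+```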
+
+In the example, the 8-card training script is executed with the following command:
+
+```bash
+bash run_train_8p_pipeline.sh ../output/wmt14.en_fr.txt
+```
+
+After execution, the source Checkpoint file directory and the source sharding strategy files will be generated:
+
+```text
+src_checkpoints_pipeline/
+src_pipeline_strategys/
+```
+
+Refer to "Executing Compilation on the Target Network" in the [Sharding Strategy Conversion](https://www.mindspore.cn/tutorials/experts/en/master/parallel/resilience_train_and_predict.html#converting-the-distributed-checkpoint) section to compile the target network in the same way and obtain the sharding strategy file of the target network.
+
+In the example, the 4-card target network compilation is executed with the following command:
+
+```bash
+bash run_compile_4p.sh ../output/wmt14.en_fr.txt
+```
+
+After execution, the target sharding strategy file will be generated:
+
+```text
+dst_strategy.ckpt
+```
+
+The next step is the distributed Checkpoint conversion that involves the pipeline parallel dimension. First, the `merge_pipeline_strategys` interface is used to merge the sharding strategy files obtained from the pipeline training stages, and then the distributed Checkpoint conversion is performed with either `transform_checkpoints` or `transform_checkpoint_by_rank`.
+
+The example uses the `transform_checkpoints` interface; for `transform_checkpoint_by_rank`, please refer to the introduction in [Sharding Strategy Conversion](https://www.mindspore.cn/tutorials/experts/en/master/parallel/resilience_train_and_predict.html#converting-the-distributed-checkpoint).
+
+```python
+import mindspore as ms
+ms.merge_pipeline_strategys(src_pipeline_strategys_dir, src_strategy_file)
+ms.transform_checkpoints(src_checkpoints_dir, dst_checkpoints_dir,
+                         "transformed", src_strategy_file, dst_strategy_file)
+```
+
+> The subdirectories in src_checkpoints_dir are required to be stored in the format "rank_x/checkpoint_x.ckpt".
+
+In the example, the script execution command for converting the entire Checkpoint directory is:
+
+```bash
+python transform_checkpoint_dir_pipeline.py --src_strategy_dir=./src_pipeline_strategys --dst_strategy_file=dst_strategy.ckpt --src_checkpoints_dir=./src_checkpoints --dst_checkpoints_dir=./dst_checkpoints
+```
+
+After the conversion is complete, refer to [Loading the Checkpoint Files Obtained from Conversion](https://www.mindspore.cn/tutorials/experts/en/master/parallel/resilience_train_and_predict.html#loading-the-checkpoint-files-obtained-from-conversion) to load the distributed Checkpoint obtained from the conversion and execute the distributed network without the pipeline dimension.
+
+In the example, the script execution command that loads the converted Checkpoint and performs the second-stage fine-tuning training is:
+
+```bash
+bash run_train_4p.sh ../output/wmt14.en_fr.txt
+```
+
+After the execution is completed, the loss can be seen to decrease from 6.45.
+ +```text +epoch: 1 step: 73, loss is 6.45995 +epoch: 1 step: 73, loss is 6.13733 +``` diff --git a/tutorials/experts/source_zh_cn/parallel/operator_parallel.md b/tutorials/experts/source_zh_cn/parallel/operator_parallel.md index 7d83f9f7e930b96be4fc2e12a5d5dee894752bda..336de79ed6ba3f0546e4a10567c793f5ec55c689 100644 --- a/tutorials/experts/source_zh_cn/parallel/operator_parallel.md +++ b/tutorials/experts/source_zh_cn/parallel/operator_parallel.md @@ -49,7 +49,7 @@ class DenseMatMulNet(nn.Cell): return z ``` -在以上例子中,用户在4个卡上计算两个连续的二维矩阵乘:`Z = (X * W) * V` 。第一个矩阵乘`Y = X * W`,用户想把X按行切4份(即数据并行);而第二个矩阵乘`Z = Y * V`,用户想把V按列切4份(即模型并行): +在以上例子中,用户在4个卡上计算两个连续的二维矩阵乘:`Z = (X * W) * V` 。第一个矩阵乘`Y = X * W`,用户想把X按行切4份(即数据并行);而第二个矩阵乘`Z = Y * V`,用户想把V按列切4份(即模型并行): 由于第一个算子输出的Tensor Layout是第0维切分到集群,而第二个算子要求第一个输入Tensor在集群上复制。所以在图编译阶段,会自动识别两个算子输出/输入之间Tensor Layout的不同,从而自动推导出Tensor重排布的算法。而这个例子所需要的Tensor重排布是一个AllGather算子(注:MindSpore的AllGather算子会自动把多个输入Tensor在第0维进行合并) diff --git a/tutorials/experts/source_zh_cn/parallel/resilience_train_and_predict.md b/tutorials/experts/source_zh_cn/parallel/resilience_train_and_predict.md index 835a6aedc74ccc2bba7338599f45df183510bb18..dc6ee0d4161bc9a69f01df9aa0d3ce1393bab446 100644 --- a/tutorials/experts/source_zh_cn/parallel/resilience_train_and_predict.md +++ b/tutorials/experts/source_zh_cn/parallel/resilience_train_and_predict.md @@ -40,7 +40,7 @@ ### 整体流程 -首先,执行分布式训练,并行模式设置为`semi_auto_parallel`/`auto_parallel`,同时通过调用`set_auto_parallel_context`接口自定义`strategy_ckpt_save_file`参数配置模型切分策略文件存储路径, +首先,执行分布式训练,并行模式设置为`semi_auto_parallel`/`auto_parallel`,同时通过调用`set_auto_parallel_context`接口自定义`strategy_ckpt_save_file`参数配置模型切分策略文件存储路径, 训练一段时间后,调用存储Checkpoint的callback函数,将分布式的Checkpoint存储下来。而后编译新的卡数/切分策略下的网络,生成目标网络的模型切分策略文件,调用分布式Checkpoint转换的接口进行分布式Checkpoint的转换。 ### 执行分布式训练 @@ -49,7 +49,7 @@ 添加保存Checkpoint的回调函数,首先定义Checkpoint存储相关的配置对象`CheckpointConfig`,注意`integrated_save`配置为`False`,意味着不对分布式训练的权重做聚合保存,以适应大模型下的内存开销。 而后定义保存Checkpoint的回调函数`ModelCheckpoint`。最后,调用`model.train`执行训练。 关于分布式训练的基本使用方法,请参考[分布式训练Ascend](https://www.mindspore.cn/tutorials/experts/zh-CN/master/parallel/train_ascend.html)。 -或者[分布式训练GPU](https://www.mindspore.cn/tutorials/experts/zh-CN/master/parallel/train_gpu.html) 。 +或者[分布式训练GPU](https://www.mindspore.cn/tutorials/experts/zh-CN/master/parallel/train_gpu.html)。 ```python import mindspore as ms @@ -83,7 +83,7 @@ model.train(2, dataset, callbacks=callback, dataset_sink_mode=True) bash run_train_8p.sh ../output/wmt14.en_fr.txt ``` -执行后,将会生成源Checkpoint文件目录以及源切分策略文件: +执行后,将会生成源Checkpoint文件目录以及源切分策略文件: ```text src_checkpoints/ @@ -145,7 +145,7 @@ model.infer_predict_layout(Tensor(np.ones(shape=data_shape))) bash run_compile_4p.sh ../output/wmt14.en_fr.txt ``` -执行后,将会生成目标切分策略文件: +执行后,将会生成目标切分策略文件: ```text dst_strategy.ckpt @@ -191,13 +191,13 @@ dst_strategy.ckpt src_strategy_file, dst_strategy_file) ``` - 示例中,对Checkpoint按照rank逐个转换的脚本执行命令为: + 示例中,对Checkpoint按照rank逐个转换的脚本执行命令为: - ```bash - bash transform_by_rank.sh ./src_strategy.ckpt ./dst_strategy.ckpt ./src_checkpoints ./dst_checkpoints - ``` + ```bash + bash transform_by_rank.sh ./src_strategy.ckpt ./dst_strategy.ckpt ./src_checkpoints ./dst_checkpoints + ``` -执行后,将会生成转换后的目标Checkpoint文件目录: +执行后,将会生成转换后的目标Checkpoint文件目录: ```text dst_checkpoints/ @@ -310,7 +310,7 @@ model.train(2, dataset, callbacks=callback, dataset_sink_mode=True) bash run_train_8p_pipeline.sh ../output/wmt14.en_fr.txt ``` -执行后,将会生成源Checkpoint文件目录以及源切分策略文件: +执行后,将会生成源Checkpoint文件目录以及源切分策略文件: ```text src_checkpoints_pipeline/ @@ -325,7 +325,7 @@ 
src_pipeline_strategys/ bash run_compile_4p.sh ../output/wmt14.en_fr.txt ``` -执行后,将会生成目标切分策略文件: +执行后,将会生成目标切分策略文件: ```text dst_strategy.ckpt