Script Adaptation

Adaptation Description

Script adaptation is classified into three types: fault recovery, dying gasp, and hybrid parallel model. Select a script adaptation example based on your requirements. The example structure is as follows: For details about the YAML parameters used in some examples, see YAML Parameters.

TensorFlow-based Fault Recovery Code Adaptation Example

  1. Download the corresponding ResNet-50 model code from ModelZoo as the training code, and decompress the downloaded training code to the created code directory.
  2. Go to the MindXDL-deploy repository, select the master branch, and obtain the train_start.sh, utils.sh, and rank_table.sh files in the samples/train directory. Then, combine the files with the scripts directory in the training code to construct the following directory structure on the master node:
    /data/atlas_dls/code/ResNet50_for_TensorFlow_1.7_code
    ├── scripts
    │   ├──  train_start.sh
    │   ├──  utils.sh
    │   ├──  rank_table.sh
    │    ...
    │        ...
    ├──  EnvPerformCheck (folder)
    ├──  infer
    ├──  on_platform
    ├──  src
    ├──  test
     ...
    └──  configs
  3. Modify the corresponding configuration file: /data/atlas_dls/code/ResNet50_for_TensorFlow_1.7_code/src/configs/res50_256bs_1p.py (set the configuration file name based on your needs) and add the following parameters:
    'restore_path': '/xxx/ d_solution/ckpt0 ', # Set this parameter based on the pre-trained CKPT. In this example, the model saved in card 0 is used.
  4. Modify model loading: /data/atlas_dls/code/ResNet50_for_TensorFlow_1.7_code/src/models/resnet50/res50_model.py. Add the following code line to assert (mode == tf.estimator.ModeKeys.TRAIN):
    import os
    variables_to_restore = tf.contrib.slim.get_variables_to_restore()
    restore_path = self.config.get('restore_path', "")
    if restore_path and os.path.exists(restore_path):
        print(f"load pre-trained model from: {restore_path}")
        tf.train.init_from_checkpoint(restore_path, {v.name.split(':')[0]: v for v in variables_to_restore})
  5. Create a YAML file. The YAML file will be used for job startup. For details, see TensorFlow YAML Template. Replace the startup command with the following:
    ...
    command:
    - "/bin/bash"
    - "-c"
    - "cd /job/code/ResNet50_for_TensorFlow_1.7_code/scripts;chmod +x train_start.sh;bash train_start.sh /job/code/ResNet50_for_TensorFlow_1.7_code/ /job/output/logs src/mains/res50.py --config_file=..." # Not all parameters are listed here.
    ...

Pytorch-based Fault Recovery Code Adaptation Example

  1. Download the corresponding ResNet-50 model code from ModelZoo as the training code.
  2. Go to the MindXDL-deploy repository, select the master branch, and obtain the train_start.sh, utils.sh, and rank_table.sh files in the samples/pytorch/resnet50 directory. Then, combine the files with the scripts directory in the training code to construct the following directory structure on the master node:
    root@ubuntu:/data/atlas_dls/code/ResNet50_for_PyTorch_1.4_code/scripts/#
    scripts/
    ├── rank_table.sh
    ├── utils.sh
    └── train_start.sh
  3. For single-device training, modify the pytorch_resnet50_apex.py file in the training code directory. The modification involves adjusting the logic of model saving and loading.
    import argparse
    import glob
    import os
    ...
        if args.resume:
            candidate_ckpt_path = ""
            for p in glob.glob(f"./rank*"):
                best_ckpt_path = os.path.join(p, "model_best.pth.tar")
                if os.path.exists(best_ckpt_path):
                    candidate_ckpt_path = best_ckpt_path
                    break
    
            if candidate_ckpt_path:
                print("[gpu id:", args.gpu, "]", "=> loading checkpoint '{}'".format(candidate_ckpt_path))
                loc = ""
                if args.gpu is None:
                    checkpoint = torch.load(candidate_ckpt_path)
                else:
                    # Map model to be loaded to specified single gpu.
                    if args.device == 'npu':
                        loc = 'npu:{}'.format(args.gpu)
                    else:
                        loc = 'cuda:{}'.format(args.gpu)
                    checkpoint = torch.load(candidate_ckpt_path, map_location=loc)
                print(f"load checkpoint to : {loc}")
    
                args.start_epoch = checkpoint['epoch']
                best_acc1 = checkpoint['best_acc1']
                if args.gpu is not None:
                    # best_acc1 may be from a checkpoint from a different GPU
                    best_acc1 = best_acc1.to(args.gpu)
    
                model.load_state_dict(checkpoint['state_dict'])
                optimizer.load_state_dict(checkpoint['optimizer'])
                print("[gpu id:", args.gpu, "]",
                      "=> loaded checkpoint '{}' (epoch {})".format(candidate_ckpt_path, checkpoint['epoch']))
            else:
                print("no valid ckpt found to resume.")
    cudnn.benchmark = True
    ...
            # remember best acc@1 and save checkpoint
            is_best = acc1 > best_acc1
            best_acc1 = max(acc1, best_acc1)  
            file_name = "checkpoint_npu{}".format(args.npu)
    
            save_path = f"./rank_{args.rank}"
            if not os.path.exists(save_path):
                os.makedirs(save_path, exist_ok=True)
    
            modeltmp = model.cpu()
            save_checkpoint({
                'epoch': epoch + 1,
                'arch': args.arch,
                'state_dict': modeltmp.state_dict(),
                'best_acc1': best_acc1,
                'optimizer' : optimizer.state_dict(),
            }, is_best, save_path=save_path)
            modeltmp.to(CALCULATE_DEVICE)
    ...
    def save_checkpoint(state, is_best, filename='checkpoint.pth.tar', save_path="./"):
        if is_best:
            target_path = os.path.join(save_path, 'model_best.pth.tar')
            torch.save(state, target_path)
            print(f"save ckpt to {target_path} done. Best epoch for now is :{state['epoch']}")
  4. For distributed training, modify the main_apex_d76_npu.py file in the DistributedResnet50 directory. The modification involves adjusting the logic of model saving and loading.
    import argparse
    import glob
    import os
    ...
    if args.resume:
        candidate_ckpt_path = ""
        for p in glob.glob(f"./rank*"):
            best_ckpt_path = os.path.join(p, "model_best.pth.tar")
            if os.path.exists(best_ckpt_path):
                candidate_ckpt_path = best_ckpt_path
                break
    
        if candidate_ckpt_path:
            print("[gpu id:", args.gpu, "]", "=> loading checkpoint '{}'".format(candidate_ckpt_path))
            loc = ""
            if args.gpu is None:
                checkpoint = torch.load(candidate_ckpt_path)
            else:
                # Map model to be loaded to specified single gpu.
                if args.device == 'npu':
                    loc = 'npu:{}'.format(args.gpu)
                else:
                    loc = 'cuda:{}'.format(args.gpu)
                checkpoint = torch.load(candidate_ckpt_path, map_location=loc)
            print(f"load checkpoint to : {loc}")
    
            args.start_epoch = checkpoint['epoch']
            best_acc1 = checkpoint['best_acc1']
            if args.gpu is not None:
                # best_acc1 may be from a checkpoint from a different GPU
                best_acc1 = best_acc1.to(args.gpu)
    
            model.load_state_dict(checkpoint['state_dict'])
            optimizer.load_state_dict(checkpoint['optimizer'])
            print("[gpu id:", args.gpu, "]",
                  "=> loaded checkpoint '{}' (epoch {})".format(candidate_ckpt_path, checkpoint['epoch']))
        else:
            print("no valid ckpt found to resume.")
    cudnn.benchmark = True
    ...
    if args.device == 'gpu':
        if not args.multiprocessing_distributed or (args.multiprocessing_distributed
                                                    and args.rank % ngpus_per_node == 0):
            save_checkpoint({
                'epoch': epoch + 1,
                'arch': args.arch,
                'state_dict': model.state_dict(),
                'best_acc1': best_acc1,
                'optimizer': optimizer.state_dict(),
            }, is_best)
    elif args.device == 'npu':
        # Use device 0 to save model file on each node. The model is saved only when the ACC is escalated.
        if args.rank % ngpus_per_node == 0:
            save_path = f"./rank_{args.rank}"
            if not os.path.exists(save_path):
                os.makedirs(save_path, exist_ok=True)
    
            modeltmp = model.cpu()
            save_checkpoint({
                'epoch': epoch + 1,
                'arch': args.arch,
                'state_dict': modeltmp.state_dict(),
                'best_acc1': best_acc1,
                'optimizer': optimizer.state_dict(),
            }, is_best, save_path=save_path)
    
            loc = 'npu:{}'.format(args.gpu)
            modeltmp.to(loc)
    ...
    def save_checkpoint(state, is_best, filename='checkpoint.pth.tar', save_path="./"):
        if is_best:
            target_path = os.path.join(save_path, 'model_best.pth.tar')
            torch.save(state, target_path)
            print(f"save ckpt to {target_path} done. Best epoch for now is :{state['epoch']}")
  5. Create a YAML file. The YAML file will be used for job startup. Add the --resume option to the startup command. For details, see YAML template of PyTorch.
    ...
          command:
          - "/bin/bash"
          - "-c"
          - "cd /job/code/ResNet50_for_PyTorch_1.4_code/scripts;chmod +x train_start.sh;bash train_start.sh /job/code/ResNet50_for_PyTorch_1.4_code/ /job/output/logs  --data=/job/data/imagenet --seed=49 --worker=128 --learning-rate=1.6 --warmup=8 --label-smoothing=0.1 --mom=0.9 --weight-decay=1.0e-04 --static-loss-scale=128 --print-freq=1 --dist-url='tcp://127.0.0.1:50000' --dist-backend='hccl'  --epoch=90 --batch-size=4096 --resume=true;"
    ...

MindSpore-based Fault Recovery Code Adaptation Example

  1. Download the ResNet code of the r1.9 branch from the MindSpore repository as the training code.
  2. Run the following command to create a code directory on the master node:
    mkdir /data/atlas_dls/code
  3. Go to the MindXDL-deploy repository, select the master branch, and obtain the train_start.sh, main.sh, and pre_stop.sh files in the samples/mindspore/resnet50 directory. Then, combine the files with the resnet/scripts directory in the training code to construct the following directory structure on the master node:
    root@ubuntu:/data/atlas_dls/code/resnet/scripts/#
    scripts/
    ├── pre_stop.sh
    ├── main.sh
     ...
    ├── run_distribute_train.sh
    ├── run_distribute_train_gpu.sh
    └── train_start.sh
  4. Modify the train_start.sh file in /data/atlas_dls/code/resnet/scripts.
    1. Change dataset_path to the actual dataset directory in the container.
    2. Change conig_yaml_path to the actual configuration file path in the container.
    # train_start.sh: Modify the parameters based on your needs. Global configuration parameters: dataset path and configuration parameter file path. For other model adaptation, add or delete parameters based on your needs.
    dataset_path=/job/data/imagenet_full/train
    config_yaml_path=/job/code/resnet/config/resnet50_imagenet2012_config.yaml
    
    # main.sh: For this example (ResNet-50 model), you do not need to modify the script. For other model adaptation, add, delete, or modify the environment variable configuration based on the site requirements, and then modify the training startup script path and corresponding parameters, that is, the part invoked by the python command in the main.sh script.
    # In this example, the python command for a single-node system with a single device is as follows:
    python ${ROOT_PATH}/../train.py --data_path=${DATA_PATH} --config_path=${CONFIG_PATH} --output_path=${OUTPUT_PATH} --pre_trained=${OUTPUT_PATH}
    # In this example, the commands for single-server multi-device and distributed deployment are as follows:
    python ${ROOT_PATH}/../train.py --run_distribute=True --device_num=${RANK_SIZE} --data_path=${DATA_PATH} --config_path=${CONFIG_PATH} --output_path=${OUTPUT_PATH} --pre_trained=${OUTPUT_PATH}

    The train_start.sh script calls the main.sh script to start a training job. When adapting to other models, adjust the environment variable configuration, startup script path, and startup script parameters in the main.sh script based on the usage guide of the training startup script (train.py in this example).

  5. Modify the resnet50_imagenet2012_config.yaml file in /data/atlas_dls/code/resnet/config/. Configure model and graph build saving and loading functions.
    ...
    run_distribute: False
    enable_profiling: False
    data_path: "/cache/data"
    output_path: "/cache/train" # Checkpoint saving path. You can change the path as required.
    load_path: "/cache/checkpoint_path/"
    device_target: "Ascend"
    checkpoint_path: "./checkpoint/"
    checkpoint_file_path: ""
    ...
    net_name: "resnet50"
    dataset: "imagenet2012"
    device_num: 1
    pre_trained: "/job/code/output/checkpoint/ckpt_0" #  Path for loading the pre-trained model in the container, which can be a directory or file. Modify the path as required by referring to the training YAML file.
    run_eval: False
    eval_dataset_path: ""
    parameter_server: False
    filter_weight: False
    save_best_ckpt: True
    eval_start_epoch: 40
    ...
    network_dataset: "resnet50_imagenet2012"
    
    
    # Retraining options
    save_graphs: False  # Whether to save the graph build result.
    save_graphs_path: "./graphs" # Path for saving the graph build result.
    has_trained_epoch: 0 # Epoch for model pre-training. The default value is 0.
    has_trained_step: 0 # Model pre-training step. The default value is 0.
    ---
    # Help description of each option
    enable_modelarts: "Whether training on modelarts, default: False"
    ...
    batch_size: "Batch size for training and evaluation"
    epoch_size: "Total training epochs."
    checkpoint_path: "The location of the checkpoint file."
    checkpoint_file_path: "The location of the checkpoint file."
    save_graphs: "Whether save graphs during training, default: False."
    save_graphs_path: "Path to save graphs."
  6. The startup script of the ResNet code is train.py. Check whether the code for saving the checkpoint exists in train.py. If yes, go to 8. If no, go to 7.
  7. Add code for saving the checkpoints. The following is an example of checkpoint saving provided in the r1.9 branch of ResNet. You need to define and set required parameters in the configuration file. For other model adaptation, add the code for saving the checkpoint based on the startup script content by referring to the following snippet: For details, see the tutorial on the official website of MindSpore.
     ...
        # Model saving code
        if config.save_checkpoint:
            ckpt_append_info = [{"epoch_num": config.has_trained_epoch, "step_num": config.has_trained_step}]
            config_ck = CheckpointConfig(save_checkpoint_steps=config.save_checkpoint_epochs * step_size,
                                         keep_checkpoint_max=config.keep_checkpoint_max,
                                         append_info=ckpt_append_info)
            ckpt_cb = ModelCheckpoint(prefix="resnet", directory=ckpt_save_dir, config=config_ck)
            cb += [ckpt_cb]
    ...
  8. The startup script of the ResNet code is train.py. Check whether the code for loading the checkpoint exists in train.py. If yes, go to 10. If no, go to 9.
  9. Add the code for loading the checkpoint. The following is an example of checkpoint loading provided in the r1.9 branch of ResNet. You need to define and set required parameters in the configuration file. For other model adaptation, add the code for loading the checkpoint based on the startup script content by referring to the following snippet: For details, see the tutorial on the official website of MindSpore.
    ...
    def load_pre_trained_checkpoint():
        """
        Load checkpoint according to pre_trained path.
        """
        param_dict = None
        if config.pre_trained:
            if os.path.isdir(config.pre_trained):
                # To simplify the document, the verification of configuration parameters such as config.output_path is omitted. Do not forget to verify them in actual operation.
                ckpt_save_dir = os.path.join(config.output_path, config.checkpoint_path, "ckpt_0")
                ckpt_pattern = os.path.join(ckpt_save_dir, "*.ckpt")
                ckpt_files = glob.glob(ckpt_pattern)
                if not ckpt_files:
                    logger.warning(f"There is no ckpt file in {ckpt_save_dir}, "
                                   f"pre_trained is unsupported.")
                else:
                    ckpt_files.sort(key=os.path.getmtime, reverse=True)
                    time_stamp = datetime.datetime.now()
                    print(f"time stamp {time_stamp.strftime('%Y.%m.%d-%H:%M:%S')}"
                          f" pre trained ckpt model {ckpt_files[0]} loading",
                          flush=True)
                    param_dict = load_checkpoint(ckpt_files[0])
            elif os.path.isfile(config.pre_trained):
                # Call the checkpoint loading code.
                param_dict = ms.load_checkpoint(config.pre_trained)
            else:
                print(f"Invalid pre_trained {config.pre_trained} parameter.")
        return param_dict
    ...
  10. Create the YAML file for delivering a job.

    For details about how to modify YAML parameters, see YAML Parameters.

Pangu Model-based Fault Recovery Code Adaptation Example

  1. Download the pangu_alpha code of the r1.9 branch from the MindSpore repository as the training code.
  2. Run the following command to create a code directory on the master node:
    mkdir /data/atlas_dls/code
  3. Go to the MindXDL-deploy repository, select the master branch, and obtain the train_start.sh, main.sh, and pre_stop.sh files in the samples/mindspore/pangu_alpha directory. Then, combine the files with the pangu_alpha/scripts directory in the training code to construct the following directory structure on the master node: For the Pangu model running over 10 billions of parameters, use the corresponding file in the samples/mindspore/pangu_alpha_13B directory.
    root@ubuntu:/data/atlas_dls/code/pangu_alpha/scripts/# 
    scripts/
    ├── cache_util.sh
    ├── hccl.log
    ├── log
    ├── main.sh
    ├── pre_stop.sh
    ├── run_distribute_train_gpu.sh
    ├── run_distribute_train.sh
    ├── run_eval_gpu_resnet_benckmark.sh
    ├── run_eval_gpu.sh
    ├── run_eval.sh
    ├── run_gpu_resnet_benchmark.sh
    ├── run_infer_310.sh
    ├── run_infer.sh
    ├── run_parameter_server_train_gpu.sh
    ├── run_parameter_server_train.sh
    ├── run_standalone_train_gpu.sh
    ├── run_standalone_train.sh
    └── train_start.sh
  4. Modify the train_start.sh file in the /data/atlas_dls/code/pangu_alpha/scripts directory. Change dataset to the actual dataset directory in the container.
    ...
    # Training dataset path. Change it as required.
    # Security tip: The verification of paths and input parameters is involved.
    dataset="/job/data/dataset/train_data"
    
    
    # Single-node training scenario
    if [[ "$server_count" == "1" ]]; then
        server_id=0
        if [ ${device_count} -lt 8 ]; then
            echo "Less than 8 card training is not supported for pangu alpha model." | tee log
        fi
        if [ ${device_count} -eq 8 ]; then
            bash main.sh ${device_count} ${server_count} ${RANK_TABLE_FILE} ${server_id} ${dataset}
        fi
    
    
    # Distributed training scenario
    else
        server_id=$(get_server_id)
        if [ $? -eq 1 ];then
            echo "get server id failed."
            exit 1
        fi
        echo "server id is: "${server_id}
        bash main.sh ${device_count} ${server_count} ${RANK_TABLE_FILE} ${server_id} ${dataset}
  5. Skip this step for models with 10 billion or less parameters. To train a model with hundreds of billions of parameters and recover it within 5 minutes, additional script adaptation is required. The following uses the r1.9 branch of pangu_alpha code in MindSpore repository as an example. (The resumable training job configuration and script adaptation have been completed.)
    1. Modify the args_opt.num_layers, args_opt.stage_num and args_opt.micro_size parameters in the pangu_alpha_config.py file.
      def set_parse_200B(args_opt):
          r"""
              Set config for 200B mode
          """
          args_opt.embedding_size = 16384
          args_opt.num_layers = 32
          args_opt.num_heads = 128
          if args_opt.per_batch_size == 0:
              args_opt.per_batch_size = 1
          args_opt.word_emb_dp = 0
          if args_opt.run_type == "train":
              args_opt.start_lr = 6e-5
              args_opt.end_lr = 6e-6
             args_opt.stage_num = 8
             args_opt.micro_size = 16
              args_opt.op_level_model_parallel_num = 16
              if args_opt.optimizer_shard = 1:
                  args_opt.op_level_model_parallel_num = 8
          elif args_opt.run_type == "predict":
              args_opt.stage_num = 4
              args_opt.micro_size = 1
              args_opt.op_level_model_parallel_num = 16
              if args_opt.optimizer_shard == 1:
                  args_opt.op_level_model_parallel_num = 8
    2. In addition, you need to specify or directly change the value of micro_batch_interleaved in src/utils.py to 1. For details, see the calculation relationship among stage_device_num, data_parallel_num, micro_batch_interleaved, and batch_size in the run_train_pipeline function of the train.py script. The final result must meet the following condition: The value of batch_size of PanguAlphaConfig is a multiple of the value of data_parallel of TransformerOpParallelConfig.
  6. The startup script of the Pangu code is train.py. Check whether the code for saving the checkpoint exists in train.py. If yes, go to 8. If no, go to 7.
  7. Add code for saving the checkpoints. The following is an example of checkpoint saving provided in the r1.9 branch of Pangu. All parameters need to be defined and set in the configuration file. The src/utils.py script is used as an example. For details, see 10.
    ...
    
        # Call the checkpoint saving code.
        add_checkpoint_callback_policy(args_opt, callback, rank)
    ...
    # Define the checkpoint saving code.
    def add_checkpoint_callback_policy(args_param, callback, rank_id):
        r"""
        Add checkpoint policy to callback.
        """
        # Security tip: The verification of paths and input parameters is involved.
        if args_param.save_checkpoint:
            # The checkpoints save the epoch_num and step_num info information.
            ckpt_append_info = [{"epoch_num": args_param.has_trained_epoches, "step_num": args_param.has_trained_steps}]
            ckpt_config = CheckpointConfig(save_checkpoint_steps=args_param.save_checkpoint_steps,
                                           keep_checkpoint_max=args_param.keep_checkpoint_max,
                                           integrated_save=False,
                                           append_info=ckpt_append_info
                                           )
    
    
            ckpoint_cb = ModelCheckpoint(prefix=args_param.ckpt_name_prefix + str(rank_id),
                                         directory=os.path.join(args_param.save_checkpoint_path, f"rank_{rank_id}"),
                                         config=ckpt_config)
    
    
            callback.append(ckpoint_cb)
    ...
  8. The startup script of the Pangu code is train.py. Check whether the code for loading the checkpoint exists in train.py. If yes, go to 11. If no, go to 9.
  9. Add the code for loading the checkpoint. The following is a checkpoint loading example provided by r1.9 branch of Pangu. Some checkpoint loading code exists. You need to add the checkpoint loading code related to resumable training. For details about the parameters, see 10 in the src/utils.py configuration file.
    ...
    # If pipeline parallelism is not enabled for the running model, modify the following function:
    def set_parallel_context(args_opt):
    # If pipeline parallelism is enabled for the running model, modify the following function:
    # Security tip: The verification of paths and input parameters is involved.
    def set_pipeline_parallel_context(args_opt):
    # Before adding the following code to context.set_auto_parallel_context, refer to the parameter description of set_auto_parallel_context in Parallel Distributed Training Mode.
    
    
            # Content added to resumable training.
            if not os.path.exists(args_opt.strategy_load_ckpt_path):
                args_opt.strategy_load_ckpt_path = ""
    
            # content added to resumable training. The strategy_ckpt_save_file_path parameter can be specified based on the path in the container.
            strategy_ckpt_save_file_path = '/job/data/code/fault_torlence/pangu_alpha/strategy.ckpt' 
            if args_opt.strategy_load_ckpt_path == strategy_ckpt_save_file_path:
                 strategy_ckpt_save_file_path = '/job/data/code/fault_torlence/pangu_alpha/strategy_new.ckpt'
    
           # strategy_ckpt_save_file='strategy.ckpt' is changed to strategy_ckpt_save_file=strategy_ckpt_save_file_path
            context.set_auto_parallel_context(
                parallel_mode=ParallelMode.SEMI_AUTO_PARALLEL, gradients_mean=False,
                full_batch=bool(args_opt.full_batch), strategy_ckpt_load_file=args_opt.strategy_load_ckpt_path,
                enable_parallel_optimizer=bool(args_opt.optimizer_shard), strategy_ckpt_save_file='strategy.ckpt')
            set_algo_parameters(elementwise_op_strategy_follow=True)
            _set_multi_subgraphs()
    ...
    ...
    # Define the checkpoint loading code.
    # Security tip: The verification of paths and input parameters is involved.
    def restore_checkpoint(args_param, sink_size, dataset, model, network, epoch):
        r"""
        Load checkpoint process.
        """
        print("======start single checkpoint", flush=True)
        ckpt_name = args_param.ckpt_name_prefix
        # To simplify the document, the verification of the command line parameters save_checkpoint_path and ckpt_name is omitted. Do not forget to verify them.
        ckpt_pattern = os.path.join(args_param.save_checkpoint_path, "rank_{}".format(D.get_rank()),
                                    f"{ckpt_name}*.ckpt")
        ckpt_all_files = glob.glob(ckpt_pattern)
        if not ckpt_all_files:
            print(f"There is no ckpt file in {args_param.save_checkpoint_path}, "
                  f"current ckpt_files found is {ckpt_files} "
                  f"with pattern {ckpt_pattern}, so skip the loading.")
            return
        ckpt_exp_pattern = os.path.join(
            args_param.save_checkpoint_path,
            "rank_{}".format(D.get_rank()),
            f"{ckpt_name}*_breakpoint.ckpt",
        )
        ckpt_exp_files = glob.glob(ckpt_exp_pattern)
        ckpt_files = []
        for file in ckpt_all_files:
            if file not in ckpt_exp_files:
                ckpt_files.append(file)
    
        if not ckpt_files:
            print(
                f"There is no ckpt file in {args_param.save_checkpoint_path}, "
                f"current ckpt_files found is {ckpt_files} "
                f"with pattern {ckpt_pattern}, so skip the loading."
            )
            return
        ckpt_files.sort(key=os.path.getmtime, reverse=True)
        time_stamp = datetime.datetime.now()
        print(f"time stamp {time_stamp.strftime('%Y.%m.%d-%H:%M:%S')} pre trained ckpt model {ckpt_files} loading",
              flush=True)
        # Load the latest checkpoint file.
        print(f'Start to load from {ckpt_files[0]}')
        param_dict = load_checkpoint(ckpt_files[0])
        if param_dict.get("epoch_num") and param_dict.get("step_num"):
            args_param.has_trained_epoches = int(param_dict["epoch_num"].data.asnumpy())
            args_param.has_trained_steps = int(param_dict["step_num"].data.asnumpy())
        model.build(train_dataset=dataset, sink_size=sink_size, epoch=epoch)
        load_param_into_net(network, param_dict)
    ...
  10. Modify the parameters in the src/utils.py script.
    ...
        opt.add_argument("--vocab_size",
                          type=int,
                          default=50304, # Change the value based on the training dataset. Here, the value has been changed to the value of the sample dataset.
                          help="vocabulary size, default is 40000.")
    ...
        opt.add_argument("--data_column_name",
                         type=str,
                         default="text", # Change the value based on the field defined by the dataset. Here, the value has been changed to the value of the sample dataset.
                         help="Column name of datasets")
    ...
        parser.add_argument("--strategy_load_ckpt_path",
                            type=str,
                            default="/job/data/code/fault_torlence/pangu_alpha/strategy/strategy.ckpt", # Specify the paths in the container during the resumable training based on user habits. The paths will not be overwritten by training.
                            help="The training prallel strategy for the model.")
        parser.add_argument("--tokenizer_path",
                            type=str,
                            default="./tokenizer_path",
                            help="The path where stores vocab and vocab model file")
    ...
    def add_retrain_params(opt):
        """
        Add parameters about retrain.
        """
        opt.add_argument("--pre_trained",
                         type=str,
                         default="/job/data/code/fault_torlence/pangu_alpha/8p", # path of the pre-trained model.
                         help="Pretrained checkpoint path.")
        opt.add_argument("--save_checkpoint_path",  # Path for saving the model.
                         type=str,
                         default="/job/data/code/fault_torlence/pangu_alpha/8p",
                         help="Save checkpoint path.")
        opt.add_argument("--keep_checkpoint_max", # model saving policy: maximum quantity.
                         type=int,
                         default=1,
                         help="Max checkpoint save number.")
        opt.add_argument("--save_checkpoint_steps", # model saving policy: saving interval.
                         type=int,
                         default=20,
                         help="Save checkpoint step number.")
        opt.add_argument("--save_checkpoint", # whether to save the model in the current training.
                         type=ast.literal_eval,
                         default=True,
                         help="Whether save checkpoint in local disk.")
        opt.add_argument("--ckpt_name_prefix", # model saving policy: file name prefix.
                         type=str,
                         default="pangu",
                         help="Saving checkpoint name prefix.")
    ...
  11. Use the a800_vcjob.yaml file in the yamls folder in Code Repository to run the job. For details about how to modify YAML parameters, see YAML Parameters.

Example of Dying Gasp Code Adaptation Based on the ResNet-50 Model

The dying gasp function supports only the MindSpore framework. You need to learn the methods and examples in Common MindSpore Callback Functions (ModelCheckpoint content), and then adapt the training startup script.

The cluster scheduling component also enhances the dying gasp function. Take the r1.9 branch of the ResNet model as an example. Add the following content in bold to the train.py file:

from mindx_elastic.terminating_message import ExceptionCheckpoint
import os # Import OS if it does not exist.
import datetime # Import datetime if it does not exist.
...
def _is_time_interval_valid():
     # Security tip: The verification of paths and input parameters is involved.
    ckpt_save_dir = os.path.join(config.output_path, config.checkpoint_path, "ckpt_0")
    ckpt_pattern = os.path.join(ckpt_save_dir, "*breakpoint.ckpt")
    ckpt_files = glob.glob(ckpt_pattern)
    if not ckpt_files:
        return True
    else:
        ckpt_files.sort(key=os.path.getmtime, reverse=True)
        last_breakpoint_ckpt = ckpt_files[0]
        last_breakpoint_ckpt_timestamp = os.path.getmtime(last_breakpoint_ckpt)
        if int((datetime.datetime.now() - datetime.timedelta(minutes=1)).timestamp()) > int(last_breakpoint_ckpt_timestamp):
            return True
        return False


def train_net():
...
    # define callbacks
    time_cb = TimeMonitor(data_size=step_size)
    loss_cb = LossCallBack(config.has_trained_epoch)
    cb = [time_cb, loss_cb]
    ckpt_save_dir = set_save_ckpt_dir()
    if config.save_checkpoint:
        ckpt_append_info = [{"epoch_num": config.has_trained_epoch, "step_num": config.has_trained_step}]
        config_ck = CheckpointConfig(save_checkpoint_steps=config.save_checkpoint_epochs * step_size,
                                     keep_checkpoint_max=config.keep_checkpoint_max,
                                     append_info=ckpt_append_info,
                                     exception_save=_is_time_interval_valid())
        ckpt_cb = ModelCheckpoint(prefix="resnet", directory=ckpt_save_dir, config=config_ck)

        cb += [ckpt_cb]
       if _is_time_interval_valid():
           ckpoint_exp = ExceptionCheckpoint(prefix="resnet", directory=ckpt_save_dir, config=config_ck)
             cb += [ckpoint_exp]
    run_eval(target, model, ckpt_save_dir, cb)
...

Adaptation of other models is similar. The usage of ExceptionCheckpoint is similar to that of ModelCheckpoint. You can add the defined ExceptionCheckpoint to the callback list.

Use the a800_vcjob.yaml file in the yamls folder of Code Repository to run the job. For details about how to modify YAML parameters, see YAML Parameters.

Example of Dying Gasp Code Adaptation Based on the Pangu_alpha Model

The dying gasp function supports only the MindSpore framework. You need to learn the methods and examples in Common MindSpore Callback Functions (ModelCheckpoint content), and then adapt the training startup script.

Cluster scheduling components also enhance the dying gasp feature. Take the r1.9 branch of the pangu_alpha model as an example. Add the following content in bold to the train.py file. You need to download the Elastic_Agent software package to obtain mindx_elastic and install it in the container.

from mindx_elastic.terminating_message import ExceptionCheckpoint
...
def add_checkpoint_callback_policy(args_param, callback, rank_id):
...
        ckpoint_cb = ModelCheckpoint(
            prefix=args_param.ckpt_name_prefix + str(rank_id),
            directory=os.path.join(args_param.save_checkpoint_path,
                                   f"rank_{rank_id}"),
            config=ckpt_config)


        # Exception callback
         # Security tip: The verification of paths and input parameters is involved.
        ckpoint_exp = ExceptionCheckpoint(
            prefix=args_param.ckpt_name_prefix + str(rank_id),
            directory=os.path.join(args_param.save_checkpoint_path,
                                   f"rank_{rank_id}"), config=ckpt_config)
        callback.append(ckpoint_cb)
        callback.append(ckpoint_exp)
...

# If the recovery policy is not used, use the dying gasp instead of the periodic CKPT to perform resumable training, the restore_exception_checkpoint function needs to be modified. (When the lastword CKPT corresponding to a rank is missing, the loaded models are inconsistent, which may cause an exception during the training.)
def restore_exception_checkpoint(args_param, sink_size, dataset, model, network, epoch):
    """
    Restore exception checkpoint to training model.
    Args:
        args_param: model training parameters
        sink_size: model training sink size
        dataset: dataset used for training
        model: model
        network: pangu_alpha network
        epoch: training epoch

    Returns: load exception checkpont success or not.

    """
    ckpt_name = args_param.ckpt_name_prefix

    try:
        ckpt_pattern = os.path.join(
            args_param.save_checkpoint_path,
            f"rank_{D.get_rank()}",
            f"{ckpt_name}*breakpoint.ckpt",
        )
        ckpt_files = glob.glob(ckpt_pattern)
        if not ckpt_files:
            return False
        ckpt_files.sort(key=os.path.getmtime, reverse=True)
        print(f" checkpoint files {ckpt_files[0]}")
        param_dict = load_checkpoint(ckpt_files[0])
        print(f" checkpoint param dict epoch num {param_dict.get('epoch_num')}")
        if param_dict.get("epoch_num") and param_dict.get("step_num"):
            args_param.has_trained_epoches = int(param_dict["epoch_num"].data.asnumpy())
            args_param.has_trained_steps = int(param_dict["step_num"].data.asnumpy())

        # Load checkpoint files
        model.build(train_dataset=dataset, sink_size=sink_size, epoch=epoch)
        load_param_into_net(network, param_dict)
    except TypeError:
        return False
    else:
        return True

Adaptation of other models is similar. The usage of ExceptionCheckpoint is similar to that of ModelCheckpoint. You can add the defined ExceptionCheckpoint to the callback list.

Use the a800_vcjob.yaml file in the yamls folder of Code Repository to run the job. For details about how to modify YAML parameters, see YAML Parameters.

Code Adaptation Example of the Hybrid Parallel Model Based on the Pangu_alpha Model

  1. Download the pangu_alpha code of the r1.9 branch in MindSpore repository. The pangu_alpha model is used as an example to describe how to adapt the restoration policy of the hybrid parallel model.
  2. Configure the restoration policy for hybrid parallel models. For details about how to use GROUP_INFO_FILE, see MindSpore Documentation. Take the pangu_alpha model as an example and add variables (content in bold) to the DL component startup script main.sh.
    ...
            rankid=$((rank_start + i))
            export DEVICE_ID=${i}
            export RANK_ID=${rankid}
            mkdir -p ${ROOT_PATH}/../device${rankid}
            cd ${ROOT_PATH}/../device${rankid} || exit
            group_info_dir=./group_info.pb
            group_info_file_tmp=$(realpath $group_info_dir)
            export GROUP_INFO_FILE=${group_info_file_tmp}
            echo "start training for rank ${RANK_ID}, device ${DEVICE_ID}"
    ...
  3. Enable the restoration policy for hybrid parallel models within the available compute resources. Take the MindSpore pangu_alpha 2.6B model as an example. In the src/pangu_alpha_config.py file, verify that the value of args_opt.optimizer_shard is changed to 0.
    # Verify that optimizer_shard is set to 0.
        elif args_opt.mode == "2.6B":
            args_opt.embedding_size = 2560
            args_opt.num_layers = 32
            args_opt.num_heads = 32
            args_opt.op_level_model_parallel_num = 8
            if args_opt.run_type == "train":
                args_opt.start_lr = 1e-4
                args_opt.end_lr = 1e-6
                args_opt.optimizer_shard = 0
  4. Load the dying gasp checkpoint based on the restoration policy. Take the MindSpore pangu_alpha 2.6B model as an example and check the following code in the train.py file. The adaptation process is as follows:
    1. Import the Python dependency package. You need to download the software package to obtain mindx_elastic and install it in the container.
    2. Add the code for processing the environment variable of the parallel policy.
    3. Add the method of loading the dying gasp checkpoint.
    4. Check and adapt the original checkpoint loading method.
    # Import dependencies.
    import json
    from mindx_elastic.restore_module import RestoreStrategyGenerator
    ...
    # If pipeline parallelism is not enabled for the running model, modify the following function:
    def run_train(args_opt):
    # If pipeline parallelism is enabled for the running model, modify the following function:
    def run_train_pipeline(args_opt):
    # Add the code for processing the environment variable of the parallel policy.
    ...
        device_num = 1
        if args_opt.distribute == "true":
            rank, device_num = set_parallel_context(args_opt)
        context.set_context(save_graphs=False, save_graphs_path="./graphs_of_device_id_" + str(rank))
        # env variable prepare
         # Security tip: The verification of paths, input parameters, and environment variables is involved.
        group_info_file = os.getenv("GROUP_INFO_FILE")
        if group_info_file:
            os.environ["GROUP_INFO_FILE_REFLECT"] = group_info_file
        if group_info_file:
            # To make the document simple and easy to read, the verification of group_info_file is omitted. You need to verify it as required when you use it.
            with open(os.path.expanduser("/job/code/group_info_env"), "a") as outfile:
                outfile.write(f"export GROUP_INFO_FILE_REFLECT={group_info_file}\n")
    ...
        if args_opt.pre_trained:
    
            flag = restore_exception_checkpoint(args_opt, args_opt.sink_size, ds, model,
                                                pangu_alpha_with_grads, epoch=actual_epoch_num)
            if not flag:
                restore_checkpoint(args_opt, args_opt.sink_size, ds, model,
                                   pangu_alpha_with_grads, epoch=actual_epoch_num)
    ...
    # Modify the original method of loading the checkpoint file.
    # Security tip: The verification of paths, input parameters, and environment variables is involved.
    def restore_checkpoint(args_param, sink_size, dataset, model, network, epoch):
        r"""
        Load checkpoint process.
        """
        print("======start single checkpoint", flush=True)
        ckpt_name = args_param.ckpt_name_prefix
        ckpt_pattern = os.path.join(args_param.save_checkpoint_path, "rank_{}".format(D.get_rank()),
                                    f"{ckpt_name}*.ckpt")
        ckpt_all_files = glob.glob(ckpt_pattern)
    
    
        if not ckpt_all_files:
            print(f"There is no ckpt file in {args_param.save_checkpoint_path}, "
                  f"current ckpt_files found is {ckpt_all_files} "
                  f"with pattern {ckpt_pattern}, so skip the loading.")
    
    
        ckpt_exp_pattern = os.path.join(args_param.save_checkpoint_path, "rank_{}".format(D.get_rank()),
                                        f"{ckpt_name}*_breakpoint.ckpt")
        ckpt_exp_files = glob.glob(ckpt_exp_pattern)
        ckpt_files = []
        for file in ckpt_all_files:
            if file not in ckpt_exp_files:
                ckpt_files.append(file)
    
    
        if not ckpt_files:
            print(f"There is no ckpt file in {args_param.save_checkpoint_path}, "
                  f"current ckpt_files found is {ckpt_files} "
                  f"with pattern {ckpt_pattern}, so skip the loading.")
            return
        ckpt_files.sort(key=os.path.getmtime, reverse=True)
    ...
    # Define the dying gasp checkpoint loading method.
    # Security tip: The verification of paths, input parameters, and environment variables is involved.
    def get_exception_checkpoints(args_param):
        r"""
        Load checkpoint process.
        """
    
        print("======start exception checkpoint", flush=True)
        restore_ranks = os.getenv("RESTORE_RANKS")
        if not restore_ranks:
            return None
    
        restore_rank_list = list(map(int, restore_ranks.split(",")))
        ckpt_file_list = []
        ckpt_name = args_param.ckpt_name_prefix
        for ckpt_rank in restore_rank_list:
            ckpt_pattern = os.path.join(args_param.save_checkpoint_path,
                                        f"rank_{ckpt_rank}",
                                        f"{ckpt_name}*_breakpoint.ckpt")
            ckpt_files = glob.glob(ckpt_pattern)
            if not ckpt_files:
                print(
                    f"There is no ckpt file in {args_param.save_checkpoint_path}, "
                    f"current ckpt_files found is {ckpt_files} "
                    f"with pattern {ckpt_pattern}, so skip the loading.")
                return None
            ckpt_files.sort(key=os.path.getmtime, reverse=True)
            ckpt_file_list.append(ckpt_files[0])
        print(f"checkpoint file {ckpt_file_list}")
        return ckpt_file_list
    
    # Security tip: The verification of paths, input parameters, and environment variables is involved.
    def check_exception_checkpoints(ckpt_file_list):
        """
        Check exception checkpoints size.
        Args:
            ckpt_file_list: exception checkpoints
        Returns: result of exception checkpoints size check.
    
        """
        ckpt_size_list = []
        for ckpt_file in ckpt_file_list:
            ckpt_size_list.append(os.path.getsize(ckpt_file))
    
        if len(set(ckpt_size_list)) > 1:
            return False
        return True
    
    # Security tip: The verification of paths, input parameters, and environment variables is involved.
    def restore_exception_checkpoint(args_param, sink_size, dataset, model, network, epoch):
        """
        Restore exception checkpoint to training model.
        Args:
            args_param: model training parameters
            sink_size: model training sink size
            dataset: dataset used for training
            model: model
            network: pangu_alpha network
            epoch: training epoch
    
    
        Returns: load exception checkpont success or not.
    
    
        """
        restore_strategy_generator = RestoreStrategyGenerator()
        res_query = restore_strategy_generator.gen_fault_tolerance_strategy()
        if not res_query:
            return False
    
        restore_ranks, restore_dict = res_query
        print(f"restore ranks: {restore_ranks}, restore dict: {restore_dict}")
        if not restore_ranks:
            return False
    
        if not restore_dict:
               return False
    
        os.environ["RESTORE_RANKS"] = restore_ranks
        os.environ["RESTORE_RANKS_MAP"] = str(restore_dict)
    
        if os.getenv("RESTORE_RANKS") == "-1":
            return False
    
    
        ckpt_file_list = get_exception_checkpoints(args_param)
    
    
        restore_flag = False
        if ckpt_file_list:
            restore_flag = check_exception_checkpoints(ckpt_file_list)
    
    
        if not restore_flag:
            return False
    
    
        ckpt_name = args_param.ckpt_name_prefix
        restore_ranks_map = os.getenv("RESTORE_RANKS_MAP")
        if not restore_ranks_map:
            return False
    
    
        try:
            print("whether run into load process")
            restore_ranks_map_json = json.loads(restore_ranks_map)
            map_rank_id = D.get_rank()
            for key in restore_ranks_map_json.keys():
                key_list = list(key.split(","))
                if str(D.get_rank()) in key_list:
                    map_rank_id = restore_ranks_map_json.get(key)
    
    
            print(f"loading map rank id {map_rank_id}")
            ckpt_pattern = os.path.join(args_param.save_checkpoint_path,
                                        f"rank_{map_rank_id}",
                                        f"{ckpt_name}*breakpoint.ckpt")
            ckpt_files = glob.glob(ckpt_pattern)
            ckpt_files.sort(key=os.path.getmtime, reverse=True)
            print(f" checkpoint files {ckpt_files[0]}")
            param_dict = load_checkpoint(ckpt_files[0])
            print(f" checkpoint param dict epoch num {param_dict.get('epoch_num')}")
            if param_dict.get("epoch_num") and param_dict.get("step_num"):
                args_param.has_trained_epoches = int(
                    param_dict["epoch_num"].data.asnumpy())
                args_param.has_trained_steps = int(
                    param_dict["step_num"].data.asnumpy())
    
    
            # Load the checkpoint file.
            model.build(train_dataset=dataset, sink_size=sink_size, epoch=epoch)
            load_param_into_net(network, param_dict)
        except TypeError:
            return False
        else:
            return True
  5. Use the a800_vcjob.yaml file in the yamls folder of Code Repository to run the job. For details about how to modify YAML parameters, see YAML Parameters. In the YAML file of a delivered job, the environment variable name corresponding to metadata.name is mindx-dls-test, which should be the same as the job name. See the following content in bold.
    ...
    apiVersion: batch.volcano.sh/v1alpha1
    kind: Job
    metadata:
      name: mindx-dls-test                 
      namespace:  xxx                     
      labels:
        ring-controller.atlas: ascend-910   
    ...
          spec:
            terminationGracePeriodSeconds: 600 # For details, see Table 2.
            containers:
            - image: mindspore:b035        
              imagePullPolicy: IfNotPresent
              name: mindspore
              env:
              - name: mindx-dls-test        
                valueFrom:
                  fieldRef:
                    fieldPath: metadata.name
              - name: XDL_IP               
                valueFrom:
            ...

YAML Parameters

To use the rescheduling feature, you need to add fault-scheduling, terminationGracePeriodSeconds, and maxRetry to the YAML file of a delivered vcjob. The following table lists the configuration items of these parameters.

Table 1 Value list of fault-scheduling configuration items of the vcjob for resumable training

No.

Value

Description

1

grace

Job rescheduling enabled. Gracefully delete the original pod during the rescheduling. If the job still fails after 15 minutes, forcibly delete the original pod. For details, see Volcano Configuration. If the dying gasp solution is used, this configuration is required.

2

force

Job rescheduling enabled. Forcibly delete the original pod during the rescheduling.

3

off

The job does not use the fault rescheduling feature, and the maxRetry of Kubernetes still takes effect.

4

None (no fault-scheduling)

Same as off.

5

Other values

Same as off.

Table 2 Value list of terminationGracePeriodSeconds configuration items of the vcjob for dying gasp

No.

Value

Description

1

0 < terminationGracePeriodSeconds < grace-over-time

Duration from the time when the container receives the stop signal to the time when the container is forcibly stopped by the Kubernetes. The value must be greater than 0 and less than the value of grace-over-time in the volcano-*.yaml file. In addition, ensure that the CKPT file can be saved completely. Change the value as required. For details, see Container Lifecycle Hooks on the Kubernetes official website.

To use the resumable training function, you need to expand the memory. Add parameters based on the comments. In addition, the maxRetry mechanism needs to be used. The following is an example of the YAML template:

apiVersion: v1
kind: ConfigMap
metadata:
  name: rings-config-mindx-dls-test     
  namespace: vcjob                    
  labels:
    ring-controller.atlas: ascend-910
data:
  hccl.json: |
    {
        "status":"initializing"
    }
---
apiVersion: v1
kind: ConfigMap
metadata:
  name: fault-config-mindx-dls-test
  namespace: vcjob 
data:
  fault-npus: |
    {
        "status":"initializing"
    }
---
apiVersion: batch.volcano.sh/v1alpha1
kind: Job
metadata:
  name: mindx-dls-test   # The value of this parameter must be consistent with the name of ConfigMap.
  namespace: vcjob       # Select a proper namespace as required. (The namespaces of ConfigMap and jobs must be the same.)
  labels:
    ring-controller.atlas: ascend-910  # The HCCL-Controller distinguishes scenarios with Ascend 910 and other processors configured.
    fault-scheduling: "force"
spec:
  minAvailable: 1
  schedulerName: volcano    # Use Volcano scheduler to schedule jobs.
  policies:
    - event: PodEvicted
      action: RestartJob
  plugins:
    ssh: []
    env: []
    svc: []
  maxRetry: 3
...
      spec:
        containers:
        - image: mindspore:b035         # Training framework image, which can be modified.
          imagePullPolicy: IfNotPresent
          name: mindspore
...
          command:
          - "/bin/bash"
          - "-c"
          - "cd /job/code/resnet/scripts; chmod +x train_start.sh; ./train_start.sh;" # Command for executing the training script. Ensure that related command and path exist on Docker.
          #args: [ "while true; do sleep 30000; done;"  ]                            # Comment out the previous line and use this line. You can manually run the training script in the container to facilitate debugging.
                                                                                     # The command is kubectl exec -it -n {namespace} {podname} bash.
          lifecycle:  # To use the dying gasp function, add the code in bold.
            preStop:
              exec:
                command: ["/bin/bash", "-c", "cd /job/code/resnet/scripts; bash pre_stop.sh"]
          resources:
            requests:
              huawei.com/Ascend910: 1                                                # Number of requested NPUs. The maximum value is 8. You can add lines to configure resources such as memory and CPU.
            limits:
              huawei.com/Ascend910: 1                                                # The value must be the same as that in requests.
...
       # Add the following lines for memory expansion of resumable training:
          volumeMounts:
          - mountPath: /dev/shm
            name: shm
        volumes:
        - name: shm
          emptyDir:
            medium: Memory
            sizeLimit: 16Gi
...