开发者
资源

适配示例

本章节根据GPT-3模型进行断点续训特性适配,提供如下脚本适配示例。

  • 断点续训展示的组件代码为开源代码,其中涉及到相关安全说明请参见安全说明
  • 下文中模型示例代码可能与实际版本存在差异,请以实际版本代码为准。
表1 断点续训脚本适配

步骤

示例

准备模型代码及数据集

参考GPT-3模型适配示例的步骤1-步骤7

适配模型脚本重调度功能

Job级别重调度

参考GPT-3模型适配示例的步骤8获取示例脚本。

Pod级别重调度

参考GPT-3模型适配示例的步骤9获取示例脚本、(可选)步骤10配置自定义参数。

(可选)配置优雅容错功能

参考(可选)配置优雅容错功能

(可选)配置临终CheckPoint恢复功能

参考(可选)配置临终CheckPoint恢复功能

GPT-3模型适配示例

  1. 依次执行以下命令,拉取Megatron源码仓代码。
    git clone https://github.com/NVIDIA/Megatron-LM.git 
    cd Megatron-LM 
    git checkout 285068c8108e0e8e6538f54fe27c3ee86c5217a2
  2. 依次执行以下命令,下载并安装megatron-npu。
    git clone https://gitee.com/ascend/Megatron-LM.git megatron_npu 
    cd megatron_npu 
    pip install -e .
  3. 执行以下命令,用megatron_npu中的pretrain_gpt.py替换Megatron-LM中的pretrain_gpt.py。
    cp tests_gpt/pretrain_gpt.py  <path_to_Megatron-LM>/
  4. 自行准备GPT-3对应的数据集,使用时请遵守对应规范。
  5. 管理员用户上传数据集到存储节点。
    1. 进入“/data/atlas_dls/public”目录,将数据集上传到任意位置,如“/data/atlas_dls/public/dataset/GPT-3/enwiki”
      root@ubuntu:/data/atlas_dls/public/dataset/GPT-3/enwiki# pwd
      回显示例如下:
      /data/atlas_dls/public/dataset/GPT-3/enwiki
    2. 执行du -sh命令,查看数据集大小。
      root@ubuntu:/data/atlas_dls/public/dataset/GPT-3/enwiki# du -sh
      回显示例如下:
      90G
  6. 2中下载的训练代码解压到本地,将解压后的训练代码中“Megatron-LM”目录重命名为“GPT-3_for_PyTorch_1.11_code/”目录。
  7. 将GPT-3_for_PyTorch_1.11_code文件上传至环境的“/data/atlas_dls/public/code/”路径下。
  8. 获取脚本并构造Job级别重调度目录结构。重调度默认提供Job级别重调度
    • Ascend Operator

      进入“MindXDL-deploy”仓库,选择“branch_v6.0.0-RC3”分支,获取“samples/train/resumable-training/fault-rescheduling/withoutRanktable/pytorch/gpt-3”目录中的“train_start.sh”文件,在训练代码中创建“scripts”目录,在管理节点构造成如下的目录结构。

      root@ubuntu:/data/atlas_dls/public/code/GPT-3_for_PyTorch_1.11_code/scripts#
      scripts/
      └── train_start.sh
    • HCCL Controller
      进入“MindXDL-deploy”仓库,选择“branch_v6.0.0-RC3”分支,获取“samples/train/resumable-training/fault-rescheduling/withRanktable/pytorch/gpt-3”目录中的train_start.sh、pre_stop.sh、utils.sh和rank_table.sh文件,在管理节点构造成如下的目录结构。
      root@ubuntu:/data/atlas_dls/public/code/GPT-3_for_PyTorch_1.11_code/scripts#
      scripts/
      ├── rank_table.sh
      ├── utils.sh
      └── train_start.sh
  9. 获取脚本并构造Pod级别重调度目录结构。
    • Ascend Operator

      进入“MindXDL-deploy”仓库,选择“branch_v6.0.0-RC3”分支,获取“samples/train/resumable-training/fault-tolerance/without-ranktable/pytorch/gpt-3”目录中的train_start.sh文件,在训练代码中创建“scripts”目录,在管理节点构造成如下的目录结构。

      root@ubuntu:/data/atlas_dls/public/code/GPT-3_for_PyTorch_1.11_code/scripts#
      scripts/
      └── train_start.sh
    • HCCL Controller
      进入“MindXDL-deploy”仓库,选择“branch_v6.0.0-RC3”分支,获取“samples/train/resumable-training/fault-tolerance/ranktable/pytorch/gpt-3”目录中的train_start.sh、utils.sh和rank_table.sh文件,在管理节点构造成如下的目录结构。
      root@ubuntu:/data/atlas_dls/public/code/GPT-3_for_PyTorch_1.11_code/scripts#
      scripts/
      ├── rank_table.sh
      ├── utils.sh
      └── train_start.sh
  10. (可选)配置Pod级别重调度功能的触发次数和时间间隔。可以在train_start.sh脚本内的DISTRIBUTED_ARGS字段中新增max_restarts和monitor_interval参数,示例如下。使用Job级别重调度请跳过本步骤。
    ...
        DISTRIBUTED_ARGS="--nproc_per_node $GPUS_PER_NODE --nnodes $NNODES --node_rank $NODE_RANK --master_addr $MASTER_ADDR --master_port $MASTER_PORT --max_restarts 5 --monitor_interval 10 "
    ...

    参数说明:

    • max_restarts:配置容器内最大允许触发的故障次数,取值为整数。超出次数后PyTorch训练进程会直接退出训练,不配置该参数时默认为32767次,超过芯片发生故障次数(多个芯片同时发生故障时故障次数会增加),训练进程将会退出。
    • monitor_interval:配置监测训练进程状态的时间间隔,单位为秒,取值为整数。不配置该参数时默认为30秒。

(可选)配置优雅容错功能

优雅容错为故障处理的高级功能,该功能可在用户没有备用训练资源或者期望设备恢复时,尝试对故障芯片进行自动恢复。使用优雅容错功能的适配说明如下。

  • 需保证模型脚本可以正常进行训练。
  • 需配置优雅容错所需的组件Ascend Device Plugin,可参考(可选)配置组件
  1. 配置yaml,参考yaml示例配置ConfigMap和挂载字段。
  2. 适配模型脚本并启动训练。
    • PyTorch框架的模型,已在构建镜像时为PyTorch框架引入进程管理能力,使用torchrun方式启动训练即可。
    • MindSpore框架的模型。
      1. 进入“MindXDL-deploy”仓库,选择“branch_v6.0.0-RC3”分支,获取“samples/train/resumable-training/fault-tolerance/ranktable/mindspore/resnet50”目录中的“reset_process.py”文件。
      2. 修改模型启动脚本文件,将修改后的脚本文件和准备好的“reset_process.py”文件放在同一目录下,即可启动训练。模型启动脚本文件修改示例如下。
        ...
        # Launch Command 在模型原本的启动命令后,新增&,使得训练主进程可以在后台执行
         ${DLS_PROGRAM_EXECUTOR} ${run_file_path}${boot_file} ${train_param} --run_distribute=True --device_num=${MS_LOCAL_WORKER} --parameter_server=False --device_target=Ascend --output_dir=${output_url} &> ${output_url}/worker_$i.log &  
        train_pid=$!     #获取训练进程的pid
        tail -f ${output_url}/log &
        old_log_pid=$!
        # 将以下命令复制在启动脚本里
        python -u reset_process.py -p "${train_pid}" & #将训练进程的pid传给监测进程
        reset_pid=$!
        wait ${train_pid}      #等待训练进程结束获取其退出码
        exit_code=$?
        if [ ${exit_code} -eq 0 ]; then 
          kill -15 ${reset_pid}     #如果训练进程正常退出则向监测进程发送-15信号
          echo "training finished."
          exit ${exit_code} 
        else
          if [ -d "训练进程工作目录"]; then
             touch "训练进程工作目录"/newlog           # 创建新训练进程的日志
             tail -f "训练进程工作目录" &              # 查看新训练进程的日志
          fi
          kill -9 ${old_log_pid}
          wait ${reset_pid}       # 如果训练进程异常退出则等待监测进程结束
          exit_code=$?
        fi
        exit ${exit_code}

(可选)配置临终CheckPoint恢复功能

临终CheckPoint恢复为训练重启的高级功能,该功能可以使用临终ckpt文件继续训练,缩短训练时间。
  • 临终遗言功能需要根据模型并行策略进行保存,若数据并行域数量等于1,则不支持使用临终遗言功能。
  • 当前临终遗言功能仅支持基于Megatron框架的模型,非Megatron框架的模型不支持使用临终遗言功能。
  • 单节点任务出现节点故障时,无法保存临终ckpt。
  • 仅支持在训练进程异常或者故障时,使用临终遗言,保存临终ckpt文件。
  • 更多临终遗言涉及到的周边约束说明,请参见《故障恢复加速》的“约束与限制”章节
  1. 修改“/data/atlas_dls/public/code/GPT-3_for_PyTorch_1.11_code/megatron”目录下的training.py文件,修改内容如下加粗字段所示。
    ...
    from torch.nn.parallel.distributed import DistributedDataParallel as torchDDP
    # 新增以下加粗字段
    import os
    import mindio_ttp.framework_ttp as ttp
    import mindio_ttp.megatron_ttp as mega
    
    from megatron import get_args
    ...
    
    ...
    def setup_model_and_optimizer(model_provider_func,
                                  model_type,
                                  no_wd_decay_cond=None,
                                  scale_lr_cond=None,
                                  lr_mult=1.0):
        ...
        if args.load is not None:
            timers = get_timers()
            timers('load-checkpoint', log_level=0).start(barrier=True)
            #删除以下加粗字段
            args.iteration = load_checkpoint(model, optimizer, opt_param_scheduler)
            #新增以下加粗字段
            args.iteration = mega.load_checkpoint(model, optimizer, opt_param_scheduler)
            timers('load-checkpoint').stop(barrier=True)
            timers.log(['load-checkpoint'])
    ...
    
    ...
    def train_step(forward_step_func, data_iterator,
                   model, optimizer, opt_param_scheduler):
        """Single training step."""
        ...
        # Reduce gradients.
        optimizer.reduce_model_grads(args, timers)
        # all rank need to barrier
        #新增以下加粗字段
        torch.distributed.barrier()
        ...
        timers('optimizer').stop()
    ...
        #新增以下加粗字段
    @ttp.ttp_to_persist
    def train(forward_step_func, model, optimizer, opt_param_scheduler,
              train_data_iterator, valid_data_iterator,
              process_non_loss_data_func):
        """Train the model function."""
        ...
        # Tracking loss.
        total_loss_dict = {}
        # begin to init ttp processor & controller
        #新增以下加粗字段
        cur_rank = torch.distributed.get_rank()
        world_size: int = torch.distributed.get_world_size()
        masterIp = os.getenv('MASTER_ADDR')
        port = int(os.getenv('TTP_PORT'))
        dp_ranks = mpu.get_data_parallel_all_rank()
        if cur_rank == 0:
           controller_ip = os.getenv('XDL_IP')
           ttp.ttp_init_controller(cur_rank, world_size, len(dp_ranks)//2, False)
           ttp.ttp_start_controller(controller_ip, port)
        ttp.ttp_init_processor(cur_rank, dp_ranks, len(dp_ranks), world_size, len(dp_ranks)//2, False)
        ttp.ttp_start_processor(masterIp, port)
        ttp.ttp_register_save_ckpt_handler(mega.ttp_save_checkpoint)
        ttp.ttp_register_rename_handler(mega.rename_save_result)
         ...
         iteration += 1
        #新增以下加粗字段
         ttp.ttp_set_ckpt_args((iteration, model, optimizer, opt_param_scheduler))
         args.consumed_train_samples += mpu.get_data_parallel_world_size() *  \
                                           args.micro_batch_size * \
                                           get_num_microbatches()
         ...
    ...
  2. 由于框架特性,模型开始训练前15步无优化器更新,如15步内故障不能保存,需要修改只有优化器更新才会上报训练状态的代码逻辑。用户可以修改“/data/atlas_dls/public/code/GPT-3_for_PyTorch_1.11_code/megatron/optimizer/”目录下的optimizer.py文件,新增如下加粗字段。
    ...
    from torch._utils import _flatten_dense_tensors, _unflatten_dense_tensors
    # 新增如下加粗字段
    import mindio_ttp.framework_ttp as ttp
    # 修改class MixedPrecisionOptimizer(MegatronOptimizer)类中的字段
            timers('optimizer-inner-step', log_level=1).start(
                barrier=args.barrier_with_L1_time)
            # 新增如下加粗字段
            ttp.ttp_start_updating_os(-1)
            self.optimizer.step()
            # 新增如下加粗字段
            if hasattr(self, 'increase_step'):
                self.increase_step()
            ttp.ttp_end_updating_os(self.current_step)
  3. 修改“/data/atlas_dls/public/code/GPT-3_for_PyTorch_1.11_code/megatron/optimizer”目录下的__init__.py文件,修改内容如下加粗字段所示。
    ...
    from apex.optimizers import FusedAdam as Adam
    from apex.optimizers import FusedSGD as SGD
    # 新增以下加粗字段
    from mindio_ttp.megatron_ttp import ReplicatedOptimizer
    from megatron import get_args
    ...
            # Megatron optimizer.
            # 删除以下加粗字段
            opt_ty = DistributedOptimizer
            # 新增以下加粗字段
            opt_ty = ReplicatedOptimizer \
                if args.use_distributed_optimizer else \
                Float16OptimizerWithFloat16Params
    ...
  4. 修改“/data/atlas_dls/public/code/GPT-3_for_PyTorch_1.11_code/megatron/core/”目录下的parallel_state.py文件,新增如下加粗字段所示。
    ...
    # Memory buffers to avoid dynamic memory allocation
    _GLOBAL_MEMORY_BUFFER = None
    all_data_parallel_group_ranks = None
        ...
        global _DATA_PARALLEL_GLOBAL_RANKS
        global all_data_parallel_group_ranks
        assert _DATA_PARALLEL_GROUP is None, 'data parallel group is already initialized'
        all_data_parallel_group_ranks = []
        ...
    
    def get_data_parallel_src_rank():
        """Calculate the global rank corresponding to the first local rank
        in the data parallel group."""
        assert _DATA_PARALLEL_GLOBAL_RANKS is not None, \
            "Data parallel group is not initialized"
        return _DATA_PARALLEL_GLOBAL_RANKS[0]
    
    def get_data_parallel_all_rank():
        assert _DATA_PARALLEL_GLOBAL_RANKS is not None, \
            "Data parallel group is not initialized"
        return _DATA_PARALLEL_GLOBAL_RANKS
    
    def get_all_data_parallel_ranks(): 
        return all_data_parallel_group_ranks
    ...
  5. 进入制作PyTorch框架镜像章节制作的训练镜像中,依次执行以下命令,查找torchrun的路径。
    bash 镜像名称
    which torchrun
    回显示例如下:
    /usr/local/python3.8.3/bin/torchrun
  6. 修改torchrun文件,新增如下加粗部分,修改后将容器保存为新的训练镜像。
    ...
    import sys
    import mindio_ttp.framework_ttp
    from torch.distributed.run import main
    ...
  7. 修改“/data/atlas_dls/public/code/GPT-3_for_PyTorch_1.11_code/scripts”目录下的train_start.sh文件,修改内容如下加粗字段所示。
    ...
    # env for breakpoint ckpt
    export RESUME_MODE_ENABLE=1
    #新增以下加粗字段
    export TTP_OT=360
    ...
    
      server_id=${RANK}
      logger "server id is: ""${server_id}"
      DISTRIBUTED_ARGS="--nproc_per_node $LOCAL_WORLD_SIZE --nnodes $server_count --node_rank $RANK --master_addr $MASTER_ADDR --master_port $MASTER_PORT"
      # 删除以下加粗代码
      ${DLS_PROGRAM_EXECUTOR} -m torch.distributed.launch $DISTRIBUTED_ARGS ${boot_file_path}${boot_file} ${train_param} &> ${output_url}/log &    
      # 新增以下加粗代码
      torchrun $DISTRIBUTED_ARGS ${boot_file_path}${boot_file} ${train_param} && tee ${output_url}/log 
      check_return_code
    ...