
Adaptation Examples

This section walks users step by step through adapting a training job for resumable training (checkpoint-based resume).

  • To keep graceful fault tolerance and process-level online recovery working properly, keep the clocks of the K8s master and worker nodes consistent.
  • The component code shown for resumable training is open-source code; for the related security notes, see the Security Statement.
  • The model sample code below may differ from the actual release; the code of your actual version prevails.
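
For the clock-consistency note above, a quick way to verify that the node clocks are aligned is to query each node's time and NTP offset. A minimal sketch, assuming passwordless SSH to the nodes and chrony as the time service (the node names are placeholders):
    # Compare wall-clock time and NTP offset across the K8s nodes (sketch; adjust node names)
    for node in master-node worker-node-1 worker-node-2; do
        echo "=== ${node} ==="
        ssh "${node}" "date -u '+%Y-%m-%dT%H:%M:%SZ'; chronyc tracking | grep 'System time'"
    done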

PyTorch Adaptation Example (Based on MindSpeed-LLM)

For preparing the training code and dataset, refer to the MindSpeed-LLM user guide. The following uses a single Atlas 800T A2 training server as an example to describe the procedure.

  1. Prepare the MindSpeed-LLM training code; the script is as follows.
    mkdir -p /data/atlas_dls/public/code
    cd /data/atlas_dls/public/code
    git clone https://gitee.com/ascend/MindSpeed-LLM.git 
    git clone https://github.com/NVIDIA/Megatron-LM.git
    cd Megatron-LM
    git checkout core_v0.12.1
    cp -r megatron ../MindSpeed-LLM/
    cd ..
    cd MindSpeed-LLM
    git checkout master
     
    ## Prepare the MindSpeed source code. It was already prepared when the image was built, so you can copy it directly or clone it again. For the exact commit id to check out, see the installation guide in the MindSpeed-LLM repository
    git clone https://gitee.com/ascend/MindSpeed.git
    cd MindSpeed
    git checkout c99f34c0
    cd ..
    
    ## Create the directories needed later
    mkdir alllogs
    mkdir dataset
    mkdir scripts
    mkdir yamls
    mkdir output
     
    ## Rename MindSpeed-LLM to LLAMA2_for_PyTorch_2.7_code
    cd ..
    mv MindSpeed-LLM LLAMA2_for_PyTorch_2.7_code
  2. Prepare the llama2-7b tokenizer (vocabulary) files by running the following script.
    cd LLAMA2_for_PyTorch_2.7_code
    mkdir ./dataset/llama-2-7b-hf/
    cd ./dataset/llama-2-7b-hf/
    # Download from the web page directly or via the command line
    wget https://huggingface.co/daryl149/llama-2-7b-hf/resolve/main/tokenizer.json 
    wget https://huggingface.co/daryl149/llama-2-7b-hf/resolve/main/tokenizer.model 
    wget https://huggingface.co/daryl149/llama-2-7b-hf/resolve/main/tokenizer_config.json 
    wget https://huggingface.co/daryl149/llama-2-7b-hf/resolve/main/config.json
    wget https://huggingface.co/daryl149/llama-2-7b-hf/resolve/main/generation_config.json
    wget https://huggingface.co/daryl149/llama-2-7b-hf/resolve/main/special_tokens_map.json
  3. Prepare the llama2-7b dataset for LLAMA2 yourself. This example uses enwiki20230101. Run the following script.
    ## Prepare the dataset
    cd LLAMA2_for_PyTorch_2.7_code/dataset/
    # Download from the web page directly or via the command line
    wget https://huggingface.co/datasets/lsb/enwiki20230101/resolve/main/data/train-00000-of-00042-d964455e17e96d5a.parquet
  4. Preprocess the dataset by running the following script.
    ## Pretraining dataset preprocessing. This step starts the mindspeed-dl:v1 container, mounts the "LLAMA2_for_PyTorch_2.7_code" directory prepared in step 1, and runs inside the container
    docker run -it -v /data/atlas_dls/public/code/LLAMA2_for_PyTorch_2.7_code:/home/LLAMA2_for_PyTorch_2.7_code -v /usr/local/Ascend/driver:/usr/local/Ascend/driver -e ASCEND_VISIBLE_DEVICES=0-7 mindspeed-dl:v1 /bin/bash
    ## Run the following commands inside the container to preprocess the dataset
    cd /home/LLAMA2_for_PyTorch_2.7_code
    export PYTHONPATH=/home/LLAMA2_for_PyTorch_2.7_code/MindSpeed:$PYTHONPATH
    python ./preprocess_data.py \
        --input ./dataset/train-00000-of-00042-d964455e17e96d5a.parquet \
        --tokenizer-name-or-path ./dataset/llama-2-7b-hf \
        --tokenizer-type PretrainedFromHF \
        --handler-name GeneralPretrainHandler \
        --output-prefix ./dataset/enwiki \
        --json-keys text \
        --workers 8 \
        --log-interval 1000

    If an error related to the silu function occurs, comment out the @jit_fuser decorator in LLAMA2_for_PyTorch_2.7_code/megatron/core/fusions/fused_bias_swiglu.py and run the command again (see the sed sketch below).
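
    One possible way to comment out the decorator is with sed; a sketch only, run inside the container, after verifying that the decorator line in your code version matches the pattern:
    ## Comment out every module-level @jit_fuser decorator in the file (check the file before running)
    sed -i 's/^@jit_fuser/# @jit_fuser/' \
        /home/LLAMA2_for_PyTorch_2.7_code/megatron/core/fusions/fused_bias_swiglu.py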

  5. Go to the "mindxdl-deploy" repository, switch to the branch matching your version according to the mindxdl-deploy release notes, obtain the train_start.sh file from the "samples/train/resumable-training/fault-tolerance/without-ranktable/pytorch/llama2" directory, and build the following directory structure on the management node.
    root@ubuntu:/data/atlas_dls/public/code/LLAMA2_for_PyTorch_2.7_code/scripts#
    scripts/
    └── train_start.sh
  6. Configure the training startup script train_start.sh; modify it according to your environment.
    # Enable Elastic Agent-side process-level rescheduling, process-level online recovery, and last-moment CheckPoint recovery
    export ELASTIC_PROCESS_RECOVER_ENABLE=1
    
    # Enable the HCCL operator retry feature. If an SDMA or RDMA CQE error is reported while a communication operator executes, HCCL retries that operator.
    export HCCL_OP_RETRY_ENABLE="L0:0, L1:1, L2:1"  
    
    # NIC on the physical host that can communicate; configure it according to the high-speed NIC of the master node. If hostNetwork is set to false in the task YAML, set it to eth0. The example is based on an Atlas 800T A2 training server; adjust it for other devices
    export GLOO_SOCKET_IFNAME=enp189s0f0
    # If hostNetwork is set to false in the task YAML, set it to eth0. The example is based on an Atlas 800T A2 training server; adjust it for other devices
    export HCCL_SOCKET_IFNAME=enp189s0f0
    
    # Set the CKPT load directory; note that CKPT files, logs, and so on should be mounted from the host
    LOAD_CHECKPOINT_PATH=/job/code/output/ckpt
    # Set the CKPT save directory
    SAVE_CHECKPOINT_PATH=/job/code/output/ckpt
     
    # Configure the dataset path
    DATA_PATH=/job/code/data/enwiki_text_document

    # Configure the tokenizer (vocabulary) path
    TOKENIZER_MODEL=/job/code/model_from_hf/llama-2-7b-hf/tokenizer.model
     
    # (Optional) Customize the directory where Elastic Agent run logs are written
    mkdir -p /job/code/alllogs/$MINDX_TASK_ID/elasticlogs/elastic-log$XDL_IP-$RANK                # MINDX_TASK_ID is the training task ID; XDL_IP and RANK distinguish the Elastic Agent logs of different nodes
    export ELASTIC_LOG_PATH=/job/code/alllogs/$MINDX_TASK_ID/elasticlogs/elastic-log$XDL_IP-$RANK               # Directory where Elastic Agent component run logs are written
    # (Optional) Use secure connections for gRPC communication between components
    export ELASTIC_GRPC_SECURE_CONNECT=on                  # Secure connection switch; "on" enables it
    export ELASTIC_GRPC_SECURE_CERTIFICATES_PATH=/usr/security/cert   # Security certificate path; replace /usr/security/cert with a valid certificate path
    • If the hostNetwork parameter in the training task YAML is false, change the value of export GLOO_SOCKET_IFNAME in train_start.sh to eth0 and keep the rest of the code unchanged. Example:
      export GLOO_SOCKET_IFNAME=eth0     # eth0 is the NIC that can communicate inside the container
      export HCCL_SOCKET_IFNAME=eth0
    • If there is no security requirement, the gRPC secure connection parameter ELASTIC_GRPC_SECURE_CONNECT does not need to be configured. If the secure connection switch is on, also configure the certificate path; if it is off, the certificate path is not needed.
    • If you use TaskD for process-level rescheduling, process-level online recovery, process-level in-place recovery, or elastic training, you also need to start the TaskD Manager.
      1. Create a manager.py file in the current directory from which the training script is invoked. The content of manager.py is as follows.
        from taskd.api import init_taskd_manager, start_taskd_manager
        import os
        
        job_id=os.getenv("MINDX_TASK_ID")
        node_nums=XX         # Fill in the total number of task nodes (int)
        proc_per_node=XX     # Fill in the number of training processes per node (int)
        
        init_taskd_manager({"job_id":job_id, "node_nums": node_nums, "proc_per_node": proc_per_node})
        start_taskd_manager()
      2. Add the following code to the training script to start the TaskD Manager. Setting the torchrun distributed parameter --monitor_interval to 1 is recommended.

        In the code below, the TASKD_SO_PATH and export LD_PRELOAD statements add the path of libtaskd.so (installed with TaskD) to the LD_PRELOAD environment variable. If these two statements do not take effect, run pip show taskd manually, append /taskd/python/cython_api/libs/libtaskd.so to the Location value, and set it with export (see the sketch after these sub-steps).

        sed -i '/import os/i import taskd.python.adaptor.patch' $(pip3 show torch | grep Location | awk -F ' ' '{print $2}')/torch/distributed/run.py
        TASKD_SO_PATH="$(pip show taskd | awk '/^Location: / {print $2"/taskd/python/cython_api/libs/libtaskd.so"}')"
        export LD_PRELOAD=$TASKD_SO_PATH:$LD_PRELOAD
        export TASKD_PROCESS_ENABLE="on"
        if [[ "${RANK}" == 0 ]]; then
            export MASTER_ADDR=${POD_IP}
            python manager.py &           # the execution path depends on the current directory
        fi
            
        DISTRIBUTED_ARGS="...\
                          --monitor_interval 1 \
                          ..."
        torchrun $DISTRIBUTED_ARGS ...
      3. Modify the task YAML to add a container port: add port 9601, used for TaskD communication, under every Pod.
        ...
        ports:                         
           - containerPort: 9601             
             name: taskd-port 
        ...
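      Manual fallback for the LD_PRELOAD setting in sub-step 2, as a sketch only (the Location value is whatever pip reports in your environment):
        ## Locate libtaskd.so from the pip metadata and export it manually
        pip show taskd | grep Location
        ## Append /taskd/python/cython_api/libs/libtaskd.so to the reported Location value, for example:
        export LD_PRELOAD=<Location value>/taskd/python/cython_api/libs/libtaskd.so:$LD_PRELOAD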
  7. Configure a training task that combines multiple rescheduling levels. Obtain the training task YAML, which already configures Pod-level rescheduling, process-level rescheduling, process-level online recovery, elastic training, and so on. Then configure the server IP address of the mounted volumes and other settings according to your environment.
    cd LLAMA2_for_PyTorch_2.7_code/yamls
    wget https://gitcode.com/Ascend/mindxdl-deploy/blob/master/samples/train/resumable-training/fault-tolerance/without-ranktable/pytorch/llama2/yamls/pytorch_multinodes_acjob_910b.yaml 

    Process-level recovery features such as process-level rescheduling, process-level online recovery, and elastic training cannot be used together with graceful fault tolerance. For the graceful fault tolerance configuration, see Graceful Fault Tolerance Mode.

  8. (Optional) To use process-level rescheduling or process-level online recovery, add the following content to LLAMA2_for_PyTorch_2.7_code/mindspeed_llm/training/training.py.

    This step is not required when the torch_npu version is 7.0.RC1 or later.

     ... 
     
    class CustomFunction(torch.autograd.Function): 
      @staticmethod 
      def forward(ctx, input): 
          torch.cuda.set_stream(torch.cuda.default_stream()) 
          return input 
     
      @staticmethod 
      def backward(ctx, grad): 
          torch.cuda.set_stream(torch.cuda.default_stream()) 
          return grad 
     
    def streamHandler(): 
        input_tensor = torch.empty(1, dtype=torch.float32, device="npu", requires_grad=True) 
        grad_tensor = torch.empty(1, dtype=torch.float32, device="npu", requires_grad=True) 
        output_tensor = CustomFunction.apply(input_tensor) 
        output_tensor.backward(grad_tensor) 
     
    def pretrain(train_valid_test_dataset_provider, 
    ... 
        if args.do_train and args.train_iters > 0: 
                if args.enable_high_availability: 
                    from mindio_ttp.adaptor import tft_register_processor, tft_train 
                    from mindio_ttp.framework_ttp import tft_register_set_stream_handler 
                    tft_register_set_stream_handler(streamHandler) 
                    tft_register_processor(train_valid_test_dataset_provider, model_provider, model_type) 
                    iteration, num_floating_point_operations_so_far = tft_train(train_args, test_data_iterator_list) 
                else: 
                    iteration, num_floating_point_operations_so_far = train(*train_args) 
    ...

MindSpore Adaptation Example (Based on MindFormers)

For preparing the training code and dataset, refer to the MindFormers documentation. The following uses a single Atlas 800T A2 training server as an example to describe the procedure.

  1. Prepare the MindFormers repository by running the following commands.
    mkdir -p /data/atlas_dls/public/code
    
    git clone https://gitee.com/mindspore/mindformers.git
    cd mindformers
    git checkout e6bd7da6
    
    mkdir dataset
    mkdir yamls
    cd ..
     
    # Rename mindformers to LLAMA2_for_MS_code
    mv mindformers LLAMA2_for_MS_code
  2. Prepare the dataset. This example uses the 7b dataset.
    cd LLAMA2_for_MS_code/dataset
    wget https://ascend-repo-modelzoo.obs.cn-east-2.myhuaweicloud.com/MindFormers/dataset/wikitext-2/wikitext-2-v1.zip
    wget https://ascend-repo-modelzoo.obs.cn-east-2.myhuaweicloud.com/MindFormers/llama2/tokenizer.model
    unzip wikitext-2-v1.zip
  3. Preprocess the dataset.
    ## Pretraining dataset preprocessing. This step starts the mindformers-dl:v1 container, mounts the "LLAMA2_for_MS_code" directory prepared in step 1, and runs inside the container
    docker run -it -v /data/atlas_dls/public/code/LLAMA2_for_MS_code:/home/LLAMA2_for_MS_code -v /usr/local/Ascend/driver:/usr/local/Ascend/driver -e ASCEND_VISIBLE_DEVICES=0-3 mindformers-dl:v1 /bin/bash
    ## Run the following commands inside the container to preprocess the dataset
    cd /home/LLAMA2_for_MS_code/mindformers/tools/dataset_preprocess/llama/
    python llama_preprocess.py \
    --dataset_type wiki \
    --input_glob /home/LLAMA2_for_MS_code/dataset/wikitext-2/wiki.train.tokens \
    --model_file /home/LLAMA2_for_MS_code/dataset/tokenizer.model \
    --seq_length 4096 \
    --output_file /home/LLAMA2_for_MS_code/dataset/wiki4096.mindrecord

    If the error from mindformers.tools.dataset_preprocess.llama.conversation import get_default_conv_template ModuleNotFoundError: No module named 'mindformers.tools.dataset_preprocess' occurs while performing the steps above, handle it as follows.

    1. Run the following command to start the container.
      docker run -it -v /data/atlas_dls/public/code/LLAMA2_for_MS_code:/home/LLAMA2_for_MS_code -v /usr/local/Ascend/driver:/usr/local/Ascend/driver -e ASCEND_VISIBLE_DEVICES=0-3 mindformers-dl:v1 /bin/bash
    2. In the file /home/LLAMA2_for_MS_code/mindformers/tools/dataset_preprocess/llama/llama_preprocess.py, change the string from mindformers.tools.dataset_preprocess.llama.conversation import get_default_conv_template to from conversation import get_default_conv_template (see the sed sketch after these steps).
    3. Run pip uninstall mindformers to uninstall mindformers.
    4. Run bash build.sh in the LLAMA2_for_MS_code directory to reinstall MindFormers.
    5. Run the data preprocessing command again.
      cd /home/LLAMA2_for_MS_code/mindformers/tools/dataset_preprocess/llama/ 
      python llama_preprocess.py \ 
      --dataset_type wiki \ 
      --input_glob /home/LLAMA2_for_MS_code/dataset/wikitext-2/wiki.train.tokens \ 
      --model_file /home/LLAMA2_for_MS_code/dataset/tokenizer.model \ 
      --seq_length 4096 \ 
      --output_file /home/LLAMA2_for_MS_code/dataset/wiki4096.mindrecord
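    For step 2 above, the import change can also be applied with sed; a sketch only, run inside the container, after verifying that the original string matches your code version:
      ## Replace the problematic import in llama_preprocess.py (check the original string first)
      sed -i 's/from mindformers.tools.dataset_preprocess.llama.conversation import get_default_conv_template/from conversation import get_default_conv_template/' \
          /home/LLAMA2_for_MS_code/mindformers/tools/dataset_preprocess/llama/llama_preprocess.py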
  4. Edit the startup script LLAMA2_for_MS_code/scripts/msrun_launcher.sh to configure the log paths, communication NICs, and so on.
    #!/bin/bash
    # Copyright 2024 Huawei Technologies Co., Ltd
    #
    # Licensed under the Apache License, Version 2.0 (the "License");
    # you may not use this file except in compliance with the License.
    # You may obtain a copy of the License at
    #
    # http://www.apache.org/licenses/LICENSE-2.0
    #
    # Unless required by applicable law or agreed to in writing, software
    # distributed under the License is distributed on an "AS IS" BASIS,
    # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    # See the License for the specific language governing permissions and
    # limitations under the License.
    # ============================================================================
    
    # msrun Default Parameters
    
    # Configure the log levels and paths below as needed
    export HCCL_ASYNC_ERROR_HANDLING=0
    # export ASCEND_GLOBAL_LOG_LEVEL=1
    export LOGLEVEL=DEBUG
    export GLOG_v=2
    export GLOG_log_dir=/job/code/alllogs/${MINDX_TASK_ID}/traininglog/msrun
    # export HCCL_ENTRY_LOG_ENABLE=1
    export HCCL_CONNECT_TIMEOUT=600
    
    # NIC on the physical host that can communicate; configure it according to the high-speed NIC of the master node. If hostNetwork is set to false in the task YAML, set it to eth0. The example is based on an Atlas 800T A2 training server; adjust it for other devices
    export GLOO_SOCKET_IFNAME=enp189s0f0
    # If hostNetwork is set to false in the task YAML, set it to eth0. The example is based on an Atlas 800T A2 training server; adjust it for other devices
    export HCCL_SOCKET_IFNAME=enp189s0f0
    # Configure the starting port for collective communication to prevent the port from being occupied
    export HCCL_IF_BASE_PORT=64000

    export PROCESS_RECOVER="on"      # Elastic Agent-side switch for process-level rescheduling and process-level online recovery
    export ELASTIC_PROCESS_RECOVER_ENABLE=1        # Enable taskd to communicate with clusterd
    export MINDIO_FOR_MINDSPORE=1                  # Enable MindIO in the MindSpore scenario
    export MS_ENABLE_TFT='{TTP:1,UCE:1,ARF:1,HCCE:1,RSC:1}'     # Enable, respectively, last-moment CheckPoint (TTP), process-level online recovery for on-chip memory faults (UCE), process-level rescheduling (ARF), process-level online recovery for network faults (HCCE), and Pod-level rescheduling (RSC)
    export MS_TFT_IP=$MS_SCHED_HOST                # MindIO controller address used by MindSpore
    export MS_TFT_PORT=8000                        # MindIO controller port used by MindSpore
    export HCCL_OP_RETRY_ENABLE="L0:0, L1:1, L2:1"  # Whether to enable the HCCL operator retry feature. If an SDMA or RDMA CQE error is reported while a communication operator executes, HCCL retries that operator.
    
    # Create the log directories, grouped by task ID
    mkdir -p /job/code/alllogs/${MINDX_TASK_ID}
    mkdir -p /job/code/alllogs/${MINDX_TASK_ID}/traininglog/log-print/
    export LOG_MF_PATH=/job/code/alllogs/${MINDX_TASK_ID}/traininglog/mf/log$MF_LOG_SUFFIX
    # Add the suffix to the msrun_log
    LOG_DIR=/job/code/alllogs/${MINDX_TASK_ID}/traininglog/log-output/$MF_LOG_SUFFIX
    export ASCEND_PROCESS_LOG_PATH=/job/code/alllogs/$MINDX_TASK_ID/plogs/$MS_NODE_RANK     # Set the plog output path
    export TTP_LOG_PATH=/job/code/alllogs/$MINDX_TASK_ID/ttplogs/$MS_NODE_RANK        
    export TRAIN_LOG_PATH=/job/code/alllogs/$MINDX_TASK_ID/trainlogs/$MS_NODE_RANK
    
    source /usr/local/Ascend/ascend-toolkit/set_env.sh
    
    WORKER_NUM=$MS_WORKER_NUM
    LOCAL_WORKER=$MS_LOCAL_WORKER
    MASTER_ADDR=$MS_SCHED_HOST
    MASTER_PORT=$MS_SCHED_PORT
    NODE_RANK=$MS_NODE_RANK
    JOIN="True"
    CLUSTER_TIME_OUT=7200
    # export HCCL_BUFFSIZE=2 # HCCL memory usage
    
    # Set PYTHONPATH
    MF_SCRIPTS_ROOT=$(realpath "$(dirname "$0")")
    export PYTHONPATH=$MF_SCRIPTS_ROOT/../:$PYTHONPATH
    
    # Set the log suffix
    if [ -z "${MF_LOG_SUFFIX+x}" ] || [ "$MF_LOG_SUFFIX" == "" ]
    then
      MF_LOG_SUFFIX=$MF_LOG_SUFFIX
    else
      MF_LOG_SUFFIX=_$MF_LOG_SUFFIX
    fi
    
    # get the workspace path
    WORKSPACE_PATH=$(pwd)
    
    # Add the suffix to the MF_LOG
    
    # Set the PLOG path
    if [ -z "${PLOG_REDIRECT_TO_OUTPUT+x}" ] || [ $PLOG_REDIRECT_TO_OUTPUT == False ]
    then
      echo "No change the path of plog, the path of plog is /root/ascend"
    else
      export ASCEND_PROCESS_LOG_PATH=$WORKSPACE_PATH/output/plog$MF_LOG_SUFFIX
      echo "PLOG_REDIRECT_TO_OUTPUT=$PLOG_REDIRECT_TO_OUTPUT, set the path of plog to $ASCEND_PROCESS_LOG_PATH"
    fi
    
    if [ $# != 1 ] && [ $# != 2 ] && [ $# != 6 ] && [ $# != 9 ]
    then
      echo "Usage Help: bash msrun_launcher.sh [EXECUTE_ORDER] For Default 8 Devices In Single Machine"
      echo "Usage Help: bash msrun_launcher.sh [EXECUTE_ORDER] [WORKER_NUM] For Quick Start On Multiple Devices In Single Machine"
      echo "Usage Help: bash msrun_launcher.sh [EXECUTE_ORDER] [WORKER_NUM] [MASTER_PORT] [LOG_DIR] [JOIN] [CLUSTER_TIME_OUT] For Multiple Devices In Single Machine"
      echo "Usage Help: bash msrun_launcher.sh [EXECUTE_ORDER] [WORKER_NUM] [LOCAL_WORKER] [MASTER_ADDR] [MASTER_PORT] [NODE_RANK] [LOG_DIR] [JOIN] [CLUSTER_TIME_OUT] For Multiple Devices In Multiple Machines"
      exit 1
    fi
    
    # Start Without Parameters For 8 Devices On Single Machine
    if [ $# == 1 ]
    then
      echo "No parameter is entered. Notice that the program will run on default 8 cards. "
      SINGLE_NODE=false
    else
      WORKER_NUM=$MS_LOCAL_WORKER
    fi
    
    # Check WORKER_NUM
    if [[ ! $WORKER_NUM =~ ^[0-9]+$ ]]; then
        echo "error: worker_num=$WORKER_NUM is not a number"
        exit 1
    fi
    
    # Quick Start For Multiple Devices On Single Machine
    if [ $# == 2 ]
    then
      LOCAL_WORKER=$WORKER_NUM
      SINGLE_NODE=true
    fi
    
    # Multiple Devices On Single Machine
    if [ $# == 6 ]
    then
      LOCAL_WORKER=$WORKER_NUM
      MASTER_PORT=$3
      LOG_DIR=$4
      JOIN=$5
      CLUSTER_TIME_OUT=$6
    
      SINGLE_NODE=true
    fi
    
    # Multiple Devices On Multiple Machine
    if [ $# == 9 ]
    then
      LOCAL_WORKER=$3
      MASTER_ADDR=$4
      MASTER_PORT=$5
      NODE_RANK=$6
      LOG_DIR=$7
      JOIN=$8
      CLUSTER_TIME_OUT=$9
    
      if [ $WORKER_NUM == $LOCAL_WORKER ]
      then
        echo "worker_num is equal to local_worker, Notice that task will run on single node."
        SINGLE_NODE=true
      else
        echo "worker_num=$WORKER_NUM, local_worker=$LOCAL_WORKER, \
         Please run this script on other nodes with different node_rank."
        SINGLE_NODE=false
      fi
    fi
    
    # Init msrun Command
    if [ $SINGLE_NODE == true ]
    then
      MSRUN_CMD="msrun --worker_num=$WORKER_NUM \
       --local_worker_num=$LOCAL_WORKER \
       --master_port=$MASTER_PORT \
       --log_dir=$LOG_DIR \
       --join=$JOIN \
       --cluster_time_out=$CLUSTER_TIME_OUT"
    else
      MSRUN_CMD="msrun --worker_num=$WORKER_NUM \
       --local_worker_num=$LOCAL_WORKER \
       --master_addr=$MASTER_ADDR \
       --master_port=$MASTER_PORT \
       --node_rank=$NODE_RANK \
       --log_dir=$LOG_DIR \
       --join=$JOIN \
       --cluster_time_out=$CLUSTER_TIME_OUT"
    fi
    
    EXECUTE_ORDER="$MSRUN_CMD $1 2>&1 |& tee  -a /job/code/alllogs/${MINDX_TASK_ID}/traininglog/log-print/node-$MS_NODE_RANK"
    
    
    ulimit -u unlimited
    
    echo "Running Command: $EXECUTE_ORDER"
    echo "Please check log files in ${WORKSPACE_PATH}/${LOG_DIR}"
    
    
    function check_return_code() {
        ret_code=$?
        if [[ ${ret_code} -ne 0 ]]; then
          logger "running job failed. exit code: ${ret_code}" | tee -a ${output_url}/log
          exit ${ret_code}
        fi
    }
    CKPT_PATH="./output/checkpoint"
    if [ -d "${CKPT_PATH}" ]
    then
        msrun --worker_num=$WORKER_NUM \
             --local_worker_num=$LOCAL_WORKER \
             --master_addr=$MASTER_ADDR \
             --master_port=$MASTER_PORT \
             --node_rank=$NODE_RANK \
             --log_dir=$LOG_DIR \
             --join=$JOIN \
             --cluster_time_out=$CLUSTER_TIME_OUT $1 --load_checkpoint="${CKPT_PATH}" --resume_training=true  2>&1  |& tee -a /job/code/alllogs/${MINDX_TASK_ID}/traininglog/log-print/node-$MS_NODE_RANK
    else  
         msrun --worker_num=$WORKER_NUM \
             --local_worker_num=$LOCAL_WORKER \
             --master_addr=$MASTER_ADDR \
             --master_port=$MASTER_PORT \
             --node_rank=$NODE_RANK \
             --log_dir=$LOG_DIR \
             --join=$JOIN \
             --cluster_time_out=$CLUSTER_TIME_OUT $1 --load_checkpoint="" --resume_training=false  2>&1  |& tee -a /job/code/alllogs/${MINDX_TASK_ID}/traininglog/log-print/node-$MS_NODE_RANK
    fi
    
    ST=${PIPESTATUS[0]}
    if [[ ${ST} -ne 0 ]]; then
        echo "process exit with exitcode:${ST}"
        logger "running job failed. exit code: $ret" | tee -a ${output_url}/log
        exit ${ST}
    fi
    • If the training logs are lost after a faulty Pod is rescheduled (resumable training), see the FAQ.
    • If you use TaskD for process-level rescheduling, process-level online recovery, process-level in-place recovery, task pause and switch-back for rail-borrowing (借轨) communication, or online stress testing, you also need to start the TaskD Manager.
      1. Create a manager.py file in the current directory from which the training script is invoked. The content of manager.py is as follows.
        from taskd.api import init_taskd_manager, start_taskd_manager
        import os
        
        job_id=os.getenv("MINDX_TASK_ID")
        node_nums=XX         # Fill in the total number of task nodes (int)
        proc_per_node=XX     # Fill in the number of training processes per node (int)
        
        init_taskd_manager({"job_id":job_id, "node_nums": node_nums, "proc_per_node": proc_per_node})
        start_taskd_manager()
      2. Add the following code to the training script to start the TaskD Manager. The first two statements add the path of libtaskd.so (installed with TaskD) to the LD_PRELOAD environment variable. If these two statements do not take effect, run pip show taskd manually, append /taskd/python/cython_api/libs/libtaskd.so to the Location value, and set it with export.
        TASKD_SO_PATH="$(pip show taskd | awk '/^Location: / {print $2"/taskd/python/cython_api/libs/libtaskd.so"}')"
        export LD_PRELOAD=$TASKD_SO_PATH:$LD_PRELOAD
        export TASKD_PROCESS_ENABLE="on"
        if [[ "${MS_SCHED_HOST}" == "${POD_IP}" ]]; then
            python manager.py &   # the execution path depends on the current directory
        fi
        msrun ...
      3. Modify the task YAML to add a container port: add port 9601, used for TaskD communication, under every Pod.
        ...
        ports:                         
           - containerPort: 9601             
             name: taskd-port 
        ...
  5. Modify the model parameter configuration YAML. Open LLAMA2_for_MS_code/configs/llama2/pretrain_llama2_7b.yaml and configure the dataset path, distributed parallelism parameters, and so on. Modify the fields of the training configuration YAML as required. (Note: the Pod-level rescheduling, process-level rescheduling, and process-level online recovery features require that a saved breakpoint CKPT file exists in the CKPT directory at startup, and that the training configuration YAML sets resume_training=true and load_checkpoint={CKPT directory}.)
    @@ -15,7 +15,7 @@ trainer:
    
     # runner config
     runner_config:
    -  epochs: 2
    +  epochs: 200
       batch_size: 1
       sink_mode: True
       sink_size: 1
    @@ -88,13 +88,14 @@ parallel:
       parallel_optimizer_config:
         gradient_accumulation_shard: False
         parallel_optimizer_threshold: 64
    +    optimizer_weight_shard_size: 2
     # default parallel of device num = 8 for Atlas 800T A2
     parallel_config:
    -  data_parallel: 8
    +  data_parallel: 64
       model_parallel: 1
       pipeline_stage: 1
       use_seq_parallel: False
    -  micro_batch_num: 1
    +  micro_batch_num: 16
       vocab_emb_dp: True
       gradient_aggregation_group: 4
     # when model parallel is greater than 1, we can set micro_batch_interleave_num=2, that may accelerate the train process.
    @@ -128,6 +129,8 @@ context:
       save_graphs: False
       save_graphs_path: "./graph"
       device_id: 0
    +  #ascend_config:
    +  #  parallel_speed_up_json_path: "/home/lby/workspace/mindformers/parallel_speed_up.json"
       graph_kernel_flags: "--disable_pass=cluster.floatstatus_fusion,preprocess.depend_elimination"
     # model config
     model:
    @@ -136,7 +139,7 @@ model:
         batch_size: 1 # add for increase predict
         seq_length: 4096
         hidden_size: 4096
    -    num_layers: 32
    +    num_layers: 4
         num_heads: 32
         vocab_size: 32000
         multiple_of: 256
  6. Copy LLAMA2_for_MS_code/configs/llama2/pretrain_llama2_7b.yaml to LLAMA2_for_MS_code/configs/llama2/no_resume_pretrain_llama2_7b.yaml and modify the following fields in that YAML.
    When the compile cache is used, no strategy file is generated; delete the compile cache before running.
     src_strategy_path_or_dir: './output/strategy'  
     resume_training: False
     load_checkpoint: ''   
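    A minimal command sketch for this step (paths as used earlier in this example):
    cd /data/atlas_dls/public/code/LLAMA2_for_MS_code
    cp configs/llama2/pretrain_llama2_7b.yaml configs/llama2/no_resume_pretrain_llama2_7b.yaml
    ## then edit no_resume_pretrain_llama2_7b.yaml and set the three fields listed above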
  7. (Optional) For the scenario that uses the last-moment CKPT, where the CKPT is loaded through Pod-level rescheduling after it is saved, modify LLAMA2_for_MS_code/mindformers/trainer/trainer.py and add the following content:
    …
    from mindspore.dataset.engine.datasets import BatchDataset, RepeatDataset, Dataset
    from mindspore.communication.comm_func import barrier
    from mindformers.core.parallel_config import build_parallel_config \
    …
    …
        def _check_config_rules(self):
            """Check config rules."""
            self._adjust_resume_training_if_ckpt_path_invalid()
            barrier()
     
            if self.config.auto_trans_ckpt and self.config.load_ckpt_format == 'ckpt':
    …
    …
            if isinstance(self.config.resume_training, str) and \
                    self.config.load_checkpoint and os.path.isfile(self.config.load_checkpoint):
                logger.warning(f"`resume_training={self.config.resume_training}` is not valid "
                               "when `load_checkpoint` is a file path")
                self.config.resume_training = True
     
        def _adjust_resume_training_if_ckpt_path_invalid(self):
            """Disable resume_training if checkpoint path is empty or invalid."""
            if isinstance(self.config.resume_training, bool) and self.config.resume_training:
                try:
                    p = Path(self.config.load_checkpoint).resolve()
                except TypeError as e:
                    raise TypeError(f"`load_checkpoint` should be a string, "
                                    f"but got type {type(self.config.load_checkpoint)}.") from e
                
                if self.config.load_checkpoint:
                    if p.is_dir() and not any(p.iterdir()):
                        self.config.resume_training = False
                        self.config.load_checkpoint = ""
                        logger.warning(
                            "The `load_checkpoint` is an empty path while the `resume_training` is True. "
                            "Hence `resume_training` and `load_checkpoint` is changed to False "
                            "and '' respectively, and randomly initialized weights will be used."
                        )
                else:
                    self.config.resume_training = False
                    self.config.load_checkpoint = ""
                    logger.warning(
                        "The `load_checkpoint` is '' while the `resume_training` is True. "
                        "Hence `resume_training` and `load_checkpoint` is changed to False and '' respectively, "
                        "and randomly initialized weights will be used."
                    )
     
        def _check_args_task_and_model(self):
            """Check args, task and model."""
            # get support model names of task
    …

    Modify the following fields in the LLAMA2_for_MS_code/configs/llama2/pretrain_llama2_7b.yaml file:

    resume_training: True 
    load_checkpoint: './output/checkpoint'   
  8. Prepare the task training YAML and modify the server IP address of the mounted volumes and other settings according to your environment. The parameters in the YAML follow the PyTorch scenario.
    cd LLAMA2_for_MS_code/yamls
    wget https://gitcode.com/Ascend/mindxdl-deploy/blob/master/samples/train/resumable-training/fault-tolerance/ranktable/mindspore/llama2/yamls/ms_multinodes_acjob_910b.yaml
    Change the startup command to:
                  command:                           # training command, which can be modified
                    - /bin/bash
                    - -c
                    - |
                     cd /job/code/;bash /job/code/scripts/msrun_launcher.sh "run_mindformer.py --config configs/llama2/pretrain_llama2_7b.yaml --train_dataset_dir {actual dataset path}/wiki4096/wiki4096.mindrecord --use_parallel True --run_mode train"

Reinforcement Learning Post-Training Adaptation Example (Based on Verl)

MindCluster supports only Job-level rescheduling. Verl training tasks are managed by a Ray cluster. To fit MindCluster's Ascend Job deployment, one Pod is deployed on each worker node, and that Pod hosts all of the Ray cluster's processes on that node. The head node of the Ray cluster is the node where the environment variable RANK=0 injected by Ascend Operator is located. The Pod on the RANK=0 node starts the Ray cluster and submits the Verl post-training task, while the Pods on the other worker nodes join the Ray cluster. Finally, all nodes check whether the submitted training task has encountered an exception.

  • If an exception exists, the node exits with a non-zero code; Volcano detects the failure and triggers Job-level rescheduling.
  • If there is no exception and the task has finished, the node exits with 0.
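
The exception-detection behavior described above is implemented in the start.sh script obtained in step 3 below. The idea, as a simplified sketch assuming the Ray job was submitted via ray job submit (SUBMISSION_ID is a placeholder for the returned submission ID):
    # Every node polls the submitted Ray job and maps its final state to the Pod exit code (sketch)
    while true; do
        STATUS=$(ray job status "${SUBMISSION_ID}" 2>&1 | grep -oE 'SUCCEEDED|FAILED|STOPPED' | head -n 1)
        if [ "${STATUS}" == "SUCCEEDED" ]; then
            exit 0          # task finished normally
        elif [ "${STATUS}" == "FAILED" ] || [ "${STATUS}" == "STOPPED" ]; then
            exit 1          # abnormal task: non-zero exit lets Volcano trigger Job-level rescheduling
        fi
        sleep 30
    done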

The following uses two Atlas 900 A3 SuperPoD super nodes as an example to describe the procedure.

  1. Convert the model weights from the HuggingFace format to the Megatron format; see the Verl model conversion script.
    # Start the container; adjust the model paths according to your environment
    docker run -it \
    -v /qwen30b/Qwen3-30B-A3B-Instruct-2507:/qwen30b/Qwen3-30B-A3B-Instruct-2507 \
    -v /usr/local/Ascend/driver:/usr/local/Ascend/driver \
    -e ASCEND_VISIBLE_DEVICES=0-15 \
    verl:v1 /bin/bash
     
    # Run the weight conversion
    cd ~/verl
    python scripts/converter_hf_to_mcore.py \
    --hf_model_path /qwen30b/Qwen3-30B-A3B-Instruct-2507 \
    --output_path /qwen30b/Qwen3-30B-A3B-Instruct-Mcore

    If a libgomp-related error occurs at this point, run the following command first:

    export LD_PRELOAD="/usr/local/lib/python3.10/dist-packages/sklearn/utils/../../scikit_learn.libs/libgomp-947d5fa1.so.1.0.0"

  2. Build the Verl training script for Qwen3 30B MoE, with vLLM as the inference backend and Megatron as the training backend.

    Obtain the sample script run_dapo_qwen3_30b_a3b_megatron.sh and place it under examples_npu in the verl directory. Also create two files, dapo_trainer-megatron.yaml and runtime_env.yaml, in the "examples_npu/config" directory with the following content (a layout sketch follows the two files):

    • dapo_trainer-megatron.yaml
      # examples_npu/config/dapo_trainer-megatron.yaml
      hydra:
        searchpath:
          - file://verl/trainer/config
      defaults:
        - ppo_megatron_trainer
        - _self_
      data:
        gen_batch_size: ${data.train_batch_size}
      reward_model:
        reward_manager: dapo
        overlong_buffer: 
          enable: False # We try to avoid forgetting to set enable
          len: 0
          penalty_factor: 0.0
          log: False
      algorithm:
        filter_groups:
          _target_: verl.trainer.config.FilterGroupsConfig
          enable: False # We try to avoid forgetting to set enable
          metric: null # acc / score / seq_reward / seq_final_reward / ...
          max_num_gen_batches: 0 # Non-positive values mean no upper limit
      trainer:
        project_name: verl-dapo
    • runtime_env.yaml
      # examples_npu/config/runtime_env.yaml
      working_dir: ./
      excludes: ["/.git/"]
      env_vars:
        HCCL_EXEC_TIMEOUT: "7200"
        HCCL_CONNECT_TIMEOUT: "7200"
        VLLM_USE_V1: "1"
        VLLM_VERSION: "0.9.1"
        HCCL_IF_BASE_PORT: "23999"
        HCCL_ASYNC_ERROR_HANDLING: "0"
        P2P_HCCL_BUFFSIZE: "20"
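    A sketch of the resulting layout under the verl repository for this step (file names as given above):
    cd ~/verl
    mkdir -p examples_npu/config
    ## place run_dapo_qwen3_30b_a3b_megatron.sh under examples_npu/
    ## create examples_npu/config/dapo_trainer-megatron.yaml and examples_npu/config/runtime_env.yaml with the contents shown above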
  3. Build the Ray startup script adapted to MindCluster. Prepare the Ray startup script on each worker node, that is, place it on both Atlas 900 A3 SuperPoD super nodes. Configure the NIC information in it according to your environment; the rest of the script can remain unchanged.

    Obtain the sample script start.sh and place it in the verl directory.

  4. Obtain verl-resche.yaml from the prepared task YAMLs, modify its parameters according to your environment, and run the following command to start the task.
    kubectl apply -f verl-resche.yaml

    After the task is started, iteration information is displayed in the training output.
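
    One way to confirm that the job is running and to view this output is to check the Pods and tail their logs; a minimal sketch (the grep pattern and namespace depend on how verl-resche.yaml names the job):
    kubectl get pods -A | grep verl              # locate the Pods created for the job
    kubectl logs -f <pod-name> -n <namespace>    # tail the training output of one worker Pod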