Adaptation Examples
This section walks you through the adaptation steps for resumable training (checkpoint-based resumption) step by step.
- To ensure that graceful fault tolerance and process-level online recovery work correctly, keep the clocks of the K8s cluster master and worker nodes synchronized (a quick check is sketched after this list).
- The component code shown for resumable training is open-source code. For the related security notes, see the security statement.
- The model sample code below may differ from the actual version; the code of your actual version prevails.
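For the clock requirement in the first note above, a minimal check can be run on each master and worker node. This is a sketch assuming systemd-based nodes with an NTP source available; adjust it to the cluster's actual time service.
timedatectl status          # "System clock synchronized: yes" should be reported on every node
timedatectl set-ntp true    # Enable NTP synchronization if the clocks drift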
PyTorch Adaptation Example (Based on MindSpeed-LLM)
For preparing the training code and dataset, refer to the MindSpeed-LLM user guide. The following steps use a single Atlas 800T A2 training server as an example.
- Prepare the MindSpeed-LLM training code with the following script.
mkdir -p /data/atlas_dls/public/code
cd /data/atlas_dls/public/code
git clone https://gitee.com/ascend/MindSpeed-LLM.git
git clone https://github.com/NVIDIA/Megatron-LM.git
cd Megatron-LM
git checkout core_v0.12.1
cp -r megatron ../MindSpeed-LLM/
cd ..
cd MindSpeed-LLM
git checkout master
## Prepare the MindSpeed source code. It was already prepared when the image was built, so it can either be copied directly or cloned again. For the exact commit id to check out, refer to the installation guide in the MindSpeed-LLM repository.
git clone https://gitee.com/ascend/MindSpeed.git
cd MindSpeed
git checkout c99f34c0
cd ..
## Create the directories needed later
mkdir alllogs
mkdir dataset
mkdir scripts
mkdir yamls
mkdir output
## Rename MindSpeed-LLM to LLAMA2_for_PyTorch_2.7_code
cd ..
mv MindSpeed-LLM LLAMA2_for_PyTorch_2.7_code
- Prepare the llama2-7b model tokenizer files by running the following script.
cd LLAMA2_for_PyTorch_2.7_code
mkdir ./dataset/llama-2-7b-hf/
cd ./dataset/llama-2-7b-hf/
# The files can be downloaded from the web page directly or via the command line
wget https://huggingface.co/daryl149/llama-2-7b-hf/resolve/main/tokenizer.json
wget https://huggingface.co/daryl149/llama-2-7b-hf/resolve/main/tokenizer.model
wget https://huggingface.co/daryl149/llama-2-7b-hf/resolve/main/tokenizer_config.json
wget https://huggingface.co/daryl149/llama-2-7b-hf/resolve/main/config.json
wget https://huggingface.co/daryl149/llama-2-7b-hf/resolve/main/generation_config.json
wget https://huggingface.co/daryl149/llama-2-7b-hf/resolve/main/special_tokens_map.json
- Prepare the llama2-7b dataset for LLAMA2 yourself. The example uses enwiki20230101; run the following script.
## Prepare the dataset
cd LLAMA2_for_PyTorch_2.7_code/dataset/
# The file can be downloaded from the web page directly or via the command line
wget https://huggingface.co/datasets/lsb/enwiki20230101/resolve/main/data/train-00000-of-00042-d964455e17e96d5a.parquet
- Preprocess the dataset by running the following script.
## Pretraining dataset preprocessing. This step requires starting the mindspeed-dl:v1 container with the "LLAMA2_for_PyTorch_2.7_code" directory prepared in step 1 mounted, and running the commands inside the container.
docker run -it -v /data/atlas_dls/public/code/LLAMA2_for_PyTorch_2.7_code:/home/LLAMA2_for_PyTorch_2.7_code -v /usr/local/Ascend/driver:/usr/local/Ascend/driver -e ASCEND_VISIBLE_DEVICES=0-7 mindspeed-dl:v1 /bin/bash
## Run the following commands inside the container to preprocess the dataset
cd /home/LLAMA2_for_PyTorch_2.7_code
export PYTHONPATH=/home/LLAMA2_for_PyTorch_2.7_code/MindSpeed:$PYTHONPATH
python ./preprocess_data.py \
    --input ./dataset/train-00000-of-00042-d964455e17e96d5a.parquet \
    --tokenizer-name-or-path ./dataset/llama-2-7b-hf \
    --tokenizer-type PretrainedFromHF \
    --handler-name GeneralPretrainHandler \
    --output-prefix ./dataset/enwiki \
    --json-keys text \
    --workers 8 \
    --log-interval 1000
If an error related to the silu function occurs, comment out the @jit_fuser decorator in LLAMA2_for_PyTorch_2.7_code/megatron/core/fusions/fused_bias_swiglu.py and run the command again.
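If preferred, the decorator can be commented out non-interactively. This is a minimal sketch assuming @jit_fuser appears at the start of its lines in that file; verify the result before re-running the preprocessing.
sed -i 's/^@jit_fuser/# @jit_fuser/' LLAMA2_for_PyTorch_2.7_code/megatron/core/fusions/fused_bias_swiglu.py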
- Go to the "mindxdl-deploy" repository, switch to the branch matching your version according to the version notes of the mindxdl-deploy open-source repository, obtain the train_start.sh file from the samples/train/resumable-training/fault-tolerance/without-ranktable/pytorch/llama2 directory, and create the following directory structure on the management node.
root@ubuntu:/data/atlas_dls/public/code/LLAMA2_for_PyTorch_2.7_code/scripts#
scripts/
└── train_start.sh
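One possible way to obtain the file is sketched below; the branch name (<branch> is a placeholder) and the exact location of train_start.sh inside the repository depend on your version and should be checked against the repository's version notes.
git clone https://gitcode.com/Ascend/mindxdl-deploy.git
cd mindxdl-deploy && git checkout <branch>
mkdir -p /data/atlas_dls/public/code/LLAMA2_for_PyTorch_2.7_code/scripts
cp samples/train/resumable-training/fault-tolerance/without-ranktable/pytorch/llama2/train_start.sh \
   /data/atlas_dls/public/code/LLAMA2_for_PyTorch_2.7_code/scripts/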
- Configure the training startup script train_start.sh; modify it according to your environment.
# Enable process-level rescheduling, process-level online recovery, and last-words checkpoint recovery on the Elastic Agent side
export ELASTIC_PROCESS_RECOVER_ENABLE=1
# Enable the HCCL operator retry feature. Retry means that when a communication operator reports an SDMA or RDMA CQE error, HCCL tries to execute the operator again.
export HCCL_OP_RETRY_ENABLE="L0:0, L1:1, L2:1"
# Network interface on the physical machine that can communicate; configure it according to the high-speed NIC of the master node. If hostNetwork is set to false in the task YAML, set it to eth0. The example is based on an Atlas 800T A2 training server; modify it for other devices as needed.
export GLOO_SOCKET_IFNAME=enp189s0f0
# If hostNetwork is set to false in the task YAML, set it to eth0. The example is based on an Atlas 800T A2 training server; modify it for other devices as needed.
export HCCL_SOCKET_IFNAME=enp189s0f0
# Set the checkpoint load directory. Note that checkpoints, log files, etc. should be mounted to the host.
LOAD_CHECKPOINT_PATH=/job/code/output/ckpt
# Set the checkpoint save directory
SAVE_CHECKPOINT_PATH=/job/code/output/ckpt
# Configure the dataset path
DATA_PATH=/job/code/data/enwiki_text_document
# Configure the tokenizer path
TOKENIZER_MODEL=/job/code/model_from_hf/llama-2-7b-hf/tokenizer.model
# (Optional) Customize the directory where Elastic Agent run logs are written
mkdir -p /job/code/alllogs/$MINDX_TASK_ID/elasticlogs/elastic-log$XDL_IP-$RANK  # MINDX_TASK_ID is the training task ID; XDL_IP and RANK distinguish the Elastic Agent logs of different nodes
export ELASTIC_LOG_PATH=/job/code/alllogs/$MINDX_TASK_ID/elasticlogs/elastic-log$XDL_IP-$RANK  # Directory where Elastic Agent run logs are written
# (Optional) Use a secure connection for gRPC communication between components
export ELASTIC_GRPC_SECURE_CONNECT=on  # Secure connection switch; "on" enables it
export ELASTIC_GRPC_SECURE_CERTIFICATES_PATH=/usr/security/cert  # Security certificate path; replace /usr/security/cert with a valid certificate path
- If the hostNetwork parameter in the training task YAML is set to false, change the value of export GLOO_SOCKET_IFNAME in train_start.sh to eth0 and leave the rest of the code unchanged. For example:
export GLOO_SOCKET_IFNAME=eth0  # eth0 is the network interface that can communicate inside the container
export HCCL_SOCKET_IFNAME=eth0
- If there is no security requirement, the gRPC secure connection parameter ELASTIC_GRPC_SECURE_CONNECT does not need to be configured. If the secure connection switch is turned on, also configure the certificate path; if it is turned off, the certificate path is not required.
- If TaskD is used for process-level rescheduling, process-level online recovery, process-level in-place recovery, or elastic training, the TaskD Manager also needs to be started.
- Create a manager.py file in the current directory from which the training script is invoked. The content of manager.py is as follows.
from taskd.api import init_taskd_manager, start_taskd_manager
import os

job_id = os.getenv("MINDX_TASK_ID")
node_nums = XX        # Fill in the total number of task nodes, int type
proc_per_node = XX    # Fill in the number of training processes per node, int type
init_taskd_manager({"job_id": job_id, "node_nums": node_nums, "proc_per_node": proc_per_node})
start_taskd_manager()
- Add the following code to the training script to start the TaskD Manager. It is recommended to set the torchrun distributed parameter --monitor_interval to 1.
In the following code, the TASKD_SO_PATH and export LD_PRELOAD statements add the path of libtaskd.so (installed with TaskD) to the LD_PRELOAD environment variable. If these two statements do not take effect, run pip show taskd manually, take the Location value, append /taskd/python/cython_api/libs/libtaskd.so to it, and set it with export (a verification sketch follows the YAML snippet below).
sed -i '/import os/i import taskd.python.adaptor.patch' $(pip3 show torch | grep Location | awk -F ' ' '{print $2}')/torch/distributed/run.py
TASKD_SO_PATH="$(pip show taskd | awk '/^Location: / {print $2"/taskd/python/cython_api/libs/libtaskd.so"}')"
export LD_PRELOAD=$TASKD_SO_PATH:$LD_PRELOAD
export TASKD_PROCESS_ENABLE="on"
if [[ "${RANK}" == 0 ]]; then
    export MASTER_ADDR=${POD_IP}
    python manager.py &   # The actual execution path depends on the current directory
fi
DISTRIBUTED_ARGS="...\
    --monitor_interval 1 \
    ..."
torchrun $DISTRIBUTED_ARGS ...
- Modify the task YAML to add a container port: add port 9601, used for TaskD communication, under all Pods.
...
ports:
  - containerPort: 9601
    name: taskd-port
...
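A quick way to verify the libtaskd.so path resolution described in the note above is sketched here; the path layout is the one stated in the note, and <Location> is a placeholder for the value printed by pip.
pip show taskd | awk '/^Location: /{print $2}'                                                      # Print the Location value
ls "$(pip show taskd | awk '/^Location: /{print $2}')/taskd/python/cython_api/libs/libtaskd.so"     # The file should exist
export LD_PRELOAD=<Location>/taskd/python/cython_api/libs/libtaskd.so:$LD_PRELOAD                   # Manual fallback if the automatic lines fail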
- Configure a training task that includes multiple rescheduling levels. Obtain the training task YAML, which already has Pod-level rescheduling, process-level rescheduling, process-level online recovery, elastic training, and other features configured, and then adjust settings such as the server IP address of the mounted volume according to your environment.
cd LLAMA2_for_PyTorch_2.7_code/yamls
wget https://gitcode.com/Ascend/mindxdl-deploy/blob/master/samples/train/resumable-training/fault-tolerance/without-ranktable/pytorch/llama2/yamls/pytorch_multinodes_acjob_910b.yaml
Process-level recovery features such as process-level rescheduling, process-level online recovery, and elastic training cannot be used together with graceful fault tolerance. For the configuration steps of graceful fault tolerance, see the graceful fault tolerance mode section.
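Once the YAML has been adjusted, the task can be submitted to the cluster. This is a minimal sketch assuming the file name downloaded above and a job name containing "llama2"; both depend on your YAML.
kubectl apply -f pytorch_multinodes_acjob_910b.yaml
kubectl get pods -A | grep llama2    # Confirm the task Pods are created and reach the Running state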
- (Optional) To use process-level rescheduling or process-level online recovery, add the following content to LLAMA2_for_PyTorch_2.7_code/mindspeed_llm/training/training.py.
If the torch_npu version is 7.0.RC1 or later, this step is not required (a version check is sketched after the code below).
...
class CustomFunction(torch.autograd.Function):
    @staticmethod
    def forward(ctx, input):
        torch.cuda.set_stream(torch.cuda.default_stream())
        return input

    @staticmethod
    def backward(ctx, grad):
        torch.cuda.set_stream(torch.cuda.default_stream())
        return grad

def streamHandler():
    input_tensor = torch.empty(1, dtype=torch.float32, device="npu", requires_grad=True)
    grad_tensor = torch.empty(1, dtype=torch.float32, device="npu", requires_grad=True)
    output_tensor = CustomFunction.apply(input_tensor)
    output_tensor.backward(grad_tensor)

def pretrain(train_valid_test_dataset_provider,
...
    if args.do_train and args.train_iters > 0:
        if args.enable_high_availability:
            from mindio_ttp.adaptor import tft_register_processor, tft_train
            from mindio_ttp.framework_ttp import tft_register_set_stream_handler
            tft_register_set_stream_handler(streamHandler)
            tft_register_processor(train_valid_test_dataset_provider, model_provider, model_type)
            iteration, num_floating_point_operations_so_far = tft_train(train_args, test_data_iterator_list)
        else:
            iteration, num_floating_point_operations_so_far = train(*train_args)
...
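To decide whether the modification above is needed, the installed torch_npu version can be checked inside the training image, for example:
pip show torch_npu | grep ^Version    # Skip this step if the version is 7.0.RC1 or later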
MindSpore Adaptation Example (Based on MindFormers)
For preparing the training code and dataset, refer to the MindFormers documentation. The following steps use a single Atlas 800T A2 training server as an example.
- Prepare the MindFormers code repository by running the following commands.
mkdir -p /data/atlas_dls/public/code
cd /data/atlas_dls/public/code
git clone https://gitee.com/mindspore/mindformers.git
cd mindformers
git checkout e6bd7da6
mkdir dataset
mkdir yamls
cd ..
# Rename mindformers to LLAMA2_for_MS_code
mv mindformers LLAMA2_for_MS_code
- Prepare the dataset; the example uses the dataset for the 7b model.
- Preprocess the dataset.
## Pretraining dataset preprocessing. This step requires starting the mindformers-dl:v1 container with the "LLAMA2_for_MS_code" directory prepared in step 1 mounted, and running the commands inside the container.
docker run -it -v /data/atlas_dls/public/code/LLAMA2_for_MS_code:/home/LLAMA2_for_MS_code -v /usr/local/Ascend/driver:/usr/local/Ascend/driver -e ASCEND_VISIBLE_DEVICES=0-3 mindformers-dl:v1 /bin/bash
## Run the following commands inside the container to preprocess the dataset
cd /home/LLAMA2_for_MS_code/mindformers/tools/dataset_preprocess/llama/
python llama_preprocess.py \
    --dataset_type wiki \
    --input_glob /home/LLAMA2_for_MS_code/dataset/wikitext-2/wiki.train.tokens \
    --model_file /home/LLAMA2_for_MS_code/dataset/tokenizer.model \
    --seq_length 4096 \
    --output_file /home/LLAMA2_for_MS_code/dataset/wiki4096.mindrecord
If the following error occurs during the steps above: from mindformers.tools.dataset_preprocess.llama.conversation import get_default_conv_template ModuleNotFoundError: No module named 'mindformers.tools.dataset_preprocess', handle it as follows.
- Start the container with the following command.
docker run -it -v /data/atlas_dls/public/code/LLAMA2_for_MS_code:/home/LLAMA2_for_MS_code -v /usr/local/Ascend/driver:/usr/local/Ascend/driver -e ASCEND_VISIBLE_DEVICES=0-3 mindformers-dl:v1 /bin/bash
- In the file /home/LLAMA2_for_MS_code/mindformers/tools/dataset_preprocess/llama/llama_preprocess.py, change the string from mindformers.tools.dataset_preprocess.llama.conversation import get_default_conv_template to from conversation import get_default_conv_template (a sed sketch for this change is shown after this list).
- Run pip uninstall mindformers to uninstall mindformers.
- Run bash build.sh in the LLAMA2_for_MS_code directory to reinstall MindFormers.
- Run the data preprocessing command again.
cd /home/LLAMA2_for_MS_code/mindformers/tools/dataset_preprocess/llama/
python llama_preprocess.py \
    --dataset_type wiki \
    --input_glob /home/LLAMA2_for_MS_code/dataset/wikitext-2/wiki.train.tokens \
    --model_file /home/LLAMA2_for_MS_code/dataset/tokenizer.model \
    --seq_length 4096 \
    --output_file /home/LLAMA2_for_MS_code/dataset/wiki4096.mindrecord
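A minimal sketch for applying the import change from the earlier step non-interactively; verify the file afterwards.
sed -i 's/from mindformers.tools.dataset_preprocess.llama.conversation import get_default_conv_template/from conversation import get_default_conv_template/' \
    /home/LLAMA2_for_MS_code/mindformers/tools/dataset_preprocess/llama/llama_preprocess.py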
- Edit the startup script LLAMA2_for_MS_code/scripts/msrun_launcher.sh to configure log paths, communication network interfaces, and so on.
#!/bin/bash
# Copyright 2024 Huawei Technologies Co., Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ============================================================================

# msrun Default Parameters
# Configure log levels and paths as needed
export HCCL_ASYNC_ERROR_HANDLING=0
# export ASCEND_GLOBAL_LOG_LEVEL=1
export LOGLEVEL=DEBUG
export GLOG_v=2
export GLOG_log_dir=/job/code/alllogs/${MINDX_TASK_ID}/traininglog/msrun
# export HCCL_ENTRY_LOG_ENABLE=1
export HCCL_CONNECT_TIMEOUT=600
# Network interface on the physical machine that can communicate; configure it according to the high-speed NIC of the master node. If hostNetwork is set to false in the task YAML, set it to eth0. The example is based on an Atlas 800T A2 training server; modify it for other devices as needed.
export GLOO_SOCKET_IFNAME=enp189s0f0
# If hostNetwork is set to false in the task YAML, set it to eth0. The example is based on an Atlas 800T A2 training server; modify it for other devices as needed.
export HCCL_SOCKET_IFNAME=enp189s0f0
# Configure the base port for collective communication to prevent the port from being occupied
export HCCL_IF_BASE_PORT=64000
export PROCESS_RECOVER="on"              # Elastic Agent-side switch for process-level rescheduling and process-level online recovery
export ELASTIC_PROCESS_RECOVER_ENABLE=1  # Enable this environment variable so that taskd can communicate with clusterd
export MINDIO_FOR_MINDSPORE=1            # Enable MindIO in the MindSpore scenario
export MS_ENABLE_TFT='{TTP:1,UCE:1,ARF:1,HCCE:1,RSC:1}'  # Enable last-words checkpoint, on-chip memory fault online recovery, process-level rescheduling, network fault online recovery, and Pod-level rescheduling respectively
export MS_TFT_IP=$MS_SCHED_HOST          # MindIO controller address used by MindSpore
export MS_TFT_PORT=8000                  # MindIO controller port used by MindSpore
export HCCL_OP_RETRY_ENABLE="L0:0, L1:1, L2:1"  # Whether to enable the HCCL operator retry feature. Retry means that when a communication operator reports an SDMA or RDMA CQE error, HCCL tries to execute the operator again.

# Create log directories grouped by task ID
mkdir -p /job/code/alllogs/${MINDX_TASK_ID}
mkdir -p /job/code/alllogs/${MINDX_TASK_ID}/traininglog/log-print/
export LOG_MF_PATH=/job/code/alllogs/${MINDX_TASK_ID}/traininglog/mf/log$MF_LOG_SUFFIX
# Add the suffix to the msrun_log
LOG_DIR=/job/code/alllogs/${MINDX_TASK_ID}/traininglog/log-output/$MF_LOG_SUFFIX
export ASCEND_PROCESS_LOG_PATH=/job/code/alllogs/$MINDX_TASK_ID/plogs/$MS_NODE_RANK  # Set the plog output path
export TTP_LOG_PATH=/job/code/alllogs/$MINDX_TASK_ID/ttplogs/$MS_NODE_RANK
export TRAIN_LOG_PATH=/job/code/alllogs/$MINDX_TASK_ID/trainlogs/$MS_NODE_RANK

source /usr/local/Ascend/ascend-toolkit/set_env.sh

WORKER_NUM=$MS_WORKER_NUM
LOCAL_WORKER=$MS_LOCAL_WORKER
MASTER_ADDR=$MS_SCHED_HOST
MASTER_PORT=$MS_SCHED_PORT
NODE_RANK=$MS_NODE_RANK
JOIN="True"
CLUSTER_TIME_OUT=7200

# export HCCL_BUFFSIZE=2 # HCCL memory usage

# Set PYTHONPATH
MF_SCRIPTS_ROOT=$(realpath "$(dirname "$0")")
export PYTHONPATH=$MF_SCRIPTS_ROOT/../:$PYTHONPATH

# Set the log suffix
if [ -z "${MF_LOG_SUFFIX+x}" ] || [ "$MF_LOG_SUFFIX" == "" ]
then
  MF_LOG_SUFFIX=$MF_LOG_SUFFIX
else
  MF_LOG_SUFFIX=_$MF_LOG_SUFFIX
fi

# get the workspace path
WORKSPACE_PATH=$(pwd)

# Add the suffix to the MF_LOG
# Set the PLOG path
if [ -z "${PLOG_REDIRECT_TO_OUTPUT+x}" ] || [ $PLOG_REDIRECT_TO_OUTPUT == False ]
then
  echo "No change the path of plog, the path of plog is /root/ascend"
else
  export ASCEND_PROCESS_LOG_PATH=$WORKSPACE_PATH/output/plog$MF_LOG_SUFFIX
  echo "PLOG_REDIRECT_TO_OUTPUT=$PLOG_REDIRECT_TO_OUTPUT, set the path of plog to $ASCEND_PROCESS_LOG_PATH"
fi

if [ $# != 1 ] && [ $# != 2 ] && [ $# != 6 ] && [ $# != 9 ]
then
  echo "Usage Help: bash msrun_launcher.sh [EXECUTE_ORDER] For Default 8 Devices In Single Machine"
  echo "Usage Help: bash msrun_launcher.sh [EXECUTE_ORDER] [WORKER_NUM] For Quick Start On Multiple Devices In Single Machine"
  echo "Usage Help: bash msrun_launcher.sh [EXECUTE_ORDER] [WORKER_NUM] [MASTER_PORT] [LOG_DIR] [JOIN] [CLUSTER_TIME_OUT] For Multiple Devices In Single Machine"
  echo "Usage Help: bash msrun_launcher.sh [EXECUTE_ORDER] [WORKER_NUM] [LOCAL_WORKER] [MASTER_ADDR] [MASTER_PORT] [NODE_RANK] [LOG_DIR] [JOIN] [CLUSTER_TIME_OUT] For Multiple Devices In Multiple Machines"
  exit 1
fi

# Start Without Parameters For 8 Devices On Single Machine
if [ $# == 1 ]
then
  echo "No parameter is entered. Notice that the program will run on default 8 cards. "
  SINGLE_NODE=false
else
  WORKER_NUM=$MS_LOCAL_WORKER
fi

# Check WORKER_NUM
if [[ ! $WORKER_NUM =~ ^[0-9]+$ ]]; then
  echo "error: worker_num=$WORKER_NUM is not a number"
  exit 1
fi

# Quick Start For Multiple Devices On Single Machine
if [ $# == 2 ]
then
  LOCAL_WORKER=$WORKER_NUM
  SINGLE_NODE=true
fi

# Multiple Devices On Single Machine
if [ $# == 6 ]
then
  LOCAL_WORKER=$WORKER_NUM
  MASTER_PORT=$3
  LOG_DIR=$4
  JOIN=$5
  CLUSTER_TIME_OUT=$6
  SINGLE_NODE=true
fi

# Multiple Devices On Multiple Machine
if [ $# == 9 ]
then
  LOCAL_WORKER=$3
  MASTER_ADDR=$4
  MASTER_PORT=$5
  NODE_RANK=$6
  LOG_DIR=$7
  JOIN=$8
  CLUSTER_TIME_OUT=$9

  if [ $WORKER_NUM == $LOCAL_WORKER ]
  then
    echo "worker_num is equal to local_worker, Notice that task will run on single node."
    SINGLE_NODE=true
  else
    echo "worker_num=$WORKER_NUM, local_worker=$LOCAL_WORKER, \
Please run this script on other nodes with different node_rank."
    SINGLE_NODE=false
  fi
fi

# Init msrun Command
if [ $SINGLE_NODE == true ]
then
  MSRUN_CMD="msrun --worker_num=$WORKER_NUM \
    --local_worker_num=$LOCAL_WORKER \
    --master_port=$MASTER_PORT \
    --log_dir=$LOG_DIR \
    --join=$JOIN \
    --cluster_time_out=$CLUSTER_TIME_OUT"
else
  MSRUN_CMD="msrun --worker_num=$WORKER_NUM \
    --local_worker_num=$LOCAL_WORKER \
    --master_addr=$MASTER_ADDR \
    --master_port=$MASTER_PORT \
    --node_rank=$NODE_RANK \
    --log_dir=$LOG_DIR \
    --join=$JOIN \
    --cluster_time_out=$CLUSTER_TIME_OUT"
fi

EXECUTE_ORDER="$MSRUN_CMD $1 2>&1 |& tee -a /job/code/alllogs/${MINDX_TASK_ID}/traininglog/log-print/node-$MS_NODE_RANK"

ulimit -u unlimited
echo "Running Command: $EXECUTE_ORDER"
echo "Please check log files in ${WORKSPACE_PATH}/${LOG_DIR}"

function check_return_code() {
    ret_code=$?
    if [[ ${ret_code} -ne 0 ]]; then
        logger "running job failed. exit code: ${ret_code}" | tee -a ${output_url}/log
        exit ${ret_code}
    fi
}

CKPT_PATH="./output/checkpoint"
if [ -d "${CKPT_PATH}" ]
then
  msrun --worker_num=$WORKER_NUM \
    --local_worker_num=$LOCAL_WORKER \
    --master_addr=$MASTER_ADDR \
    --master_port=$MASTER_PORT \
    --node_rank=$NODE_RANK \
    --log_dir=$LOG_DIR \
    --join=$JOIN \
    --cluster_time_out=$CLUSTER_TIME_OUT $1 --load_checkpoint="${CKPT_PATH}" --resume_training=true 2>&1 |& tee -a /job/code/alllogs/${MINDX_TASK_ID}/traininglog/log-print/node-$MS_NODE_RANK
else
  msrun --worker_num=$WORKER_NUM \
    --local_worker_num=$LOCAL_WORKER \
    --master_addr=$MASTER_ADDR \
    --master_port=$MASTER_PORT \
    --node_rank=$NODE_RANK \
    --log_dir=$LOG_DIR \
    --join=$JOIN \
    --cluster_time_out=$CLUSTER_TIME_OUT $1 --load_checkpoint="" --resume_training=false 2>&1 |& tee -a /job/code/alllogs/${MINDX_TASK_ID}/traininglog/log-print/node-$MS_NODE_RANK
fi

ST=${PIPESTATUS[0]}
if [[ ${ST} -ne 0 ]]; then
    echo "process exit with exitcode:${ST}"
    logger "running job failed. exit code: $ret" | tee -a ${output_url}/log
    exit ${ST}
fi
- If training logs are lost after a faulty Pod is rescheduled (resumable training), refer to the FAQ for handling.
- If TaskD is used for process-level rescheduling, process-level online recovery, process-level in-place recovery, borrowed-rail communication with task pause and switchback, or online stress testing, the TaskD Manager also needs to be started.
- Create a manager.py file in the current directory from which the training script is invoked. The content of manager.py is as follows.
from taskd.api import init_taskd_manager, start_taskd_manager
import os

job_id = os.getenv("MINDX_TASK_ID")
node_nums = XX        # Fill in the total number of task nodes, int type
proc_per_node = XX    # Fill in the number of training processes per node, int type
init_taskd_manager({"job_id": job_id, "node_nums": node_nums, "proc_per_node": proc_per_node})
start_taskd_manager()
- Add the following code to the training script to start the TaskD Manager. In the following code, the first two statements add the path of libtaskd.so (installed with TaskD) to the LD_PRELOAD environment variable. If these two statements do not take effect, run pip show taskd manually, take the Location value, append /taskd/python/cython_api/libs/libtaskd.so to it, and set it with export.
TASKD_SO_PATH="$(pip show taskd | awk '/^Location: / {print $2"/taskd/python/cython_api/libs/libtaskd.so"}')"
export LD_PRELOAD=$TASKD_SO_PATH:$LD_PRELOAD
export TASKD_PROCESS_ENABLE="on"
if [[ "${MS_SCHED_HOST}" == "${POD_IP}" ]]; then
    python manager.py &   # The actual execution path depends on the current directory
fi
msrun ...
- Modify the task YAML to add a container port: add port 9601, used for TaskD communication, under all Pods.
...
ports:
  - containerPort: 9601
    name: taskd-port
...
- Modify the model parameter configuration YAML. Open LLAMA2_for_MS_code/configs/llama2/pretrain_llama2_7b.yaml and configure the dataset path, distributed parallelism parameters, and so on. Modify the fields in the training configuration YAML as required. (Note: to use Pod-level rescheduling, process-level rescheduling, or process-level online recovery, a previously saved checkpoint file must exist in the checkpoint directory at startup, and the configuration YAML used for training must set resume_training=true and load_checkpoint={checkpoint directory}.)
@@ -15,7 +15,7 @@ trainer:
 # runner config
 runner_config:
-  epochs: 2
+  epochs: 200
   batch_size: 1
   sink_mode: True
   sink_size: 1
@@ -88,13 +88,14 @@ parallel:
   parallel_optimizer_config:
     gradient_accumulation_shard: False
     parallel_optimizer_threshold: 64
+    optimizer_weight_shard_size: 2
 # default parallel of device num = 8 for Atlas 800T A2
 parallel_config:
-  data_parallel: 8
+  data_parallel: 64
   model_parallel: 1
   pipeline_stage: 1
   use_seq_parallel: False
-  micro_batch_num: 1
+  micro_batch_num: 16
   vocab_emb_dp: True
   gradient_aggregation_group: 4
 # when model parallel is greater than 1, we can set micro_batch_interleave_num=2, that may accelerate the train process.
@@ -128,6 +129,8 @@ context:
   save_graphs: False
   save_graphs_path: "./graph"
   device_id: 0
+  #ascend_config:
+  #  parallel_speed_up_json_path: "/home/lby/workspace/mindformers/parallel_speed_up.json"
   graph_kernel_flags: "--disable_pass=cluster.floatstatus_fusion,preprocess.depend_elimination"
 # model config
 model:
@@ -136,7 +139,7 @@ model:
   batch_size: 1 # add for increase predict
   seq_length: 4096
   hidden_size: 4096
-  num_layers: 32
+  num_layers: 4
   num_heads: 32
   vocab_size: 32000
   multiple_of: 256
- Copy LLAMA2_for_MS_code/configs/llama2/pretrain_llama2_7b.yaml to LLAMA2_for_MS_code/configs/llama2/no_resume_pretrain_llama2_7b.yaml and modify the following fields in that YAML. When the compilation cache is used, no strategy file is generated, so delete the compilation cache before running.
src_strategy_path_or_dir: './output/strategy'
resume_training: False
load_checkpoint: ''
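A minimal sketch of the copy step described above; the three fields then need to be set as shown, using any editor.
cp LLAMA2_for_MS_code/configs/llama2/pretrain_llama2_7b.yaml \
   LLAMA2_for_MS_code/configs/llama2/no_resume_pretrain_llama2_7b.yaml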
- (Optional) In scenarios using the last-words checkpoint, to load the checkpoint via Pod-level rescheduling after it is saved, modify LLAMA2_for_MS_code/mindformers/trainer/trainer.py and add the following content:
…
from mindspore.dataset.engine.datasets import BatchDataset, RepeatDataset, Dataset
from mindspore.communication.comm_func import barrier
from mindformers.core.parallel_config import build_parallel_config \
…
…
    def _check_config_rules(self):
        """Check config rules."""
        self._adjust_resume_training_if_ckpt_path_invalid()
        barrier()
        if self.config.auto_trans_ckpt and self.config.load_ckpt_format == 'ckpt':
            …
        …
        if isinstance(self.config.resume_training, str) and \
                self.config.load_checkpoint and os.path.isfile(self.config.load_checkpoint):
            logger.warning(f"`resume_training={self.config.resume_training}` is not valid "
                           "when `load_checkpoint` is a file path")
            self.config.resume_training = True

    def _adjust_resume_training_if_ckpt_path_invalid(self):
        """Disable resume_training if checkpoint path is empty or invalid."""
        if isinstance(self.config.resume_training, bool) and self.config.resume_training:
            try:
                p = Path(self.config.load_checkpoint).resolve()
            except TypeError as e:
                raise TypeError(f"`load_checkpoint` should be a string, "
                                f"but got type {type(self.config.load_checkpoint)}.") from e
            if self.config.load_checkpoint:
                if p.is_dir() and not any(p.iterdir()):
                    self.config.resume_training = False
                    self.config.load_checkpoint = ""
                    logger.warning(
                        "The `load_checkpoint` is an empty path while the `resume_training` is True. "
                        "Hence `resume_training` and `load_checkpoint` is changed to False "
                        "and '' respectively, and randomly initialized weights will be used."
                    )
            else:
                self.config.resume_training = False
                self.config.load_checkpoint = ""
                logger.warning(
                    "The `load_checkpoint` is '' while the `resume_training` is True. "
                    "Hence `resume_training` and `load_checkpoint` is changed to False and '' respectively, "
                    "and randomly initialized weights will be used."
                )

    def _check_args_task_and_model(self):
        """Check args, task and model."""
        # get support model names of task
…
Then modify the following fields in LLAMA2_for_MS_code/configs/llama2/pretrain_llama2_7b.yaml:
resume_training: True
load_checkpoint: './output/checkpoint'
- Prepare the training task YAML and modify settings such as the server IP address of the mounted volume according to your environment. The parameters in the YAML are inherited from the PyTorch scenario.
cd LLAMA2_for_MS_code/yamls
wget https://gitcode.com/Ascend/mindxdl-deploy/blob/master/samples/train/resumable-training/fault-tolerance/ranktable/mindspore/llama2/yamls/ms_multinodes_acjob_910b.yaml
Change the startup command to:
command:  # training command, which can be modified
  - /bin/bash
  - -c
  - |
    cd /job/code/;bash /job/code/scripts/msrun_launcher.sh "run_mindformer.py --config configs/llama2/pretrain_llama2_7b.yaml --train_dataset_dir {actual dataset path}/wiki4096/wiki4096.mindrecord --use_parallel True --run_mode train"
Reinforcement Learning Post-Training Adaptation Example (Based on Verl)
MindCluster supports only Job-level rescheduling in this scenario. Verl training tasks are managed by a Ray cluster. To fit MindCluster's AscendJob task deployment, one Pod is deployed on each Worker node, and that Pod hosts all of the node's processes in the Ray cluster. The Ray head node is determined by the node carrying the environment variable RANK=0 injected by Ascend Operator: the Pod on the RANK=0 node starts the Ray cluster and submits the Verl post-training task, while the Pods on the other Worker nodes join the Ray cluster. Finally, all nodes check whether the submitted training task has encountered an exception (a per-node sketch follows the list below).
- If an exception exists, the Pod exits with a non-zero code; Volcano detects the service exception and triggers Job-level rescheduling.
- If there is no exception and the task has finished, the Pod exits with 0.
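A minimal sketch of the per-node logic described above; the port, the job-submission line, and the status-polling loop are illustrative assumptions rather than the exact content of the delivered startup script.
# Runs inside every Pod; RANK is injected by Ascend Operator
if [ "${RANK}" == "0" ]; then
    ray start --head --port=6379                              # The RANK=0 node starts the Ray head
    bash examples_npu/run_dapo_qwen3_30b_a3b_megatron.sh &    # and submits the Verl post-training task
else
    ray start --address="${MASTER_ADDR}:6379"                 # Other Worker nodes join the cluster; MASTER_ADDR is assumed to resolve to the RANK=0 node
fi
# Every node then watches the submitted job and exits non-zero on failure,
# so that Volcano can trigger Job-level rescheduling
while true; do
    status=$(ray job status "${JOB_ID}" 2>/dev/null)          # JOB_ID is an illustrative placeholder
    echo "${status}" | grep -q "FAILED"    && exit 1
    echo "${status}" | grep -q "SUCCEEDED" && exit 0
    sleep 30
done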
The following steps use two Atlas 900 A3 SuperPoD super nodes as an example.
- Convert the model weights from the HuggingFace model to the Megatron model; refer to the Verl model conversion script.
# Start the container; modify the model paths according to your environment
docker run -it \
    -v /qwen30b/Qwen3-30B-A3B-Instruct-2507:/qwen30b/Qwen3-30B-A3B-Instruct-2507 \
    -v /usr/local/Ascend/driver:/usr/local/Ascend/driver \
    -e ASCEND_VISIBLE_DEVICES=0-15 \
    verl:v1 /bin/bash
# Perform the weight conversion
cd ~/verl
python scripts/converter_hf_to_mcore.py \
    --hf_model_path /qwen30b/Qwen3-30B-A3B-Instruct-2507 \
    --output_path /qwen30b/Qwen3-30B-A3B-Instruct-Mcore
If an error is reported at this step (the command below preloads the libgomp shipped with scikit-learn), run the following command first and then retry:
export LD_PRELOAD="/usr/local/lib/python3.10/dist-packages/sklearn/utils/../../scikit_learn.libs/libgomp-947d5fa1.so.1.0.0"
- Build the Verl training script for Qwen3 30B MoE. The inference backend is vLLM and the training backend is Megatron.
Obtain the sample script run_dapo_qwen3_30b_a3b_megatron.sh and place it under examples_npu in the verl directory. Also create two files, dapo_trainer-megatron.yaml and runtime_env.yaml, under the examples_npu/config directory with the following contents:
- dapo_trainer-megatron.yaml
# examples_npu/config/dapo_trainer-megatron.yaml
hydra:
  searchpath:
    - file://verl/trainer/config

defaults:
  - ppo_megatron_trainer
  - _self_

data:
  gen_batch_size: ${data.train_batch_size}

reward_model:
  reward_manager: dapo
  overlong_buffer:
    enable: False # We try to avoid forgetting to set enable
    len: 0
    penalty_factor: 0.0
    log: False

algorithm:
  filter_groups:
    _target_: verl.trainer.config.FilterGroupsConfig
    enable: False # We try to avoid forgetting to set enable
    metric: null # acc / score / seq_reward / seq_final_reward / ...
    max_num_gen_batches: 0 # Non-positive values mean no upper limit

trainer:
  project_name: verl-dapo
- runtime_env.yaml
# examples_npu/config/runtime_env.yaml
working_dir: ./
excludes: ["/.git/"]
env_vars:
  HCCL_EXEC_TIMEOUT: "7200"
  HCCL_CONNECT_TIMEOUT: "7200"
  VLLM_USE_V1: "1"
  VLLM_VERSION: "0.9.1"
  HCCL_IF_BASE_PORT: "23999"
  HCCL_ASYNC_ERROR_HANDLING: "0"
  P2P_HCCL_BUFFSIZE: "20"
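For reference, a runtime environment file like this is typically passed when the job is submitted to the Ray cluster. This is a minimal sketch; the dashboard address and the entrypoint are illustrative, and the actual submission is performed by run_dapo_qwen3_30b_a3b_megatron.sh.
ray job submit --address="http://127.0.0.1:8265" \
    --runtime-env examples_npu/config/runtime_env.yaml \
    -- python3 -m verl.trainer.main_ppo <training arguments>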
- Build the Ray startup script adapted for MindCluster. Prepare the Ray startup script on each Worker node and place it on both Atlas 900 A3 SuperPoD super nodes. The network interface information in it must be configured according to your environment; the rest of the script can remain unchanged.
- Obtain verl-resche.yaml from the task YAML preparation section, modify its parameters according to your environment, and then run the following command to start the task.
kubectl apply -f verl-resche.yaml
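After submitting, the Pods and their logs can be inspected to confirm the task is running; a minimal sketch (the namespace and Pod names depend on the YAML):
kubectl get pods -A | grep verl
kubectl logs -f <pod-name> -n <namespace>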
After the task is started, training iteration information is printed in the logs.