which torchrun
vim {torchrun文件路径}/torchrun
# 增加加粗内容
import re
import sys
import mindio_ttp.framework_ttp
from torch.distributed.run import main as torch_main
vim mindspeed_llm/training/initialize.py
# Jump to the definition of _initialize_distributed_wrapper and modify the highlighted part:
def _initialize_distributed_wrapper(fn):
    """Decorate the distributed-initialisation function with device binding and MindIO TFT start-up.

    Before delegating to ``fn`` the wrapper:

    1. selects the current device as ``get_args().rank % torch.cuda.device_count()``;
    2. calls ``tft_init_controller_processor`` with TLS enabled.

    Args:
        fn: The callable to wrap; it receives the wrapper's arguments unchanged.

    Returns:
        The wrapping callable. It returns whatever ``fn`` returns.
    """
    @wraps(fn)
    def wrapper(*args, **kwargs):
        # Bind this process to one device, chosen by rank modulo the device count.
        device_count = torch.cuda.device_count()
        device = get_args().rank % device_count
        torch.cuda.set_device(device)

        # Imported inside the wrapper, exactly as in the original snippet.
        from mindio_ttp.adaptor import tft_init_controller_processor
        # NOTE: the tls_option_top_path value below is a documentation placeholder
        # ("certificate configuration root directory"); replace it with the real path.
        tft_init_controller_processor(enable_tls=True, tls_option_top_path='{证书配置根目录}')

        # Forward the wrapped function's result instead of silently dropping it.
        return fn(*args, **kwargs)
    return wrapper
vim mindspeed_llm/training/initialize.py
# Jump to the definition of new_group_wrapper and modify the highlighted part:
def new_group_wrapper(fn):
    """Decorate a process-group constructor with the MindIO TFT reboot-node handling.

    Behaviour of the returned wrapper:

    * If ``tft_is_arf_reboot_node()`` is true and the ``backend`` keyword is a
      string containing ``'gloo'``, no group is created and ``None`` is returned.
    * Otherwise, when ``backend`` is ``None`` or
      ``torch.distributed.distributed_c10d._is_barrier_after_init()`` is truthy,
      ``use_local_synchronization=True`` is injected into the keyword arguments.
    * ``fn`` is then called and its result returned.

    Args:
        fn: The callable to wrap (only the ``backend`` keyword argument is inspected).

    Returns:
        The wrapping callable.
    """
    @wraps(fn)
    def wrapper(*args, **kwargs):
        from mindio_ttp.adaptor import tft_is_arf_reboot_node

        requested_backend = kwargs.get('backend')

        # Skip gloo group creation entirely on a node flagged as an ARF reboot node.
        if tft_is_arf_reboot_node():
            if isinstance(requested_backend, str) and 'gloo' in requested_backend:
                return None

        # The barrier check is only evaluated when a backend was given explicitly.
        needs_local_sync = (
            requested_backend is None
            or torch.distributed.distributed_c10d._is_barrier_after_init()
        )
        if needs_local_sync:
            kwargs['use_local_synchronization'] = True

        return fn(*args, **kwargs)
    return wrapper
vim mindspeed_llm/training/training.py
# Excerpt of mindspeed_llm/training/training.py; each "..." marks code omitted from this snippet.
...
class CustomFunction(torch.autograd.Function):
    # Identity autograd function: forward and backward both return their argument
    # unchanged, after making the default stream the current stream.
    @staticmethod
    def forward(ctx, input):
        torch.cuda.set_stream(torch.cuda.default_stream())
        return input

    @staticmethod
    def backward(ctx, grad):
        torch.cuda.set_stream(torch.cuda.default_stream())
        return grad


def streamHandler():
    # Runs one forward and one backward pass of CustomFunction on single-element
    # float32 tensors created on the "npu" device, so that both set_stream calls execute.
    # NOTE(review): presumably this resets the stream for both the forward and the
    # autograd backward execution contexts - confirm against the MindIO TFT docs.
    input_tensor = torch.empty(1, dtype=torch.float32, device="npu", requires_grad=True)
    grad_tensor = torch.empty(1, dtype=torch.float32, device="npu", requires_grad=True)
    output_tensor = CustomFunction.apply(input_tensor)
    output_tensor.backward(grad_tensor)


def pretrain(train_valid_test_dataset_provider,
    ...
    if args.do_train and args.train_iters > 0:
        if args.enable_high_availability:
            # High-availability path: register the stream handler and the processor,
            # then run training through tft_train instead of train().
            from mindio_ttp.adaptor import tft_register_processor, tft_train
            from mindio_ttp.framework_ttp import tft_register_set_stream_handler
            tft_register_set_stream_handler(streamHandler)
            tft_register_processor(train_valid_test_dataset_provider, model_provider, model_type)
            iteration, num_floating_point_operations_so_far = tft_train(train_args, test_data_iterator_list)
        else:
            # Default path: the regular training loop.
            iteration, num_floating_point_operations_so_far = train(*train_args)
    ...
如需叠加使用“数据修复支持周期CheckPoint”功能,还需修改“mindspeed_llm/training/arguments.py”文件中的_add_high_availability_args函数,修改后的内容如下。
def _add_high_availability_args(parser):
    """Register the high-availability command-line switches on *parser*.

    All options are boolean ``store_true`` flags placed in an argument group
    titled ``high_availability``:

    * ``--enable-high-availability``
    * ``--enable-hbmfault-repair``
    * ``--enable-worker-reboot``
    * ``--distributed-optimizer-no-replica``

    Args:
        parser: An ``argparse.ArgumentParser`` (or compatible) to extend.

    Returns:
        The same ``parser`` object, to allow chaining.
    """
    group = parser.add_argument_group(title='high_availability')

    group.add_argument('--enable-high-availability', action='store_true',
                       help='switch of the high availability feature')
    group.add_argument('--enable-hbmfault-repair', action='store_true',
                       help='high availability feature, enable hbmfault repair')
    group.add_argument('--enable-worker-reboot', action='store_true',
                       help='high availability feature, enable ARF')
    # Previously the only flag without help text; documented like its siblings.
    group.add_argument('--distributed-optimizer-no-replica', action='store_true',
                       help='high availability feature, repair from ckpt and '
                            'disable replica optimizer')

    return parser
此处以编辑“examples/legacy/llama2/pretrain_llama2_7b_ptd.sh”脚本为例。
vim examples/legacy/llama2/pretrain_llama2_7b_ptd.sh
#!/bin/bash
# Launch script for LLaMA2-7B pre-training with the high-availability switches enabled.

export CUDA_DEVICE_MAX_CONNECTIONS=1
export PYTORCH_NPU_ALLOC_CONF=expandable_segments:True
# Network interface used by gloo; the value is environment specific - adjust to the local NIC.
export GLOO_SOCKET_IFNAME=enp189s0f0
# MindIO TFT settings. TTP_ADDR is a placeholder and must be replaced with the master node IP.
export TTP_OT=720
export TTP_ADDR="master node ip"

source /usr/local/Ascend/ascend-toolkit/set_env.sh

# Cluster topology.
GPUS_PER_NODE=8
MASTER_ADDR=localhost
MASTER_PORT=6000
NNODES=1
NODE_RANK=0
WORLD_SIZE=$(($GPUS_PER_NODE*$NNODES))

# Placeholders: replace with real paths before running.
CKPT_SAVE_DIR="your model save ckpt path"
DATA_PATH="your data path"
TOKENIZER_MODEL="your tokenizer path"
CKPT_LOAD_DIR="your model ckpt path"

# Tensor / pipeline parallel sizes.
TP=1
PP=2

DISTRIBUTED_ARGS="
    --nproc_per_node $GPUS_PER_NODE \
    --nnodes $NNODES \
    --node_rank $NODE_RANK \
    --master_addr $MASTER_ADDR \
    --master_port $MASTER_PORT
"

# The last four flags are the high-availability switches.
GPT_ARGS="
    --tensor-model-parallel-size ${TP} \
    --pipeline-model-parallel-size ${PP} \
    --sequence-parallel \
    --num-layers 32 \
    --hidden-size 4096 \
    --ffn-hidden-size 11008 \
    --num-attention-heads 32 \
    --tokenizer-type Llama2Tokenizer \
    --tokenizer-model ${TOKENIZER_MODEL} \
    --seq-length 4096 \
    --max-position-embeddings 4096 \
    --micro-batch-size 1 \
    --global-batch-size 256 \
    --make-vocab-size-divisible-by 1 \
    --lr 1.25e-6 \
    --train-iters 5000 \
    --lr-decay-style cosine \
    --untie-embeddings-and-output-weights \
    --disable-bias-linear \
    --attention-dropout 0.0 \
    --init-method-std 0.01 \
    --hidden-dropout 0.0 \
    --position-embedding-type rope \
    --normalization RMSNorm \
    --use-fused-rmsnorm \
    --swiglu \
    --use-flash-attn \
    --no-masked-softmax-fusion \
    --attention-softmax-in-fp32 \
    --min-lr 1.25e-7 \
    --weight-decay 1e-1 \
    --lr-warmup-fraction 0.01 \
    --clip-grad 1.0 \
    --adam-beta1 0.9 \
    --initial-loss-scale 65536 \
    --adam-beta2 0.95 \
    --no-gradient-accumulation-fusion \
    --no-load-optim \
    --no-load-rng \
    --use-distributed-optimizer \
    --use-fused-swiglu \
    --use-fused-rotary-pos-emb \
    --overlap-grad-reduce \
    --bf16 \
    --enable-high-availability \
    --enable-hbmfault-repair \
    --enable-worker-reboot \
    --distributed-optimizer-no-replica \
"

DATA_ARGS="
    --data-path $DATA_PATH \
    --split 949,50,1
"

OUTPUT_ARGS="
    --log-interval 1 \
    --save-interval 10000 \
    --eval-interval 1000 \
    --eval-iters 10 \
"

# tee cannot create missing parent directories, so make sure logs/ exists first.
mkdir -p logs

torchrun $DISTRIBUTED_ARGS pretrain_gpt.py \
    $GPT_ARGS \
    $DATA_ARGS \
    $OUTPUT_ARGS \
    --distributed-backend nccl \
    --jit-compile \
    --load $CKPT_LOAD_DIR \
    --save $CKPT_SAVE_DIR \
    | tee logs/train_llama2_7b.log
高可用功能相关参数说明如下:
开启MindIO TFT开关后,各类优化器显存会发生变化,变化详情请参见表1。
对于分布式优化器而言,由于增加了优化器副本,静态内存会有所增加。但集群规模越大,DP Size越大,平均到单卡的显存增加量就越小,从而可以避免OOM,因此推荐在大集群中使用。请根据实际显存情况选择是否开启该功能,并调节相关参数。