which torchrun
vim {torchrun文件路径}/torchrun
# 增加加粗内容 import re import sys import mindio_ttp.framework_ttp from torch.distributed.run import main as torch_main
vim mindspeed/initialize.py
# 跳转到_initialize_distributed_wrapper函数定义,对加粗处修改: def _initialize_distributed_wrapper(fn): @wraps(fn) def wrapper(*args, **kwargs): device_count = torch.cuda.device_count() device = get_args().rank % device_count torch.cuda.set_device(device) from mindio_ttp.adaptor import tft_init_controller_processor tft_init_controller_processor(enable_tls=True, tls_option_top_path='{证书配置根目录}') fn(*args, **kwargs) return wrapper
vim mindspeed/initialize.py
# 跳转到new_group_wrapper函数定义,新增对加粗处的修改内容: def new_group_wrapper(fn): @wraps(fn) def wrapper(*args, **kwargs): backend = kwargs.get('backend', None) from mindio_ttp.adaptor import tft_is_arf_reboot_node if tft_is_arf_reboot_node() and isinstance(backend, str) and 'gloo' in backend: return None if (backend is None)or torch.distributed.distributed_c10d._is_barrier_after_init(): kwargs['use_local_synchronization'] = True res = fn(*args, **kwargs) return res return wrapper
vim mindspeed/training.py
... class CustomFunction(torch.autograd.Function): @staticmethod def forward(ctx, input): torch.cuda.set_stream(torch.cuda.default_stream()) return input @staticmethod def backward(ctx, grad): torch.cuda.set_stream(torch.cuda.default_stream()) return grad def streamHandler(): input_tensor = torch.empty(1, dtype=torch.float32, device="npu", requires_grad=True) grad_tensor = torch.empty(1, dtype=torch.float32, device="npu", requires_grad=True) output_tensor = CustomFunction.apply(input_tensor) output_tensor.backward(grad_tensor) def pretrain(train_valid_test_dataset_provider, ... if args.do_train and args.train_iters > 0: if args.enable_high_availability: from mindio_ttp.adaptor import tft_register_processor, tft_train from mindio_ttp.framework_ttp import tft_register_set_stream_handler tft_register_set_stream_handler(streamHandler) tft_register_processor(train_valid_test_dataset_provider, model_provider, model_type) iteration, num_floating_point_operations_so_far = tft_train(train_args, test_data_iterator_list) else: iteration, num_floating_point_operations_so_far = train(*train_args) ...
def _add_high_availability_args(parser): group = parser.add_argument_group(title='high_availability') group.add_argument('--enable-high-availability', action='store_true', help='switch of the high availability feature') group.add_argument("--enable-hbmfault-repair", action='store_true', help="high availability feature, enable hbmfault repair") group.add_argument("--enable-worker-reboot", action='store_true', help="high availability feature, enable ARF") group.add_argument("--distributed-optimizer-no-replica", action='store_true') return parser
此处以“tests_extend/model_tests/perf_model/llama2/pretrain_llama2_70b_4k.sh”脚本为例。
vim tests_extend/model_tests/perf_model/llama2/pretrain_llama2_70b_4k.sh
#!/bin/bash export CUDA_DEVICE_MAX_CONNECTIONS=1 source "tests_extend/system_tests/env_npu.sh" # Change for multinode config NPUS_PER_NODE=16 MASTER_ADDR=<master_ip_address> MASTER_PORT=6000 NNODES=8 NODE_RANK=<local_rank> WORLD_SIZE=$(($NPUS_PER_NODE*$NNODES)) export GLOO_SOCKET_IFNAME=enp189s0f0 export TTP_OT=720 export TTP_ADDR="master node ip" source /usr/local/Ascend/ascend-toolkit/set_env.sh CKPT_DIR=./ckpt_llama DATA_PATH="/home/dataset/llama2/alpaca_text_document" TOKENIZER_MODEL="/home/dataset/model/llama-2-7b-hf/tokenizer.model" TP=8 PP=4 CP=1 DISTRIBUTED_ARGS=" --nproc_per_node $NPUS_PER_NODE \ --nnodes $NNODES \ --node_rank $NODE_RANK \ --master_addr $MASTER_ADDR \ --master_port $MASTER_PORT " GPT_ARGS=" --tensor-model-parallel-size ${TP} \ --pipeline-model-parallel-size ${PP} \ --use-fused-rotary-pos-emb \ --use-fused-swiglu \ --use-fused-rmsnorm \ --log-throughput \ --overlap-grad-reduce \ --overlap-param-gather \ --use-ascend-mc2 \ --num-layers-per-virtual-pipeline-stage 2 \ --sequence-parallel \ --use-distributed-optimizer \ --num-layers 80 \ --hidden-size 8192 \ --ffn-hidden-size 28672 \ --num-attention-heads 64 \ --tokenizer-type Llama2Tokenizer \ --tokenizer-model ${TOKENIZER_MODEL} \ --seq-length 4096 \ --max-position-embeddings 4096 \ --micro-batch-size 1 \ --global-batch-size 32 \ --make-vocab-size-divisible-by 1 \ --lr 1.0e-6 \ --train-iters 5000 \ --lr-decay-style cosine \ --untie-embeddings-and-output-weights \ --attention-dropout 0.0 \ --init-method-std 0.01 \ --hidden-dropout 0.0 \ --position-embedding-type rope \ --normalization RMSNorm \ --swiglu \ --use-flash-attn \ --no-masked-softmax-fusion \ --attention-softmax-in-fp32 \ --min-lr 1.0e-7 \ --weight-decay 0.1 \ --clip-grad 1.0 \ --adam-beta1 0.9 \ --initial-loss-scale 4096.0 \ --adam-beta2 0.95 \ --adam-eps 1e-5 \ --no-gradient-accumulation-fusion \ --disable-bias-linear \ --group-query-attention \ --num-query-groups 8 \ --lr-warmup-fraction 0.01 \ --bf16 \ --enable-high-availability \ --enable-hbmfault-repair \ --enable-worker-reboot \ --distributed-optimizer-no-replica \ " DATA_ARGS=" --data-path $DATA_PATH \ --split 949,50,1 " OUTPUT_ARGS=" --log-interval 1 \ --save-interval 10000 \ --eval-interval 1000 \ --eval-iters 10 " torchrun $DISTRIBUTED_ARGS pretrain_gpt.py \ $GPT_ARGS \ $DATA_ARGS \ $OUTPUT_ARGS \ --distributed-backend nccl \ --save $CKPT_DIR \ set +x
高可用功能相关参数说明如下:
开启MindIO TFT开关后,各类优化器显存会发生变化,变化详情请参见表1。
对于分布式优化器而言,由于增加了优化器副本,导致静态内存有所增加。但是集群规模越大时,DP Size越大,平均到单卡的显存增加量很小,这样可以避免OOM,因此推荐在大集群中使用。根据显存情况选择开启与否,调节参数。