对接ModelLink框架
前提条件
ModelLink框架和数据集的准备请参见ModelLink。
- 支持的版本对应Commit ID为:af1347
- README中涉及安装Megatron-LM的tag=23.05的部分切换为23.06(原tag已被删除)。
- “enable_ttp”为开启TTP代码开关,默认开启。
- 若不使用TTP而使用原生代码,请参见10导出变量ENABLE_TTP为False;当卸载MindIO.TTP时,启动方式还原为torchrun,依旧需要导出变量ENABLE_TTP为False。
操作步骤
ModelLink框架和MindIO TTP SDK安装完成之后,需要修改代码才可在框架中启动TTP模块,在训练中同步优化器数据更新状态到TTP模块,故障时TTP能够及时生成临终CheckPoint。
- 修改“ascendspeed/__init__.py”。
- 打开“ascendspeed/__init__.py”文件。
vim ascendspeed/__init__.py
- 按“i”进入编辑模式,修改以下内容。
# 45行导入get_ttp_switch。 from .utils import get_ttp_switch
- 按“Esc”键,输入:wq!,按“Enter”保存并退出编辑。
- 打开“ascendspeed/__init__.py”文件。
- 修改“ascendspeed/utils.py”。
- 打开“ascendspeed/utils.py”文件。
vim ascendspeed/utils.py
- 按“i”进入编辑模式,修改以下内容。
# 44行后增加下列方法。 def get_ttp_switch(): enable_ttp = os.getenv('ENABLE_TTP', 'True') enable_ttp = enable_ttp.lower() in ('true', '1') return enable_ttp - 按“Esc”键,输入:wq!,按“Enter”保存并退出编辑。
- 打开“ascendspeed/utils.py”文件。
- 修改“ascendspeed/optimizer/__init__.py”。
- 打开“ascendspeed/optimizer/__init__.py”文件。
vim ascendspeed/optimizer/__init__.py
- 按“i”进入编辑模式,修改以下内容。
- 导入TTPReplicaOptimizer。(常用)DP Size为偶数时:
from ascendspeed import get_ttp_switch enable_ttp = get_ttp_switch() if enable_ttp: from mindio_ttp.modellink import TTPReplicaOptimizer
适配DP Size为奇数时,优化器导入为:
from mindio_ttp.modellink import TTPOddReplicaOptimizer as TTPReplicaOptimizer
- 修改194行的“opt_ty”参数值。
if args.use_distributed_optimizer: if args.release_fp32_grad: opt_ty = DistributedOptimizerWithoutFP32Grad else: opt_ty = TTPReplicaOptimizer if enable_ttp else DistributedOptimizer // add here if enable_ttp: return opt_ty(optimizer, args.clip_grad, args.log_num_zeros_in_grad, params_have_main_grad, args.use_contiguous_buffers_in_local_ddp, args.fp16, args.bf16, args.params_dtype, grad_scaler, model, args.local_copy_os) // add local_copy param else: return opt_ty(optimizer, args.clip_grad, args.log_num_zeros_in_grad, params_have_main_grad, args.use_contiguous_buffers_in_local_ddp, args.fp16, args.bf16, args.params_dtype, grad_scaler, model)
开启local copy时:local_copy设置为True。
- 导入TTPReplicaOptimizer。
- 按“Esc”键,输入:wq!,按“Enter”保存并退出编辑。
- 打开“ascendspeed/optimizer/__init__.py”文件。
- 修改“ascendspeed/optimizer/clip_grads.py”。
- 打开“ascendspeed/optimizer/clip_grads.py”文件。
vim ascendspeed/optimizer/clip_grads.py
- 按“i”进入编辑模式,修改以下内容。
# 99行后新增以下内容 from ascendspeed import get_ttp_switch enable_ttp = get_ttp_switch() if enable_ttp: from mindio_ttp.modellink.os_shard_parallel import get_os_shard_group # 第104行作如下修改: torch.distributed.all_reduce(total_norm, op=torch.distributed.ReduceOp.SUM, group=get_os_shard_group() if enable_ttp else parallel_state.get_data_parallel_group()) - 按“Esc”键,输入:wq!,按“Enter”保存并退出编辑。
- 打开“ascendspeed/optimizer/clip_grads.py”文件。
- 修改“ascendspeed/optimizer/optimizer.py”。
- 打开“ascendspeed/optimizer/optimizer.py”文件。
vim ascendspeed/optimizer/optimizer.py
- 按“i”进入编辑模式,修改以下内容。
# 35行后新增以下内容 from ascendspeed import get_ttp_switch enable_ttp = get_ttp_switch() if enable_ttp: import mindio_ttp.framework_ttp as ttp # 在490行的step方法中增加以下内容 timers('optimizer-inner-step', log_level=1).start( barrier=args.barrier_with_L1_time) // add after this line if enable_ttp and hasattr(self, 'begin_to_update'): self.begin_to_update(args.iteration) self.optimizer.step() if enable_ttp and hasattr(self, 'end_to_update'): self.end_to_update() - 按“Esc”键,输入:wq!,按“Enter”保存并退出编辑。
- 打开“ascendspeed/optimizer/optimizer.py”文件。
- 修改ascendspeed/core/parallel_state.py。
- 打开“ascendspeed/core/parallel_state.py”文件。
vim ascendspeed/core/parallel_state.py
- 按“i”进入编辑模式,修改代码。
- 适配DP Size为偶数时:
# 在176行args = get_args()后添加 from ascendspeed import get_ttp_switch enable_ttp = get_ttp_switch() if enable_ttp: from mindio_ttp.modellink import build_optimizer_replica_group # 在for循环内group = torch.distributed.new_group(ranks)后增加 if enable_ttp: build_optimizer_replica_group(list(ranks))
适配DP Size为奇数时,parallel_state.py进行如下适配。
- 适配DP Size为奇数时:
# 在283左右的def is_unitialized方法前添加 def get_data_parallel_all_rank(): assert _DATA_PARALLEL_GLOBAL_RANKS is not None, \ "Data parallel group is not initialized" return _DATA_PARALLEL_GLOBAL_RANKS
- 适配DP Size为偶数时:
- 按“Esc”键,输入:wq!,按“Enter”保存并退出编辑。
- 打开“ascendspeed/core/parallel_state.py”文件。
- 修改“ascendspeed/checkpointing.py”。
- 打开“ascendspeed/checkpointing.py”文件。
vim ascendspeed/checkpointing.py
- 按“i”进入编辑模式,修改代码。
# 34行后增加get_ttp_switch from ascendspeed import get_ttp_switch # 47行后增加 enable_ttp = get_ttp_switch() # 112行修改get_checkpoint_name方法: def get_checkpoint_name(checkpoints_path, iteration, release=False, model_name='model_optim_rng.pt', tmp=False): // add "tmp=False" directory = 'iter_{:07d}'.format(iteration) // add after this line if tmp: directory = directory + "_tmp" # 147行增加ttp_save_checkpoint如下: def ttp_save_checkpoint(save_rank, step, rank_list, args): iteration, model, optimizer, opt_param_scheduler = args if hasattr(optimizer, 'set_dump_args'): optimizer.set_dump_args(save_rank, step, rank_list) save_checkpoint(iteration, model, optimizer, opt_param_scheduler) # 155行修改save_checkpoint方法: print_rank_0('saving checkpoint at iteration {:7d} to {}'.format( iteration, args.save)) // add after this line if enable_ttp and args.local_copy_os and optimizer.error_dump and hasattr(optimizer, "save_args") and optimizer.current_step > optimizer.save_args['step']: iteration = optimizer.save_args['step'] if enable_ttp: tmp = True if optimizer.error_dump else False checkpoint_name = get_checkpoint_name(args.save, iteration, tmp=tmp) else: checkpoint_name = get_checkpoint_name(args.save, iteration) cur_rank = torch.distributed.get_rank() save_flag = optimizer.need_write_file() if enable_ttp else parallel_state.get_data_parallel_rank() == 0 # 修改判断if parallel_state.get_data_parallel_rank() == 0: 为下: if save_flag: # 修改185行左右判断为: if not torch.distributed.is_initialized() or save_flag or args.deepspeed: # 修改save_checkpoint方法最后save_checkpoint_post_process(iteration),增加判断分支: if enable_ttp and optimizer.error_dump: print('rank {} successfully saved checkpoint at iteration {:7d} to {}'.format( cur_rank, iteration, args.save)) else: save_checkpoint_post_process(iteration) # 增加以下方法: def rename_save_result(args): iteration = args[0] optimizer = args[2] args = get_args() src_path = None dst_path = None if args.local_copy_os and hasattr(optimizer, "save_args") and optimizer.current_step > optimizer.save_args['step']: iteration = optimizer.save_args['step'] try: tmp_dir = 'iter_{:07d}_tmp'.format(iteration) fin_dir = 'iter_{:07d}'.format(iteration) src_path = os.path.join(args.save, tmp_dir) dst_path = os.path.join(args.save, fin_dir) os.rename(src_path, dst_path) except Exception: print(f"rename from {src_path} to {dst_path} failed.") traceback.print_exc() return 1 # And update the latest iteration tracker_filename = get_checkpoint_tracker_filename(args.save) with open(tracker_filename, 'w') as f: f.write(str(iteration)) return 0 - 按“Esc”键,输入:wq!,按“Enter”保存并退出编辑。
- 打开“ascendspeed/checkpointing.py”文件。
- 修改“ascendspeed/training.py”。
- 打开“ascendspeed/training.py”文件。
vim ascendspeed/training.py
- 按“i”进入编辑模式,修改代码。
# 29行后增加 from ascendspeed import get_ttp_switch # 在68行后导入以下内容: enable_ttp = get_ttp_switch() if enable_ttp: import mindio_ttp.framework_ttp as ttp from mindio_ttp.modellink import get_dp_rank_list from ascendspeed.checkpointing import ttp_save_checkpoint, rename_save_result # 在setup_model_and_optimizer方法中686行后加入: model[0].global_steps = student_global_steps //add after this line if enable_ttp and hasattr(optimizer, "current_step"): optimizer.current_step = args.iteration print(f"optimizer current step has been set to {optimizer.current_step}") # 在train_step方法中776行后加入: timers('backward-embedding-all-reduce').stop() // add after this line if enable_ttp and (not args.local_copy_os): torch.distributed.barrier() # 1154行后修改train方法: def train(...) ... total_loss_dict = {} // add after this line if enable_ttp: # init processor and controller cur_rank = torch.distributed.get_rank() world_size: int = torch.distributed.get_world_size() dp_ranks = get_dp_rank_list() import os masterIp = os.getenv('MASTER_ADDR', '127.0.0.1') port = 8000 if cur_rank == 0: ttp.ttp_init_controller(cur_rank, world_size, replica=len(dp_ranks)//2, enable_local_copy=args.local_copy_os) ttp.ttp_start_controller(masterIp, port) ttp.ttp_init_processor(cur_rank, dp_ranks, len(dp_ranks), world_size, replica=len(dp_ranks)//2, enable_local_copy=args.local_copy_os) ttp.ttp_start_processor(masterIp, port) ttp.ttp_register_save_ckpt_handler(ttp_save_checkpoint) ttp.ttp_register_rename_handler(rename_save_result) ... iteration += 1 //add after this line if enable_ttp: if hasattr(optimizer, "current_step") and iteration != optimizer.current_step: print_rank_0(f"refresh optimizer step {iteration}") ttp.ttp_set_ckpt_args((iteration, model, optimizer, lr_scheduler)) # train方法后,evaluate方法前增加 if enable_ttp: train = ttp.ttp_to_persist(train) - 按“Esc”键,输入:wq!,按“Enter”保存并退出编辑。
- 打开“ascendspeed/training.py”文件。
- 修改“ascendspeed/arguments.py”。
- 打开“ascendspeed/arguments.py”文件。
vim ascendspeed/arguments.py
- 按“i”进入编辑模式,修改代码。
# 845行之后增加以下内容: group.add_argument('--local-copy-os', action='store_true', default=False, help='barrier or local copy os, suggest barrier on small cluster') - 按“Esc”键,输入:wq!,按“Enter”保存并退出编辑。
- 打开“ascendspeed/arguments.py”文件。
- 编辑预训练脚本。(参考“examples/llama/pretrain_llama_7B_zero_8p.sh”)
- 拷贝“examples/llama/pretrain_llama_7B_zero_8p.sh”文件到工程根目录并重命名,此处以ttprun_llama.sh为例。
cp examples/llama/pretrain_llama_7B_zero_8p.sh ./ttprun_llama.sh
- 打开“ttprun_llama.sh”文件。
vim ttprun_llama.sh
- 按“i”进入编辑模式,在对应位置编辑脚本。
- 删除
- 删除涉及deepspeed启动的代码。
- 删除参数:
-- ds_args \
- (可选)删除参数:
# Atlas 800T A2 训练服务器上需保留这个两参数,其他服务器需要删除 --use-flash-attn \ --use-fused-rmsnorm \
- 修改
- GPUS_PER_NODE修改为NPUS_PER_NODE。
- 修改set_env.sh为实际路径。
- 根据实际情况修改ip、port、nnodes、node_rank。
- 修改启动方式为:
ttprun $DISTRIBUTED_ARGS
卸载MindIO TTP后,启动方式需要还原为torchrun。
- 新增
- 增加多机环境导出变量的语句:
# 此处以enp189s0f0为例,高速网卡名称,可通过在计算节点执行ip a命令获取,使用第一个配置节点IP地址的名称,请根据实际情况进行修改 export GLOO_SOCKET_IFNAME=enp189s0f0 # 将设置好的MASTER_ADDR导出 export MASTER_ADDR=xxx.xxx.xxx.xxx # 导出变量,集群故障后使用TTP保存数据等待框架退出的时间,不导出默认120s,视情况修改 export TTP_OT=360 # TTP特性开关,默认值为True,此时不加ENABLE_TTP参数;使用原生代码时设置为False export ENABLE_TTP=True
- 增加参数:
# 此处以./dataset/llama为例,请根据实际情况进行修改 TOKENIZER_PATH=./dataset/llama DISTRIBUTED_ARGS="--nproc_per_node $NPUS_PER_NODE --nnodes $NNODES --node_rank $NODE_RANK --master_addr $MASTER_ADDR --master_port $MASTER_PORT" --use-distributed-optimizer \ --save $CHECKPOINT_PATH \ # 大集群推荐,小集群比较占显存,加上以下参数开启local copy --local-copy-os \
适配DP Size为奇数时,修改NPUS_PER_NODE为奇数。
- 增加多机环境导出变量的语句:
- 删除
- 按“Esc”键,输入:wq!,按“Enter”保存并退出编辑。
- 拷贝“examples/llama/pretrain_llama_7B_zero_8p.sh”文件到工程根目录并重命名,此处以ttprun_llama.sh为例。
父主题: 使用指导