开发者
资源

对接ModelLink框架

前提条件

ModelLink框架和数据集的准备请参见ModelLink

  • 支持的版本对应Commit ID为:af1347
  • README中涉及安装Megatron-LM的tag=23.05的部分切换为23.06(原tag已被删除)。
  • “enable_ttp”为开启TTP代码开关,默认开启。
  • 若不使用TTP而使用原生代码,请参见10导出变量ENABLE_TTP为False;当卸载MindIO.TTP时,启动方式还原为torchrun,依旧需要导出变量ENABLE_TTP为False。

操作步骤

ModelLink框架和MindIO TTP SDK安装完成之后,需要修改代码才可在框架中启动TTP模块,在训练中同步优化器数据更新状态到TTP模块,故障时TTP能够及时生成临终CheckPoint。

  1. 修改“ascendspeed/__init__.py”
    1. 打开“ascendspeed/__init__.py”文件。
      vim ascendspeed/__init__.py
    2. 按“i”进入编辑模式,修改以下内容。
      # 45行导入get_ttp_switch。
      from .utils import get_ttp_switch
    3. 按“Esc”键,输入:wq!,按“Enter”保存并退出编辑。
  2. 修改“ascendspeed/utils.py”
    1. 打开“ascendspeed/utils.py”文件。
      vim ascendspeed/utils.py
    2. 按“i”进入编辑模式,修改以下内容。
      # 44行后增加下列方法。
      def get_ttp_switch():
          enable_ttp = os.getenv('ENABLE_TTP', 'True')
          enable_ttp = enable_ttp.lower() in ('true', '1')
          return enable_ttp
    3. 按“Esc”键,输入:wq!,按“Enter”保存并退出编辑。
  3. 修改“ascendspeed/optimizer/__init__.py”
    1. 打开“ascendspeed/optimizer/__init__.py”文件。
      vim ascendspeed/optimizer/__init__.py
    2. 按“i”进入编辑模式,修改以下内容。
      • 导入TTPReplicaOptimizer。
        (常用)DP Size为偶数时:
        from ascendspeed import get_ttp_switch
        enable_ttp = get_ttp_switch()
        if enable_ttp:
            from mindio_ttp.modellink import TTPReplicaOptimizer

        适配DP Size为奇数时,优化器导入为:

        from mindio_ttp.modellink import TTPOddReplicaOptimizer as TTPReplicaOptimizer
      • 修改194行的“opt_ty”参数值。
        if args.use_distributed_optimizer:
            if args.release_fp32_grad:
                opt_ty = DistributedOptimizerWithoutFP32Grad
            else:
                opt_ty = TTPReplicaOptimizer if enable_ttp else DistributedOptimizer  // add here
        if enable_ttp:
            return opt_ty(optimizer,
                          args.clip_grad,
                          args.log_num_zeros_in_grad,
                          params_have_main_grad,
                          args.use_contiguous_buffers_in_local_ddp,
                          args.fp16,
                          args.bf16,
                          args.params_dtype,
                          grad_scaler,
                          model, args.local_copy_os)  // add local_copy param
        else:
            return opt_ty(optimizer,
                          args.clip_grad,
                          args.log_num_zeros_in_grad,
                          params_have_main_grad,
                          args.use_contiguous_buffers_in_local_ddp,
                          args.fp16,
                          args.bf16,
                          args.params_dtype,
                          grad_scaler,
                          model)

        开启local copy时:local_copy设置为True。

    3. 按“Esc”键,输入:wq!,按“Enter”保存并退出编辑。
  4. 修改“ascendspeed/optimizer/clip_grads.py”
    1. 打开“ascendspeed/optimizer/clip_grads.py”文件。
      vim ascendspeed/optimizer/clip_grads.py
    2. 按“i”进入编辑模式,修改以下内容。
      # 99行后新增以下内容
      from ascendspeed import get_ttp_switch
      enable_ttp = get_ttp_switch()
      if enable_ttp:
          from mindio_ttp.modellink.os_shard_parallel import get_os_shard_group
      
      # 第104行作如下修改:
      torch.distributed.all_reduce(total_norm,
                                   op=torch.distributed.ReduceOp.SUM,
                                   group=get_os_shard_group() if enable_ttp else parallel_state.get_data_parallel_group())
    3. 按“Esc”键,输入:wq!,按“Enter”保存并退出编辑。
  5. 修改“ascendspeed/optimizer/optimizer.py”
    1. 打开“ascendspeed/optimizer/optimizer.py”文件。
      vim ascendspeed/optimizer/optimizer.py
    2. 按“i”进入编辑模式,修改以下内容。
      # 35行后新增以下内容
      from ascendspeed import get_ttp_switch
      enable_ttp = get_ttp_switch()
      if enable_ttp:
          import mindio_ttp.framework_ttp as ttp
      
      # 在490行的step方法中增加以下内容
      timers('optimizer-inner-step', log_level=1).start(
          barrier=args.barrier_with_L1_time) // add after this line
      
      if enable_ttp and hasattr(self, 'begin_to_update'):
          self.begin_to_update(args.iteration)
      self.optimizer.step()
      if enable_ttp and hasattr(self, 'end_to_update'):
          self.end_to_update()
    3. 按“Esc”键,输入:wq!,按“Enter”保存并退出编辑。
  6. 修改ascendspeed/core/parallel_state.py。
    1. 打开“ascendspeed/core/parallel_state.py”文件。
      vim ascendspeed/core/parallel_state.py
    2. 按“i”进入编辑模式,修改代码。
      • 适配DP Size为偶数时:
        # 在176行args = get_args()后添加
        from ascendspeed import get_ttp_switch
        enable_ttp = get_ttp_switch()
        if enable_ttp:
            from mindio_ttp.modellink import build_optimizer_replica_group
        
        # 在for循环内group = torch.distributed.new_group(ranks)后增加
        if enable_ttp:
            build_optimizer_replica_group(list(ranks))

      适配DP Size为奇数时,parallel_state.py进行如下适配。

      • 适配DP Size为奇数时:
        # 在283左右的def is_unitialized方法前添加
        def get_data_parallel_all_rank():
            assert _DATA_PARALLEL_GLOBAL_RANKS is not None, \
                "Data parallel group is not initialized"
            return _DATA_PARALLEL_GLOBAL_RANKS
    3. 按“Esc”键,输入:wq!,按“Enter”保存并退出编辑。
  7. 修改“ascendspeed/checkpointing.py”
    1. 打开“ascendspeed/checkpointing.py”文件。
      vim ascendspeed/checkpointing.py
    2. 按“i”进入编辑模式,修改代码。
      # 34行后增加get_ttp_switch
      from ascendspeed import get_ttp_switch
      
      # 47行后增加
      enable_ttp = get_ttp_switch()
      
      # 112行修改get_checkpoint_name方法:
      def get_checkpoint_name(checkpoints_path, iteration,
                              release=False, model_name='model_optim_rng.pt', tmp=False): // add "tmp=False"
      
          directory = 'iter_{:07d}'.format(iteration) // add after this line
      if tmp:
          directory = directory + "_tmp"
      
      # 147行增加ttp_save_checkpoint如下:
      def ttp_save_checkpoint(save_rank, step, rank_list, args):
          iteration, model, optimizer, opt_param_scheduler = args
          if hasattr(optimizer, 'set_dump_args'):
              optimizer.set_dump_args(save_rank, step, rank_list)
              save_checkpoint(iteration, model, optimizer, opt_param_scheduler)
      
      # 155行修改save_checkpoint方法:
      print_rank_0('saving checkpoint at iteration {:7d} to {}'.format(
          iteration, args.save)) // add after this line
      
      if enable_ttp and args.local_copy_os and optimizer.error_dump and hasattr(optimizer, "save_args") and optimizer.current_step > optimizer.save_args['step']:
          iteration = optimizer.save_args['step']
      
      if enable_ttp:
          tmp = True if optimizer.error_dump else False
          checkpoint_name = get_checkpoint_name(args.save, iteration, tmp=tmp)
      else:
          checkpoint_name = get_checkpoint_name(args.save, iteration)
      
      cur_rank = torch.distributed.get_rank()
      save_flag = optimizer.need_write_file() if enable_ttp else parallel_state.get_data_parallel_rank() == 0
      
      # 修改判断if parallel_state.get_data_parallel_rank() == 0: 为下:
      if save_flag:
      
      # 修改185行左右判断为:
      if not torch.distributed.is_initialized() or save_flag or args.deepspeed:
      
      # 修改save_checkpoint方法最后save_checkpoint_post_process(iteration),增加判断分支:
      if enable_ttp and optimizer.error_dump:
          print('rank {} successfully saved checkpoint at iteration {:7d} to {}'.format(
              cur_rank, iteration, args.save))
      else:
          save_checkpoint_post_process(iteration)
      
      # 增加以下方法:
      def rename_save_result(args):
          iteration = args[0]
          optimizer = args[2]
          args = get_args()
          src_path = None
          dst_path = None
      
          if args.local_copy_os and hasattr(optimizer, "save_args") and optimizer.current_step > optimizer.save_args['step']:
              iteration = optimizer.save_args['step']
          try:
              tmp_dir = 'iter_{:07d}_tmp'.format(iteration)
              fin_dir = 'iter_{:07d}'.format(iteration)
              src_path = os.path.join(args.save, tmp_dir)
              dst_path = os.path.join(args.save, fin_dir)
              os.rename(src_path, dst_path)
          except Exception:
              print(f"rename from {src_path} to {dst_path} failed.")
              traceback.print_exc()
              return 1
      
          # And update the latest iteration
          tracker_filename = get_checkpoint_tracker_filename(args.save)
          with open(tracker_filename, 'w') as f:
              f.write(str(iteration))
          return 0
    3. 按“Esc”键,输入:wq!,按“Enter”保存并退出编辑。
  8. 修改“ascendspeed/training.py”
    1. 打开“ascendspeed/training.py”文件。
      vim ascendspeed/training.py
    2. 按“i”进入编辑模式,修改代码。
      # 29行后增加
      from ascendspeed import get_ttp_switch
      
      # 在68行后导入以下内容:
      enable_ttp = get_ttp_switch()
      if enable_ttp:
          import mindio_ttp.framework_ttp as ttp
          from mindio_ttp.modellink import get_dp_rank_list
          from ascendspeed.checkpointing import ttp_save_checkpoint, rename_save_result
      
      # 在setup_model_and_optimizer方法中686行后加入:
          model[0].global_steps = student_global_steps //add after this line
      if enable_ttp and hasattr(optimizer, "current_step"):
          optimizer.current_step = args.iteration
          print(f"optimizer current step has been set to {optimizer.current_step}")
      
      # 在train_step方法中776行后加入:
      timers('backward-embedding-all-reduce').stop() // add after this line
      if enable_ttp and (not args.local_copy_os):
          torch.distributed.barrier()
      
      # 1154行后修改train方法:
      def train(...)
      ...
      total_loss_dict = {} // add after this line
      if enable_ttp:
          # init processor and controller
          cur_rank = torch.distributed.get_rank()
          world_size: int = torch.distributed.get_world_size()
          dp_ranks = get_dp_rank_list()
      
          import os
          masterIp = os.getenv('MASTER_ADDR', '127.0.0.1')
          port = 8000
          if cur_rank == 0:
              ttp.ttp_init_controller(cur_rank, world_size, replica=len(dp_ranks)//2, enable_local_copy=args.local_copy_os)
              ttp.ttp_start_controller(masterIp, port)
          ttp.ttp_init_processor(cur_rank, dp_ranks, len(dp_ranks), world_size, replica=len(dp_ranks)//2, enable_local_copy=args.local_copy_os)
          ttp.ttp_start_processor(masterIp, port)
          ttp.ttp_register_save_ckpt_handler(ttp_save_checkpoint)
          ttp.ttp_register_rename_handler(rename_save_result)
      ...
      
      iteration += 1 //add after this line
      if enable_ttp:
          if hasattr(optimizer, "current_step") and iteration != optimizer.current_step:
              print_rank_0(f"refresh optimizer step {iteration}")
          ttp.ttp_set_ckpt_args((iteration, model, optimizer, lr_scheduler))
      
      # train方法后,evaluate方法前增加
      if enable_ttp:
          train = ttp.ttp_to_persist(train)

      适配DP Size为奇数时:

      • replica = 2
      • dp_ranks获取如下:
        dp_ranks = parallel_state.get_data_parallel_all_rank()
    3. 按“Esc”键,输入:wq!,按“Enter”保存并退出编辑。
  9. 修改“ascendspeed/arguments.py”
    1. 打开“ascendspeed/arguments.py”文件。
      vim ascendspeed/arguments.py
    2. 按“i”进入编辑模式,修改代码。
      # 845行之后增加以下内容:
      group.add_argument('--local-copy-os',
                         action='store_true',
                         default=False,
                         help='barrier or local copy os, suggest barrier on small cluster')
    3. 按“Esc”键,输入:wq!,按“Enter”保存并退出编辑。
  10. 编辑预训练脚本。(参考“examples/llama/pretrain_llama_7B_zero_8p.sh”
    1. 拷贝“examples/llama/pretrain_llama_7B_zero_8p.sh”文件到工程根目录并重命名,此处以ttprun_llama.sh为例。
      cp examples/llama/pretrain_llama_7B_zero_8p.sh ./ttprun_llama.sh
    2. 打开“ttprun_llama.sh”文件。
      vim ttprun_llama.sh
    3. 按“i”进入编辑模式,在对应位置编辑脚本。
      • 删除
        • 删除涉及deepspeed启动的代码。
        • 删除参数:
          -- ds_args \
        • (可选)删除参数:
          # Atlas 800T A2 训练服务器上需保留这个两参数,其他服务器需要删除
          --use-flash-attn \
          --use-fused-rmsnorm \ 
      • 修改
        • GPUS_PER_NODE修改为NPUS_PER_NODE。
        • 修改set_env.sh为实际路径。
        • 根据实际情况修改ip、port、nnodes、node_rank。
        • 修改启动方式为:
          ttprun $DISTRIBUTED_ARGS

          卸载MindIO TTP后,启动方式需要还原为torchrun。

      • 新增
        • 增加多机环境导出变量的语句:
          # 此处以enp189s0f0为例,高速网卡名称,可通过在计算节点执行ip a命令获取,使用第一个配置节点IP地址的名称,请根据实际情况进行修改
          export GLOO_SOCKET_IFNAME=enp189s0f0
          
          # 将设置好的MASTER_ADDR导出
          export MASTER_ADDR=xxx.xxx.xxx.xxx
          
          # 导出变量,集群故障后使用TTP保存数据等待框架退出的时间,不导出默认120s,视情况修改
          export TTP_OT=360
          
          # TTP特性开关,默认值为True,此时不加ENABLE_TTP参数;使用原生代码时设置为False
          export ENABLE_TTP=True
        • 增加参数:
          # 此处以./dataset/llama为例,请根据实际情况进行修改
          TOKENIZER_PATH=./dataset/llama
          
          DISTRIBUTED_ARGS="--nproc_per_node $NPUS_PER_NODE --nnodes $NNODES --node_rank $NODE_RANK --master_addr $MASTER_ADDR --master_port $MASTER_PORT"
          
          --use-distributed-optimizer \
          --save $CHECKPOINT_PATH \
          
          # 大集群推荐,小集群比较占显存,加上以下参数开启local copy
          --local-copy-os \

          适配DP Size为奇数时,修改NPUS_PER_NODE为奇数。

    4. 按“Esc”键,输入:wq!,按“Enter”保存并退出编辑。