昇腾社区首页
中文
注册
开发者
下载

PyTorch场景(基于MindSpeed-LLM)

本章节将指导用户了解配置在线压测的关键步骤。在线压测的特性介绍、使用约束、支持的产品型号等请参见在线压测

前提条件

操作步骤

  1. 在分布式环境初始化完成,能够获取到全局rank之后,修改训练脚本,在训练脚本中拉起TaskD Manager,在训练进程内部拉起TaskD Worker。
    1. 拉起TaskD Manager。
      1. 创建manager.py文件,放在调用训练脚本时的当前目录下。manager.py文件内容如下所示。
        from taskd.api import init_taskd_manager, start_taskd_manager
        import os
        
        job_id=os.getenv("MINDX_TASK_ID")
        node_nums=XX         # 用户填入任务节点总数,int类型
        proc_per_node=XX     # 用户填入任务每个节点的训练进程数量,int类型
        
        init_taskd_manager({"job_id":job_id, "node_nums": node_nums, "proc_per_node": proc_per_node})
        start_taskd_manager()
      2. 在训练脚本中增加以下代码,拉起TaskD Manager。
        sed -i '/import os/i import mindx_elastic.api' $(pip3 show torch | grep Location | awk -F ' ' '{print $2}')/torch/distributed/run.py
        # 可选,使用PyTorch 2.6.0及以上版本,并使用优雅容错、Pod级别重调度或进程级别重调度时需要配置以下命令
        sed -i '/if _torchelastic_use_agent_store():/i\
            if _torchelastic_use_agent_store():\
                from torch.distributed import PrefixStore\
                attempt = os.environ["TORCHELASTIC_RESTART_COUNT"]\
                tcp_store = TCPStore(hostname, port, world_size, False, timeout)\
                return PrefixStore(f"\/worker\/attempt_{attempt}", tcp_store)' $(pip3 show torch | grep Location | awk -F ' ' '{print $2}')/torch/distributed/rendezvous.py
        if [[ "${RANK}" == 0 ]]; then
            export MASTER_ADDR=${POD_IP}
            python manager.py &           # 具体执行路径由当前路径决定
        fi
            
        torchrun ...
    2. 拉起TaskD Worker。
      修改LLAMA2_for_PyTorch_2.7_code/mindspeed_llm/training/training.py文件,在代码中增加如下加粗字段。
      def pretrain(train_valid_test_dataset_provider,
                   model_provider,
                   model_type,
                   forward_step_func,
                   process_non_loss_data_func=None,
                   extra_args_provider=None,
                   args_defaults={}):
          print_rank_0('time to initialize megatron (seconds): {:.3f}'.format(
              time.time() - _TRAIN_START_TIME))
          print_datetime('after megatron is initialized')
          import torch.distributed as dist
          if dist.is_initialized():
             rank = dist.get_rank()
             from taskd.api.taskd_worker_api import init_taskd_worker
             from taskd.api.taskd_worker_api import start_taskd_worker
             init_taskd_worker(rank,5000,"pt")
             start_taskd_worker()
          app_metrics['app_model_init_finish_time'] = one_logger_utils.get_timestamp_in_ms()
          one_logger_utils.on_pretrain_start()
  2. 修改训练框架代码,打开在线压测开关。
    修改LLAMA2_for_PyTorch_2.7_code/mindspeed_llm/training/training.py文件,在代码中增加如下加粗字段。
    # Exiting based on iterations
    if args.exit_interval and iteration % args.exit_interval == 0:
        if args.save and not saved_checkpoint
            save_checkpoint_and_time(iteration, model, opmimizer,
                                     opt_param_scheduler,
                                     num_floating_point_operations_so_far)
        torch.distributed.barrier()
        print_datetime('exiting program at iteration {}'.format(iteration))
        exit = True
        break
    
        if args.manual_gc:
            if args.manual_gc_interval !=0 and iteration % args.manual_gc_interval == 0:
                gc.collect()
        if is_profile_enable():
            prof.step()
    
        from mindio_ttp.framework_ttp import tft_pause_train
        tft_pause_train(args.iteration)     
    ……

    如果训练中出现报错“the libtaskd.so has not been loaded”,则需在训练脚本中导入LD_PRELOAD环境变量。该环境变量允许系统提前加载指定的so文件。示例如下。

    export LD_PRELOAD=/usr/local/Ascend/ascend-toolkit/latest/lib64/libmspti.so:/usr/local/lib/python3.10/dist-packages/taskd/python/cython_api/libs/libtaskd.so
    • libmspti.so:该so由MindStudio提供,集成在CANN包内。当使用默认安装路径时,路径为:/usr/local/Ascend/ascend-toolkit/latest/lib64/libmspti.so。
    • libtaskd.so:该so由TaskD组件提供,安装该whl包后,路径为: TaskD所在路径/taskd/python/cython_api/libs/libtaskd.so。

      TaskD所在路径可通过以下命令进行查询。回显中的Location字段即为TaskD所在路径。

      pip show taskd
  3. 修改任务YAML。
    1. 在任务YAML中修改容器暴露端口,在所有的Pod下增加TaskD通信使用的端口9601。
      ports:                         
         - containerPort: 9601             
           name: taskd-port 
    2. 在任务YAML中新增以下加粗字段。
      ...  
         labels:  
           ...  
           process-recover-enable: "on"  
           fault-scheduling: "force"
       ... 
      ...  
         annotations:  
           ...  
           recover-strategy: "recover"    # 任务可用恢复策略,取值为recover,表示开启进程级别重调度
       ... 
      ...
      spec:
        replicaSpecs:
          Master:
            template:
              spec:
                containers:
                - name: ascend # do not modify
                  env:
                    - name: PROCESS_RECOVER         # 开启进程级别重调度需注入该环境变量
                      value: "on"
                  args:
                    - | 
                      ...
                      export ELASTIC_PROCESS_RECOVER_ENABLE=1;
                      ... 
                      bash scripts/train_start.sh /job/code /job/output pretrain_gpt.py \
                        ...
                        --enable-high-availability \
                        --enable-hbmfault-repair \
                        ...
          Worker:
            template:
              spec:
                containers:
                - name: ascend # do not modify
                  env:
                    - name: PROCESS_RECOVER         # 开启进程级别重调度需注入该环境变量
                      value: "on"
                  args:
                    - |
                      ...
                      export ELASTIC_PROCESS_RECOVER_ENABLE=1;
                      ...
                      bash scripts/train_start.sh /job/code /job/output pretrain_gpt.py \
                        ...
                        --enable-high-availability \
                        --enable-hbmfault-repair \
                        ...
      ...