昇腾社区首页
中文
注册

MindSpore场景(基于MindFormers)

本章节将指导用户了解配置借轨通信任务暂停与回切的关键步骤。借轨通信任务暂停与回切的特性介绍、使用约束、支持的产品型号及原理请参见借轨通信任务暂停与回切

前提条件

操作步骤

  1. 准备mindformers代码仓,执行如下命令。
    mkdir -p /data/atlas_dls/public/code
    git clone https://gitee.com/mindspore/mindformers/pulls/6480
    cd mindformers
    git checkout 39b2c08ba08d77e259daa13d0db9817bf2dc03f2
    mkdir dataset
    mkdir yamls
    cd ..
     
    # 将mindformers重命名为LLAMA2_for_MS_code
    mv mindformers LLAMA2_for_MS_code
  2. 在分布式环境初始化完成,能够获取到全局rank之后,修改训练脚本,在训练脚本中拉起TaskD Manager,在管理进程中拉起TaskD Proxy,在训练进程内部拉起TaskD Worker。
    1. 拉起TaskD Manager。
      1. 创建manager.py文件,放在和训练脚本同一目录下,manager.py文件内容如下所示。
        from taskd.api import init_taskd_manager, start_taskd_manager
        import os
        
        job_id=os.getenv("MINDX_TASK_ID")
        node_nums=XX         # 用户填入任务节点总数
        proc_per_node=XX     # 用户填入任务每个节点的训练进程数量
        
        init_taskd_manager({"job_id":job_id, "node_nums": node_nums, "proc_per_node": proc_per_node})
        start_taskd_manager()
      2. 在训练脚本中增加执行以下代码拉起TaskD Manager。
        if [[ "${MS_SCHED_HOST}" == "${POD_IP}" ]]; then
            python manager.py &   # 具体执行路径由当前路径决定
        fi
            
        msrun ...
    2. 拉起TaskD Worker。修改./mindformers/trainer/base_trainer.py文件,在代码中增加如下加粗字段。
          def training_process(
                  self,
                  config: Optional[Union[dict, MindFormerConfig, ConfigArguments, TrainingArguments]] = None,
                  network: Optional[Union[Cell, PreTrainedModel]] = None,
                  dataset: Optional[Union[BaseDataset, GeneratorDataset]] = None,
                  optimizer: Optional[Optimizer] = None,
                  callbacks: Optional[Union[Callback, List[Callback]]] = None,
                  compute_metrics: Optional[Union[dict, set]] = None,
                  **kwargs):
              ……
              ……
      
              logger.info(".........Starting Training Model..........")
              if get_real_rank() % 8 == 0:
                  pprint(config)
              logger.info(".........Model Compiling, Please Wait a Moment...........")
              try:
                  rank = get_rank()
                  from taskd.api.taskd_worker_api import init_taskd_worker
                  from taskd.api.taskd_worker_api import start_taskd_worker
                  init_taskd_worker(rank,5000,"ms")
                  start_taskd_worker()
              except Exception as e:
                  print("failed to call mindcluster taskd")
              model.train(config.runner_config.epochs, dataset,
                          callbacks=callbacks,
                          dataset_sink_mode=config.runner_config.sink_mode,
                          sink_size=config.runner_config.sink_size,
                          initial_epoch=config.runner_config.initial_epoch)
  3. 修改任务YAML。
    1. 在任务YAML中修改容器暴露端口,在所有的Pod下增加TaskD通信使用的端口9601。
      ports:                         
         - containerPort: 9601             
           name: taskd-port 
    2. 在任务YAML中新增以下加粗字段。
      ...  
         labels:  
           ...  
           process-recover-enable: "on"  
           fault-scheduling: "force"
       ... 
      ...  
         annotations:  
           ...  
           recover-strategy: "retry"    # 任务可用恢复策略,取值为retry,表示开启进程级在线恢复
       ... 
      ...
      spec:
        replicaSpecs:
          Master:
            template:
              spec:
                containers:
                - name: ascend # do not modify
                  env:
                    - name: PROCESS_RECOVER         # 开启进程级别在线恢复需注入该环境变量
                      value: "on"
                  args:
                    - | 
                      ...
                      export ELASTIC_PROCESS_RECOVER_ENABLE=1;
                      ... 
                      bash scripts/train_start.sh /job/code /job/output pretrain_gpt.py \
                        ...
                        --enable-high-availability \
                        --enable-hbmfault-repair \
                        ...
          Worker:
            template:
              spec:
                containers:
                - name: ascend # do not modify
                  env:
                    - name: PROCESS_RECOVER         # 开启进程级在线恢复需注入该环境变量
                      value: "on"
                  args:
                    - |
                      ...
                      export ELASTIC_PROCESS_RECOVER_ENABLE=1;
                      ...
                      bash scripts/train_start.sh /job/code /job/output pretrain_gpt.py \
                        ...
                        --enable-high-availability \
                        --enable-hbmfault-repair \
                        ...
      ...
  4. 修改训练框架代码,打开借轨开关。

    编辑启动脚本LLAMA2_for_MS_code/scripts/msrun_launcher.sh文件,在代码中增加如下加粗字段。

    export MS_ENABLE_TFT='{TTP:1,TSP:1}'           # 开启临终遗言和借轨回切
    export HCCL_OP_RETRY_ENABLE="L0:0, L1:1, L2:1"  # 设置HCCL算子不同层级(L0/L1/L2)的重执行开关状态。重执行是指当通信算子执行报SDMA或者RDMA CQE类型的错误时,HCCL会尝试重新执行此通信算子。

    如果训练中出现报错“the libtaskd.so has not been loaded”,则需在训练脚本中导入LD_PRELOAD环境变量。该环境变量允许系统提前加载指定的so文件。示例如下。

    export LD_PRELOAD=/usr/local/Ascend/ascend-toolkit/latest/lib64/libmspti.so:/usr/local/python3.8.17/lib/python3.8/site-packages/taskd/python/cython_api/libs/libtaskd.so
    • libmspti.so:该so由MindStudio提供,集成在CANN包内。当使用默认安装路径时,路径为:/usr/local/Ascend/ascend-toolkit/latest/lib64/libmspti.so。
    • libtaskd.so:该so由TaskD组件提供,安装该whl包后,路径为: TaskD所在路径/taskd/python/cython_api/libs/libtaskd.so。

      TaskD所在路径可通过以下命令进行查询。回显中的Location字段即为TaskD所在路径。

      pip show taskd