昇腾社区首页
中文
注册
开发者
下载

使用7.1.RC1及以上版本TaskD

MindCluster集群调度组件结合MindStudio提供的profiling能力,对集群中的性能劣化故障(慢节点)提供诊断功能。该功能提供动态打点和打点数据持久化功能、可动态启停训练任务打点功能,无需重启任务进行诊断,对训练无损耗。

当前支持的打点数据如表1所示。

表1 打点数据说明

打点数据的类型

支持的AI框架

提供支持的组件

FP

(标识前向传播数据)

PyTorch

说明:

仅支持单算子场景。

mstx_torch_plugin

Step

(标识Step时延)

PyTorch、MindSpore

  • PyTorch
    • 原生优化器场景:若torch_npu为7.1.RC1版本,需使用mstx_torch_plugin;若torch_npu为7.1.RC1以上版本,无需使用mstx_torch_plugin,torch_npu自带Step打点。
    • 自定义优化器场景:手动增加打点数据。
  • MindSpore
    • MindFormers场景:Step打点数据由MindFormers提供。
    • MindSpeed场景:不提供Step打点数据。

Communication

(标识通信算子)

PyTorch、MindSpore

  • PyTorch:torch_npu
  • MindSpore:MindSpore框架。

SaveCheckpoint

(标识SaveCheckpoint耗时)

PyTorch、MindSpore

  • PyTorch:torch_npu
  • MindSpore:MindSpore框架。

DataLoader

(标识DataLoader耗时)

PyTorch、MindSpore

  • PyTorch:torch_npu
  • MindSpore:MindSpore框架。

使用约束

  • 当前Step、SaveCheckpoint、FP、DataLoader仅支持同步开启。如需关闭以上四类打点数据,需同时关闭Communication。
  • Communication通信算子数据支持单独开启、关闭。
  • 动态轻量打点功能与MindStudio的全量打点功能不可同时开启,开启全量打点功能会导致性能劣化故障不能正常采集数据。

前提条件

  • (可选)已安装ClusterDAscend Device PluginVolcano(以上MindCluster组件版本均需与TaskD配套)
  • 在容器内安装torch_npu可选,PyTorch场景需安装、版本号≥7.1.RC1)、MindSpore(可选,MindSpore场景需安装、版本号≥2.7.0)Kernels必选,版本号≥8.2.RC1)、Toolkit必选,版本号≥8.2.RC1)、TaskD必选

准备软件包

表2 准备软件包

软件包

是否必选

说明

获取方法

使用场景

mstx_torch_plugin

Ascend PyTorch Profiler中的采集并解析msproftx数据功能已经内置了通信算子的打点。为了方便用户在不修改业务代码的基础上获取更多关键阶段的耗时数据,mstx_torch_plugin在Ascend PyTorch Profiler内置了dataloaderforwardstepsave_checkpoint这四个关键阶段函数的打点。

说明:
  • 如需使用FP打点数据,需安装mstx_torch_plugin。其他场景下无需安装。
  • 需使用1.0及以上版本的mstx_torch_plugin。

获取链接

PyTorch

配置性能劣化故障检测

本方案仅针对7.1.RC1及以上版本的TaskD组件。如使用7.1.RC1以下版本的组件请参考使用其他版本TaskD章节进行操作。

  1. 以下两种方式请根据实际需要进行二选一。
    • 在容器内安装mstx_torch_plugin。
      1. 下载mstx_torch_plugin的whl包。whl包链接:mstx_torch_plugin
      2. 安装软件包。
        pip install mstx_torch_plugin-1.0-py3-none-any.whl
      3. 在AI任务执行脚本中import导入该whl包。

        需保证import的顺序在import torch和import torch_npu后面,示例如下。

        import torch 
        import torch_npu  
        import mstx_torch_plugin
    • 在PyTorch场景下,非原生优化器或不使用mstx_torch_plugin的情况下,为获取训练的Step耗时数据需修改训练脚本中的训练迭代循环,需增加Step打点代码。
      以下示例为PyTorch-MindSpeed场景,需修改./mindspeed_llm/training/training.py文件增加如下加粗字段。MindSpore场景请根据实际情况修改。
      def train(forward_step_func, model, optimizer, opt_param_scheduler,
                train_data_iterator, valid_data_iterator,
                process_non_loss_data_func, config):
                  # Cache into one-logger for callback
          ……
          ……
          if is_profile_enabled():
              prof = get_profiler()
              prof.start()
          step_id = iteration
          while iteration < args.train_iters:
              stream = torch.npu.current_stream()      # 获取当前环境的执行流,用于获取NPU侧时间
              range_id = torch.npu.mstx.range_start(f"step {step_id}", stream) # 标识当前训练step的开始
              ……
              ……
              if args.manual_gc:
                  if args.manual_gc_interval != 0 and iteration % args.manual_gc_interval == 0:
                      gc.collect()
      
              if is_profile_enabled():
                  prof.step()
              step_id +=1  # 训练step加一,用于标识下一step
              torch.npu.mstx.range_end(range_id) # 标识当前训练step的结束
  2. 在容器内,以CANN软件包的运行用户登录环境,执行source ${install_path}/set_env.sh命令设置环境变量。其中${install_path}为CANN软件的安装目录。示例如下。
    source /usr/local/Ascend/ascend-toolkit/set_env.sh
  3. 训练启动前,在训练脚本中导入LD_PRELOAD环境变量。该环境变量允许系统提前加载指定的so文件。示例如下。
    export LD_PRELOAD=/usr/local/Ascend/ascend-toolkit/latest/lib64/libmspti.so:/usr/local/python3.10.5/lib/python3.10/site-packages/taskd/python/cython_api/libs/libtaskd.so
    • libmspti.so:该so由MindStudio提供,集成在CANN包内。当使用默认安装路径时,路径为:/usr/local/Ascend/ascend-toolkit/latest/lib64/libmspti.so。
    • libtaskd.so:该so由TaskD组件提供,安装该whl包后,路径为: TaskD所在路径/taskd/python/cython_api/libs/libtaskd.so。

      TaskD所在路径可通过以下命令进行查询。回显中的Location字段即为TaskD所在路径。

      pip show taskd
  4. 在分布式环境初始化完成,能够获取到全局rank之后,修改训练脚本,在训练脚本中拉起TaskD Manager,在管理进程中拉起TaskD Proxy,在训练进程内部拉起TaskD Worker。
    1. 拉起TaskD Manager和TaskD Proxy。
      • PyTorch场景
        1. 创建manager.py文件,放在调用训练脚本时的当前目录下。manager.py文件内容如下所示。
          from taskd.api import init_taskd_manager, start_taskd_manager
          import os
          
          job_id=os.getenv("MINDX_TASK_ID")
          node_nums=XX         # 用户填入任务节点总数,int类型
          proc_per_node=XX     # 用户填入任务每个节点的训练进程数量int类型
          
          init_taskd_manager({"job_id":job_id, "node_nums": node_nums, "proc_per_node": proc_per_node})
          start_taskd_manager()
        2. 在训练脚本中增加以下代码,拉起TaskD Manager和TaskD Proxy。
          # 可选,使用TaskD Agent时需要配置以下sed命令,使用Elastic Agent时无需配置
          sed -i '/import os/i import taskd.python.adaptor.patch' $(pip3 show torch | grep Location | awk -F ' ' '{print $2}')/torch/distributed/run.py
          
          if [[ "${RANK}" -eq 0 ]]; then
              export MASTER_ADDR=${POD_IP}
              python manager.py &
          fi
              
          torchrun ...
      • MindSpore场景
        1. 创建manager.py文件,放在调用训练脚本时的当前目录下,manager.py文件内容如下所示。
          from taskd.api import init_taskd_manager, start_taskd_manager
          import os
          
          job_id=os.getenv("MINDX_TASK_ID")
          node_nums=XX         # 用户填入任务节点总数int类型
          proc_per_node=XX     # 用户填入任务每个节点的训练进程数量int类型
          
          init_taskd_manager({"job_id":job_id, "node_nums": node_nums, "proc_per_node": proc_per_node})
          start_taskd_manager()
        2. 在训练脚本中增加以下代码拉起TaskD Manager。
          if [[ "${MS_SCHED_HOST}" -eq "${POD_IP}" ]]; then
              python manager.py &
          fi
              
          msrun ...
        3. 修改mindspore/python/mindspore/parallel/cluster/process_entity/_api.py文件,拉起TaskD Proxy。示例如下。
          ...
            if ("TTP:1" in tft_env) or ("UCE:1" in tft_env) or ("ARF:1" in tft_env):
                      try:
                          from taskd.python.framework.agent.ms_mgr.msrun_plugin import MSRunPlugin
                          from taskd .api.taskd_proxy_api import init_taskd_proxy
                          from taskd.python.framework.common.type import CONFIG_UPSTREAMIP_KEY, LOCAL_HOST
                        import threading
                        proxy = threading.Thread(target=init_taskd_proxy, args=({CONFIG_UPSTREAMIP_KEY : os.getenv("MS_SCHED_HOST", LOCAL_HOST)},))
                        proxy.daemon = True
                        proxy.start()
                          self.msmgr = MSRunPlugin()
                          self.msmgr.register_callbacks("KILL_WORKER", self.kill_workers)
                          self.msmgr.register_callbacks("START_ALL_WORKER", self.start_all_workers)
                          self.msmgr.register_callbacks("START_WORKER_LIST", self.start_worker_list)
                          self.msmgr.register_callbacks("MONITOR", self.monitor_rank_status)
                          self.enable_mindx = True
                          os.environ["MS_ENABLE_RECOVERY"] = str(1)
          ...
    2. 拉起TaskD Worker。
      • PyTorch-MindSpeed场景:修改LLAMA2_for_PyTorch_2.7_code/mindspeed_llm/training/training.py文件,在代码中增加如下加粗字段。
        def pretrain(train_valid_test_dataset_provider,
                     model_provider,
                     model_type,
                     forward_step_func,
                     process_non_loss_data_func=None,
                     extra_args_provider=None,
                     args_defaults={}):
            print_rank_0('time to initialize megatron (seconds): {:.3f}'.format(
                time.time() - _TRAIN_START_TIME))
            print_datetime('after megatron is initialized')
            import torch.distributed as dist
            if dist.is_initialized():
               rank = dist.get_rank()
               from taskd.api.taskd_worker_api import init_taskd_worker
               from taskd.api.taskd_worker_api import start_taskd_worker
               init_taskd_worker(rank,5000)
               start_taskd_worker()
            app_metrics['app_model_init_finish_time'] = one_logger_utils.get_timestamp_in_ms()
            one_logger_utils.on_pretrain_start()
      • MindSpore-MindFormers场景:修改./mindformers/trainer/base_trainer.py文件,在代码中增加如下加粗字段。
            def training_process(
                    self,
                    config: Optional[Union[dict, MindFormerConfig, ConfigArguments, TrainingArguments]] = None,
                    network: Optional[Union[Cell, PreTrainedModel]] = None,
                    dataset: Optional[Union[BaseDataset, GeneratorDataset]] = None,
                    optimizer: Optional[Optimizer] = None,
                    callbacks: Optional[Union[Callback, List[Callback]]] = None,
                    compute_metrics: Optional[Union[dict, set]] = None,
                    **kwargs):
                ……
                ……
        
                logger.info(".........Starting Training Model..........")
                if get_real_rank() % 8 == 0:
                    pprint(config)
                logger.info(".........Model Compiling, Please Wait a Moment...........")
                try:
                    rank = get_rank()
                    from taskd.api.taskd_worker_api import init_taskd_worker
                    from taskd.api.taskd_worker_api import start_taskd_worker
                    init_taskd_worker(rank,5000)
                    start_taskd_worker()
                except Exception as e:
                    print("failed to call mindcluster taskd")
                model.train(config.runner_config.epochs, dataset,
                            callbacks=callbacks,
                            dataset_sink_mode=config.runner_config.sink_mode,
                            sink_size=config.runner_config.sink_size,
                            initial_epoch=config.runner_config.initial_epoch)
  5. 修改任务YAML。
    1. 修改容器暴露端口,在所有的Pod下增加TaskD通信使用的端口9601。
      ports:                         
         - containerPort: 9601             
           name: taskd-port 
    2. 挂载文件。
      1. 挂载轻量profiling配置文件:需将宿主机上任务对应的data-trace ConfigMap落盘到/user/cluster-info/datatrace-config/命名空间.data-trace-任务名称/文件夹下。将名为profilingSwitch的文件挂载到容器指定路径:/user/cluster-info/datatrace-config/。
      2. 挂载轻量profiling落盘文件:轻量profiling数据写在容器内的/user/cluster-info/profiling路径下。如需在宿主机获取,请修改任务YAML,将该路径挂出。
        • 容器内YAML挂载示例如下。
          volumeMounts:
          - name: profilingdata
            mountPath: /user/cluster-info/
          - name: profileswitch
            mountPath: /user/cluster-info/datatrace-config
        • 宿主机内YAML挂载示例如下。
          volumes:
          - name: profileswitch
            hostPath:
              path: /user/cluster-info/datatrace-config/default.data-trace-default-test-pytorch-fault-mixtral
          - name: profilingdata
            hostPath:
               path: /home/profilingdatapath
  6. 开启轻量profiling获取落盘数据。修改ClusterD提供的gRPC接口,接口信息见ModifyTrainingDataTraceSwitch,动态开启或关闭轻量profiling能力。
    • 通过ClusterD提供的gRPC接口开启或修改轻量profiling获取落盘数据,创建的data-trace-<任务名称> ConfigMap的生命周期会随着任务的删除而删除。当任务不存在时,该接口会调用失败。
    • 开启通信算子后可能造成训练性能下降,不建议常态开启通信算子。

获取性能劣化故障检测数据

  • 落盘数据按rank进行分类,轻量profiling数据写在容器内的/user/cluster-info/profiling路径。
  • 对于存在环境变量MINDX_TASK_ID的Pod,rank 0数据在容器内的路径为/user/cluster-info/profiling/$MINDX_TASK_ID/0。
    • 如无该环境变量,默认会落盘到名为default_task_id_时间戳的文件夹内。
    • /user/cluster-info/profiling达到配置的上限大小后,将进行文件老化,默认每次删除修改时间最早的20%个文件。老化过程中仅删除profiling目录下rank文件夹中的以数字命名的文件,建议不手动添加其他文件到profiling文件夹下。如果用户手动添加其他文件,TaskD不会将该文件删除,但该文件会占用空间。
    • 轻量profiling文件以时间戳命名,各条记录以换行分割,每次追加写入rank下最新文件。最新文件大小超过10MB时,TaskD会新建profiling文件。如果使用NFS等网络存储方式,当数据同步较慢时,可能存在文件大小未达到10MB即创建新文件的情况。