性能劣化故障

MindCluster集群调度组件结合MindStudio提供的profiling能力,对集群中的性能劣化故障(慢节点)提供诊断功能。该功能提供动态打点和打点数据持久化功能、可动态启停训练任务打点功能,无需重启任务进行诊断,对训练无损耗。

当前支持的打点数据如表1所示。

表1 打点数据说明

打点数据的类型

支持的AI框架

提供支持的组件

FP

(标识前向传播数据)

PyTorch

说明:

仅支持单算子场景。

mstx_torch_plugin

Step

(标识step时延)

PyTorch、MindSpore

  • PyTorch
    • 原生优化器场景:mstx_torch_plugin
    • 自定义优化器场景:手动增加打点数据。
  • MindSpore
    • MindFormers场景:Step打点数据由MindFormers提供。
    • MindSpeed场景:不提供Step打点数据。

Communication

(标识通信算子)

PyTorch、MindSpore

  • PyTorch:torch_npu
  • MindSpore:MindSpore框架。

SaveCheckpoint

(标识saveCkpt耗时)

PyTorch、MindSpore

  • PyTorch:torch_npu
  • MindSpore:MindSpore框架。

DataLoader

(标识dataloader耗时)

PyTorch、MindSpore

  • PyTorch:torch_npu
  • MindSpore:MindSpore框架。

使用约束

前提条件

准备软件包

表2 准备软件包

软件包

是否必选

说明

获取方法

使用场景

mstx_torch_plugin

Ascend PyTorch Profiler中的采集并解析msproftx数据功能已经内置了通信算子的打点。为了方便用户在不修改业务代码的基础上获取更多关键阶段的耗时数据,mstx_torch_plugin在Ascend PyTorch Profiler内置了dataloaderforwardstepsave_checkpoint这四个关键阶段函数的打点。

说明:
  • 如需使用FP打点数据,需安装mstx_torch_plugin。其他场景下无需安装。
  • 需使用1.0及以上版本的mstx_torch_plugin。

获取链接

PyTorch

配置性能劣化故障检测

  1. (可选)在容器内安装mstx_torch_plugin。

    1. 下载mstx_torch_plugin的whl包。whl包链接:mstx_torch_plugin
    2. 安装软件包。
      pip install mstx_torch_plugin-1.0-py3-none-any.whl
    3. 在AI任务执行脚本中import导入该whl包。

      需保证import的顺序在import torch和import torch_npu后面,示例如下。

      import torch 
      import torch_npu  
      import mstx_torch_plugin

  2. (可选)在PyTorch场景下,非原生优化器或不使用mstx_torch_plugin的情况下,为获取训练的Step耗时数据需修改训练脚本中的训练迭代循环,需增加Step打点代码。

    以下示例为PyTorch-MindSpeed场景,需修改./mindspeed_llm/training/training.py文件增加如下加粗字段。MindSpore场景请根据实际情况修改。
    def train(forward_step_func, model, optimizer, opt_param_scheduler,
              train_data_iterator, valid_data_iterator,
              process_non_loss_data_func, config):
                # Cache into one-logger for callback
        ……
        ……
        if is_profile_enabled():
            prof = get_profiler()
            prof.start()
        step_id = iteration
        while iteration < args.train_iters:
            stream = torch.npu.current_stream()      # 获取当前环境的执行流,用于获取NPU侧时间
            range_id = torch.npu.mstx.range_start(f"step {step_id}", stream) # 标识当前训练step的开始
            ……
            ……
            if args.manual_gc:
                if args.manual_gc_interval != 0 and iteration % args.manual_gc_interval == 0:
                    gc.collect()
    
            if is_profile_enabled():
                prof.step()
            step_id +=1  # 训练step加一,用于标识下一step
            torch.npu.mstx.range_end(range_id) # 标识当前训练step的结束

  3. 以CANN软件包的运行用户登录环境,执行source ${install_path}/set_env.sh命令设置环境变量。其中${install_path}为CANN软件的安装目录。示例如下。

    source /usr/local/Ascend/ascend-toolkit/set_env.sh

  4. 训练启动前,在训练脚本中导入LD_PRELOAD环境变量。该环境变量允许系统提前加载指定的so文件。示例如下。

    export LD_PRELOAD=/usr/local/Ascend/ascend-toolkit/latest/lib64/libmspti.so:/usr/local/python3.8.17/lib/python3.8/site-packages/taskd/python/cython_api/libs/libtaskd.so
    • libmspti.so:该so由MindStudio提供,集成在CANN包内。当使用默认安装路径时,路径为:/usr/local/Ascend/ascend-toolkit/latest/lib64/libmspti.so。
    • libtaskd.so:该so由TaskD组件提供,安装该whl包后,路径为: TaskD所在路径/taskd/python/cython_api/libs/libtaskd.so。

      TaskD所在路径可通过以下命令进行查询。回显中的Location字段即为TaskD所在路径。

      pip show taskd

  5. 在分布式环境初始化完成,能够获取到全局rank之后,修改训练脚本,在训练进程内部拉起taskd worker。

    • PyTorch-MindSpeed场景

      修改./MindSpeed-LLM/mindspeed_llm/training/training.py文件,在代码中增加如下加粗字段。

      def pretrain(train_valid_test_dataset_provider,
                   model_provider,
                   model_type,
                   forward_step_func,
                   process_non_loss_data_func=None,
                   extra_args_provider=None,
                   args_defaults={}):
          print_rank_0('time to initialize megatron (seconds): {:.3f}'.format(
              time.time() - _TRAIN_START_TIME))
          print_datetime('after megatron is initialized')
          import torch.distributed as dist
          if dist.is_initialized():
             rank = dist.get_rank()
             from taskd.api.taskd_worker_api import init_taskd_worker
             from taskd.api.taskd_worker_api import start_taskd_worker
             init_taskd_worker(rank,5000)
             start_taskd_worker()
          app_metrics['app_model_init_finish_time'] = one_logger_utils.get_timestamp_in_ms()
          one_logger_utils.on_pretrain_start()
    • MindSpore-Mindformer场景

      修改./mindformers/trainer/base_trainer.py文件,在代码中增加如下加粗字段。

          def training_process(
                  self,
                  config: Optional[Union[dict, MindFormerConfig, ConfigArguments, TrainingArguments]] = None,
                  network: Optional[Union[Cell, PreTrainedModel]] = None,
                  dataset: Optional[Union[BaseDataset, GeneratorDataset]] = None,
                  optimizer: Optional[Optimizer] = None,
                  callbacks: Optional[Union[Callback, List[Callback]]] = None,
                  compute_metrics: Optional[Union[dict, set]] = None,
                  **kwargs):
              ……
              ……
      
              logger.info(".........Starting Training Model..........")
              if get_real_rank() % 8 == 0:
                  pprint(config)
              logger.info(".........Model Compiling, Please Wait a Moment...........")
              try:
                  rank = get_rank()
                  from taskd.api.taskd_worker_api import init_taskd_worker
                  from taskd.api.taskd_worker_api import start_taskd_worker
                  init_taskd_worker(rank,5000)
                  start_taskd_worker()
              except Exception as e:
                  print("failed to call mindcluster taskd")
              model.train(config.runner_config.epochs, dataset,
                          callbacks=callbacks,
                          dataset_sink_mode=config.runner_config.sink_mode,
                          sink_size=config.runner_config.sink_size,
                          initial_epoch=config.runner_config.initial_epoch)

  6. 修改任务YAML。

    1. 挂载轻量profiling配置文件:需将宿主机上任务对应的data-trace ConfigMap落盘到/user/cluster-info/datatrace-config/命名空间.data-trace-任务名称/文件夹下。将名为profilingSwitch的文件挂载到容器指定路径:/user/cluster-info/datatrace-config/。
    2. 挂载轻量profiling落盘文件:轻量profiling数据写在容器内的/user/cluster-info/profiling路径下。如需在宿主机获取,请修改任务YAML,将该路径挂出。
      • 容器内YAML挂载示例如下。
        volumeMounts:
        - name: profilingdata
          mountPath: /user/cluster-info/
        - name: profileswitch
          mountPath: /user/cluster-info/datatrace-config
      • 宿主机内YAML挂载示例如下。
        volumes:
        - name: profileswitch
          hostPath:
            path: /user/cluster-info/datatrace-config/default.data-trace-default-test-pytorch-fault-mixtral
        - name: profilingdata
          hostPath:
             path: /home/profilingdatapath

  7. 开启轻量profiling获取落盘数据。修改任务对应的data-trace ConfigMap或ClusterD提供的gRPC接口,接口信息见ModifyTrainingDataTraceSwitch,动态开启或关闭轻量profiling能力。

    以default命名空间下的名为default-test-pytorch-fault-mixtral的任务为例,以编辑ConfigMap的方式开启轻量profiling获取落盘数据,示例如下。
    1. 在master节点执行以下命令查询该任务对应的配置ConfigMap。
      kubectl get cm
      • 如果data-trace-default-test-pytorch-fault-mixtral cm已经存在,执行步骤3编辑该文件。

        回显示例如下。

        NAME                                              DATA   AGE
        data-trace-default-test-pytorch-fault-mixtral     1      18h
      • 如果data-trace-default-test-pytorch-fault-mixtral cm不存在,执行步骤2创建该文件。
    2. 执行以下命令,创建配置轻量profiling获取落盘数据所需ConfigMap文件。
      1. 将以下内容写入datacm.yaml。
        apiVersion: v1
        kind: ConfigMap
        metadata:
          name: data-trace-default-test-pytorch-fault-mixtral  # cm的名字需以data-trace为前缀+任务名     
          labels:
            reset: "true"
        data:
          profilingSwitch: '{"CommunicationOperator":"off","Step":"on","SaveCheckpoint":"on","FP":"on","DataLoader":"on"}'
      2. 在master节点执行以下命令,创建ConfigMap。
        kubectl apply -f datacm.yaml

        回显如下所示,表示ConfigMap创建成功。

        [root@master~]# kubectl apply -f datacm.yaml 
        configmap/data-trace-default-test-pytorch-fault-mixtral created
    3. 执行以下命令编辑ConfigMap文件。
      kubectl edit cm data-trace-default-test-pytorch-fault-mixtral
    4. 如需开启通信算子,请将CommunicationOperator字段的取值改为“on”。
      apiVersion: v1
      data:
        profilingSwitch: '{"CommunicationOperator":"on","Step":"on","SaveCheckpoint":"on","FP":"on","DataLoader":"on"}'

      开启通信算子后可能造成训练性能下降,不建议常态开启通信算子。

    5. 按“Esc”键,输入:wq!保存并退出。

获取性能劣化故障检测数据