Using TaskD 7.1.RC1 or Later

The MindCluster cluster scheduling components provide a diagnosis function for performance degradation (slow nodes) in a cluster, based on the profiling capability of MindStudio. The function supports dynamic dotting and data persistence: dotting can be enabled or disabled in real time without restarting the job, so training continues uninterrupted during diagnosis.

Table 1 describes the supported dotting data.

Table 1 Dotting data description

| Dotting Data Type | Supported Framework | Required Component |
|---|---|---|
| FP (forward propagation data) | PyTorch. NOTE: Only the single-operator scenario is supported. | mstx_torch_plugin |
| Step (step latency) | PyTorch, MindSpore | PyTorch, native optimizer: if torch_npu 7.1.RC1 is used, use mstx_torch_plugin; if the torch_npu version is later than 7.1.RC1, mstx_torch_plugin is not required, because torch_npu itself provides step dotting. PyTorch, custom optimizer: add the dotting code manually. MindSpore, MindFormers: the step dotting data is provided by MindFormers. MindSpore, MindSpeed: no step dotting data is provided. |
| Communication (communication operator) | PyTorch, MindSpore | PyTorch: torch_npu; MindSpore: MindSpore |
| SaveCheckpoint (time consumed by SaveCheckpoint) | PyTorch, MindSpore | PyTorch: torch_npu; MindSpore: MindSpore |
| DataLoader (time consumed by DataLoader) | PyTorch, MindSpore | PyTorch: torch_npu; MindSpore: MindSpore |

Restrictions

  • Currently, Step, SaveCheckpoint, FP, and DataLoader dotting are enabled together. To disable these four types of dotting, you must also disable Communication dotting.
  • Communication operator dotting can be enabled or disabled independently.
  • MindStudio dynamic lightweight dotting and full dotting cannot be enabled at the same time. If full dotting is enabled, data collection may fail because of the resulting performance deterioration.
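The restrictions above can be expressed as a consistency check on the profilingSwitch value that is configured later in this section. The following is a minimal Python sketch, assuming the rules are exactly as listed: the four grouped switches share one value, and CommunicationOperator must be off whenever they are off. check_profiling_switch is a hypothetical helper, not part of TaskD or ClusterD.

```python
import json

# Switches that, per the restrictions, are enabled and disabled together.
GROUPED_KEYS = ("Step", "SaveCheckpoint", "FP", "DataLoader")
COMM_KEY = "CommunicationOperator"
VALID_VALUES = {"on", "off"}


def check_profiling_switch(raw: str) -> list:
    """Return rule violations for a profilingSwitch JSON string (hypothetical helper)."""
    switch = json.loads(raw)
    problems = []
    for key in (COMM_KEY, *GROUPED_KEYS):
        if switch.get(key) not in VALID_VALUES:
            problems.append(f"{key} must be 'on' or 'off'")
    if problems:
        return problems
    grouped = {switch[key] for key in GROUPED_KEYS}
    if len(grouped) != 1:
        # Step, SaveCheckpoint, FP, and DataLoader must share one value.
        problems.append("Step, SaveCheckpoint, FP, and DataLoader must share one value")
    elif grouped == {"off"} and switch[COMM_KEY] == "on":
        # Disabling the four grouped types also requires disabling Communication.
        problems.append("CommunicationOperator must be off when the other four are off")
    return problems
```

An empty list means the value is consistent with the restrictions as read here.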

Prerequisite

  • (Optional) ClusterD, Ascend Device Plugin, and Volcano have been installed. (Versions of these MindCluster components must match the TaskD version.)
  • torch_npu (optional; required in the PyTorch scenario; version ≥ 7.1.RC1), MindSpore (optional; required in the MindSpore scenario; version ≥ 2.7.0), CANN (mandatory; version ≥ 8.2.RC1), and TaskD (mandatory) have been installed in the container.

Obtaining the Software Package

Table 2 Required software package

| Package | Required (Yes/No) | Description | How to Obtain | Scenario |
|---|---|---|---|---|
| mstx_torch_plugin | No | The msproftx data collection and parsing function of Ascend PyTorch Profiler has built-in dotting for communication operators. To capture the time consumed by more key phases without modifying service code, mstx_torch_plugin adds dotting for the dataloader, forward, step, and save_checkpoint functions to Ascend PyTorch Profiler. NOTE: (1) To enable FP dotting, install mstx_torch_plugin; it is not required in other scenarios. (2) mstx_torch_plugin 1.0 or later is required. | Click here. | PyTorch |

Configuring Performance Degradation Detection

This solution applies only to TaskD 7.1.RC1 or later. If you use TaskD with a version earlier than 7.1.RC1, see Using TaskD of Other Versions.

  1. Select either of the following methods as required.
    • Install mstx_torch_plugin in the container.
      1. Download the whl package of mstx_torch_plugin.
      2. Install the package.
        pip install mstx_torch_plugin-1.0-py3-none-any.whl
      3. Import mstx_torch_plugin in the AI task execution script.

        Make sure that it is imported after torch and torch_npu.

        import torch
        import torch_npu
        import mstx_torch_plugin
    • If a non-native optimizer is used in the PyTorch scenario or mstx_torch_plugin is not used, you need to modify the training iteration in the training script by adding the step dotting code to obtain the time consumed by the training step.
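      The following uses the PyTorch-MindSpeed scenario as an example. Add the step dotting code below (the lines that set step_id and stream, call torch.npu.mstx.range_start and torch.npu.mstx.range_end, and increment step_id) to the ./mindspeed_llm/training/training.py file. In the MindSpore scenario, modify the corresponding training loop as required.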
      def train(forward_step_func, model, optimizer, opt_param_scheduler,
                train_data_iterator, valid_data_iterator,
                process_non_loss_data_func, config):
          # Cache into one-logger for callback
          …
          …
          if is_profile_enabled():
              prof = get_profiler()
              prof.start()
          step_id = iteration
          while iteration < args.train_iters:
              stream = torch.npu.current_stream()  # Obtain the current execution stream so that NPU time is captured.
              range_id = torch.npu.mstx.range_start(f"step {step_id}", stream)  # Start of a training step.
              …
              …
              if args.manual_gc:
                  if args.manual_gc_interval != 0 and iteration % args.manual_gc_interval == 0:
                      gc.collect()

              if is_profile_enabled():
                  prof.step()
              step_id += 1  # Increase the step ID by 1 to identify the next step.
              torch.npu.mstx.range_end(range_id)  # End of a training step.
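The range_start/range_end pairing in the preceding example can also be wrapped in a context manager, so that the range is closed even if an iteration raises an exception. This is a minimal sketch: step_range is a hypothetical helper, and it assumes only that the mstx object exposes range_start(message, stream) and range_end(range_id), as torch.npu.mstx is used above.

```python
from contextlib import contextmanager


@contextmanager
def step_range(mstx, step_id, stream=None):
    """Mark one training step as an mstx range (hypothetical helper).

    mstx is expected to be torch.npu.mstx, or any object with
    range_start(message, stream) and range_end(range_id).
    """
    # Same message format as the example above: "step <id>".
    range_id = mstx.range_start(f"step {step_id}", stream)
    try:
        yield range_id
    finally:
        # Close the range even if the step body raises.
        mstx.range_end(range_id)
```

In the training loop, the body of each iteration would then run inside with step_range(torch.npu.mstx, step_id, torch.npu.current_stream()):, with step_id incremented after the block.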
  2. In the container, log in to the environment as the running user of the CANN package and run the source ${install_path}/set_env.sh command to set environment variables. ${install_path} is the CANN installation directory. Example:
    source /usr/local/Ascend/cann/set_env.sh
  3. Before starting the training, set the LD_PRELOAD environment variable in the training script. LD_PRELOAD makes the dynamic linker load the specified .so files before any other libraries. Example:
    export LD_PRELOAD=/usr/local/Ascend/cann/lib64/libmspti.so:/usr/local/python3.10.5/lib/python3.10/site-packages/taskd/python/cython_api/libs/libtaskd.so
    • libmspti.so: This .so file is provided by MindStudio and integrated in the CANN package. The default installation path is /usr/local/Ascend/cann/lib64/libmspti.so.
    • libtaskd.so: This .so file is provided by TaskD. After the TaskD whl package is installed, the path is <TaskD installation path>/taskd/python/cython_api/libs/libtaskd.so.

      You can run the following command to query the TaskD installation path. The value of the Location field in the command output is the installation path.

      pip show taskd
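The LD_PRELOAD value can also be derived from the pip show taskd output instead of being typed by hand. The following Python sketch assumes the default CANN library directory and the libtaskd.so location described above; parse_pip_location and build_ld_preload are hypothetical helpers.

```python
import os

# Default CANN library directory from the example above.
CANN_LIB64 = "/usr/local/Ascend/cann/lib64"


def parse_pip_location(pip_show_output: str) -> str:
    """Return the Location field from `pip show taskd` output."""
    for line in pip_show_output.splitlines():
        key, sep, value = line.partition(":")
        if sep and key.strip() == "Location":
            return value.strip()
    raise ValueError("no Location field found in pip show output")


def build_ld_preload(taskd_location: str, cann_lib64: str = CANN_LIB64) -> str:
    """Join libmspti.so and libtaskd.so into one LD_PRELOAD value."""
    libmspti = os.path.join(cann_lib64, "libmspti.so")
    libtaskd = os.path.join(taskd_location, "taskd", "python", "cython_api",
                            "libs", "libtaskd.so")
    # LD_PRELOAD accepts colon-separated entries.
    return f"{libmspti}:{libtaskd}"
```

For example, the stdout of subprocess.run(["pip", "show", "taskd"], capture_output=True, text=True) can be passed to parse_pip_location, and the result of build_ld_preload exported as LD_PRELOAD before the training command.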
  4. After the distributed environment is initialized and the global rank can be obtained, modify the training script to start TaskD Manager, start TaskD Proxy in the management process, and start TaskD Worker in the training process.
    1. (Optional) Start TaskD Manager and TaskD Proxy. These steps are required only if you use the ClusterD gRPC interface to enable lightweight profiling and obtain flushed data. If you use the ConfigMap method, skip them.
      • PyTorch
        1. Create a manager.py file and save it to the directory where the training script is called. The content of the manager.py file is as follows:
          import os

          from taskd.api import init_taskd_manager, start_taskd_manager

          job_id = os.getenv("MINDX_TASK_ID")
          node_nums = XX        # Total number of nodes (set by yourself)
          proc_per_node = XX    # Number of training processes on each node (set by yourself)

          init_taskd_manager({"job_id": job_id, "node_nums": node_nums, "proc_per_node": proc_per_node})
          start_taskd_manager()

          For details about the parameters in the manager.py file, see the description of init_taskd_manager(config: dict) -> bool.

        2. Add the following code to the training script to start TaskD Manager and TaskD Proxy.
          sed -i '/import os/i import taskd.python.adaptor.patch' $(pip3 show torch | grep Location | awk -F ' ' '{print $2}')/torch/distributed/run.py
          
          if [[ "${RANK}" -eq 0 ]]; then
              export MASTER_ADDR=${POD_IP}
              python manager.py &
          fi
              
          torchrun ...
      • MindSpore
        1. Create a manager.py file and save it to the directory where the training script is called. The content of the manager.py file is as follows:
          import os

          from taskd.api import init_taskd_manager, start_taskd_manager

          job_id = os.getenv("MINDX_TASK_ID")
          node_nums = XX        # Total number of nodes (set by yourself)
          proc_per_node = XX    # Number of training processes on each node (set by yourself)

          init_taskd_manager({"job_id": job_id, "node_nums": node_nums, "proc_per_node": proc_per_node})
          start_taskd_manager()

          For details about the parameters in the manager.py file, see the description of init_taskd_manager(config: dict) -> bool.

        2. Add the following code to the training script to start TaskD Manager.
          if [[ "${MS_SCHED_HOST}" == "${POD_IP}" ]]; then
              python manager.py &
          fi
              
          msrun ...
        3. Modify the mindspore/python/mindspore/parallel/cluster/process_entity/_api.py file to start TaskD Proxy. Example:
          ...
          if ("TTP:1" in tft_env) or ("UCE:1" in tft_env) or ("ARF:1" in tft_env):
              try:
                  from taskd.python.framework.agent.ms_mgr.msrun_plugin import MSRunPlugin
                  from taskd.api.taskd_proxy_api import init_taskd_proxy
                  from taskd.python.framework.common.type import CONFIG_UPSTREAMIP_KEY, LOCAL_HOST
                  import threading
                  # Start TaskD Proxy in a daemon thread; its upstream address is MS_SCHED_HOST, where TaskD Manager runs.
                  proxy = threading.Thread(target=init_taskd_proxy,
                                           args=({CONFIG_UPSTREAMIP_KEY: os.getenv("MS_SCHED_HOST", LOCAL_HOST)},))
                  proxy.daemon = True
                  proxy.start()
                  self.msmgr = MSRunPlugin()
                  self.msmgr.register_callbacks("KILL_WORKER", self.kill_workers)
                  self.msmgr.register_callbacks("START_ALL_WORKER", self.start_all_workers)
                  self.msmgr.register_callbacks("START_WORKER_LIST", self.start_worker_list)
                  self.msmgr.register_callbacks("MONITOR", self.monitor_rank_status)
                  self.enable_mindx = True
                  os.environ["MS_ENABLE_RECOVERY"] = str(1)
          ...
    2. Start TaskD Worker.
      • PyTorch-MindSpeed: Add the following TaskD Worker startup code (from import torch.distributed as dist through start_taskd_worker()) to the QWEN3_for_PyTorch_2.7_code/mindspeed_llm/training/training.py file.
        def pretrain(train_valid_test_dataset_provider,
                     model_provider,
                     model_type,
                     forward_step_func,
                     process_non_loss_data_func=None,
                     extra_args_provider=None,
                     args_defaults={}):
            print_rank_0('time to initialize megatron (seconds): {:.3f}'.format(
                time.time() - _TRAIN_START_TIME))
            print_datetime('after megatron is initialized')
            import torch.distributed as dist
            if dist.is_initialized():
                rank = dist.get_rank()
                from taskd.api.taskd_worker_api import init_taskd_worker
                from taskd.api.taskd_worker_api import start_taskd_worker
                init_taskd_worker(rank, 5000)
                start_taskd_worker()
            app_metrics['app_model_init_finish_time'] = one_logger_utils.get_timestamp_in_ms()
            one_logger_utils.on_pretrain_start()
      • MindSpore-MindFormers: Add the following TaskD Worker startup code (the try/except block that calls init_taskd_worker and start_taskd_worker) to the ./mindformers/trainer/base_trainer.py file.
        def training_process(
                self,
                config: Optional[Union[dict, MindFormerConfig, ConfigArguments, TrainingArguments]] = None,
                network: Optional[Union[Cell, PreTrainedModel]] = None,
                dataset: Optional[Union[BaseDataset, GeneratorDataset]] = None,
                optimizer: Optional[Optimizer] = None,
                callbacks: Optional[Union[Callback, List[Callback]]] = None,
                compute_metrics: Optional[Union[dict, set]] = None,
                **kwargs):
            …
            …

            logger.info(".........Starting Training Model..........")
            if get_real_rank() % 8 == 0:
                pprint(config)
            logger.info(".........Model Compiling, Please Wait a Moment...........")
            try:
                rank = get_rank()
                from taskd.api.taskd_worker_api import init_taskd_worker
                from taskd.api.taskd_worker_api import start_taskd_worker
                init_taskd_worker(rank, 5000)
                start_taskd_worker()
            except Exception as e:
                # A TaskD failure is logged but does not stop training.
                print(f"failed to call mindcluster taskd: {e}")
            model.train(config.runner_config.epochs, dataset,
                        callbacks=callbacks,
                        dataset_sink_mode=config.runner_config.sink_mode,
                        sink_size=config.runner_config.sink_size,
                        initial_epoch=config.runner_config.initial_epoch)

      In the preceding code, 5000 in init_taskd_worker(rank, 5000) is the upper limit, in MB, of the disk space used by /user/cluster-info/profiling. For details, see upper_limit_of_disk_in_mb in def init_taskd_worker(rank_id: int, upper_limit_of_disk_in_mb: int = 5000, framework: str = "pt") -> bool.
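Both worker examples above call init_taskd_worker and start_taskd_worker directly; the MindFormers version additionally guards the calls so that a TaskD failure does not stop training. The following sketch factors that guard into a reusable function. It assumes that a False return from init_taskd_worker indicates failure (its signature returns bool); start_taskd_worker_safely and its init_fn, start_fn, and log parameters are hypothetical, added only so the logic can be exercised without TaskD installed.

```python
def start_taskd_worker_safely(rank, upper_limit_of_disk_in_mb=5000,
                              init_fn=None, start_fn=None, log=print):
    """Start TaskD Worker without letting a TaskD failure stop training.

    init_fn/start_fn default to the TaskD worker API used in this section;
    they are injectable only so the control flow can be tested without TaskD.
    Returns True if the worker was started, False otherwise.
    """
    try:
        if init_fn is None or start_fn is None:
            # Imported lazily so a missing TaskD package is handled like any other failure.
            from taskd.api.taskd_worker_api import init_taskd_worker, start_taskd_worker
            init_fn = init_fn or init_taskd_worker
            start_fn = start_fn or start_taskd_worker
        # Assumption: init_taskd_worker reports failure by returning False.
        if init_fn(rank, upper_limit_of_disk_in_mb) is False:
            log("failed to call mindcluster taskd: init_taskd_worker returned False")
            return False
        start_fn()
        return True
    except Exception as exc:
        log(f"failed to call mindcluster taskd: {exc}")
        return False
```

In the MindFormers example, the try/except block could then be replaced by start_taskd_worker_safely(get_rank()).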

  5. Modify the job YAML file.
    1. Modify the container ports: add port 9601, which TaskD uses for communication, to all pods.
      ...
        spec:
          ...
          containers:
            ...
            ports:
              - containerPort: 9601
                name: taskd-port
      ...
    2. Mount files.
      1. Mount the lightweight profiling configuration file: The data-trace ConfigMap must be flushed on the host to the /user/cluster-info/datatrace-config/<Namespace>.data-trace-<Job name>/ folder. Mount that folder, which contains the profilingSwitch file, to /user/cluster-info/datatrace-config/ in the container.
      2. Mount the lightweight profiling data directory: The lightweight profiling data is written to /user/cluster-info/profiling in the container. To obtain the data on the host, mount this path by modifying the job YAML file.
        • YAML example: mounts in the container
          volumeMounts:
          - name: profilingdata
            mountPath: /user/cluster-info/
          - name: profileswitch
            mountPath: /user/cluster-info/datatrace-config
        • YAML example: volumes on the host
          volumes:
          - name: profileswitch
            hostPath:
              path: /user/cluster-info/datatrace-config/default.data-trace-default-test-pytorch-fault-mixtral
          - name: profilingdata
            hostPath:
              path: /home/profilingdatapath
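The port and volume settings from this step can be generated per job rather than edited by hand. The following Python sketch builds them as plain dictionaries, assuming the host folder naming <Namespace>.data-trace-<Job name> described above. The function names are hypothetical, and the output is a fragment, not a full manifest: ports and volumeMounts belong in the container spec, and volumes in the pod spec.

```python
def datatrace_host_dir(namespace: str, job_name: str) -> str:
    """Host folder holding the flushed data-trace ConfigMap for one job."""
    return f"/user/cluster-info/datatrace-config/{namespace}.data-trace-{job_name}"


def taskd_pod_fragment(namespace: str, job_name: str, profiling_host_dir: str) -> dict:
    """Port and volume settings for lightweight profiling (hypothetical helper)."""
    return {
        # Container spec: TaskD communication port.
        "ports": [{"containerPort": 9601, "name": "taskd-port"}],
        # Container spec: where the profiling data and the switch file appear.
        "volumeMounts": [
            {"name": "profilingdata", "mountPath": "/user/cluster-info/"},
            {"name": "profileswitch", "mountPath": "/user/cluster-info/datatrace-config"},
        ],
        # Pod spec: host directories backing the two mounts.
        "volumes": [
            {"name": "profileswitch",
             "hostPath": {"path": datatrace_host_dir(namespace, job_name)}},
            {"name": "profilingdata", "hostPath": {"path": profiling_host_dir}},
        ],
    }
```

Because YAML accepts JSON syntax, json.dumps(fragment, indent=2) produces text that can be placed into the corresponding sections of the job YAML.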
  6. Enable lightweight profiling to obtain flushed data using either of the following methods.
    • Call the ModifyTrainingDataTraceSwitch gRPC interface provided by ClusterD. Use this method if 4.a is configured. For details about the interface, see ModifyTrainingDataTraceSwitch.

      If lightweight profiling is enabled through the ClusterD gRPC interface, the data-trace-<Job name> ConfigMap that it creates shares the lifecycle of the job and is deleted together with the job. If the job does not exist, the interface call fails.

    • Modify the data-trace ConfigMap corresponding to the job. Use this method if 4.a is not configured. The procedure is as follows:
      The following uses the default-test-pytorch-fault-mixtral job in the default namespace as an example to describe how to enable lightweight profiling to obtain flushed data by editing the ConfigMap.
      1. Run the following command on the master node to query the ConfigMap of the job.
        kubectl get cm
        • If the data-trace-default-test-pytorch-fault-mixtral ConfigMap already exists, go to 3 to edit it.

          Command output:

          NAME                                              DATA   AGE
          data-trace-default-test-pytorch-fault-mixtral     1      18h
        • If the data-trace-default-test-pytorch-fault-mixtral ConfigMap does not exist, go to 2 to create it.
      2. Create the ConfigMap file required by lightweight profiling for obtaining the flushed data.
        1. Add the following content to datacm.yaml.
          apiVersion: v1
          kind: ConfigMap
          metadata:
            name: data-trace-default-test-pytorch-fault-mixtral  # The ConfigMap name must be data-trace- followed by the job name.
            labels:
              reset: "true"
          data:
            profilingSwitch: '{"CommunicationOperator":"off","Step":"on","SaveCheckpoint":"on","FP":"on","DataLoader":"on"}'
        2. Run the following command on the master node to create the ConfigMap.
          kubectl apply -f datacm.yaml

          If the following information is displayed, the ConfigMap is created successfully.

          [root@master~]# kubectl apply -f datacm.yaml 
          configmap/data-trace-default-test-pytorch-fault-mixtral created
      3. Run the following command to edit the ConfigMap.
        kubectl edit cm data-trace-default-test-pytorch-fault-mixtral
      4. To enable communication operator dotting, change the value of CommunicationOperator to on.
        apiVersion: v1
        data:
          profilingSwitch: '{"CommunicationOperator":"on","Step":"on","SaveCheckpoint":"on","FP":"on","DataLoader":"on"}'

        Enabling communication operator dotting may degrade training performance, so you are advised not to enable it.

      5. Press Esc and enter :wq! to save the settings and exit.
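Instead of editing datacm.yaml by hand, the ConfigMap can be generated. The following Python sketch reproduces the manifest shown in step 6 for a given job name and set of enabled switches; data_trace_configmap is a hypothetical helper. It writes profilingSwitch as compact JSON, matching the format of the example.

```python
import json

# Key order follows the datacm.yaml example.
SWITCH_KEYS = ("CommunicationOperator", "Step", "SaveCheckpoint", "FP", "DataLoader")


def data_trace_configmap(job_name: str, enabled: set) -> dict:
    """Build the data-trace ConfigMap for a job (hypothetical helper)."""
    unknown = set(enabled) - set(SWITCH_KEYS)
    if unknown:
        raise ValueError(f"unknown switches: {sorted(unknown)}")
    switch = {key: "on" if key in enabled else "off" for key in SWITCH_KEYS}
    return {
        "apiVersion": "v1",
        "kind": "ConfigMap",
        "metadata": {
            # The name must be data-trace- followed by the job name.
            "name": f"data-trace-{job_name}",
            "labels": {"reset": "true"},
        },
        # Compact separators match the profilingSwitch string in the example.
        "data": {"profilingSwitch": json.dumps(switch, separators=(",", ":"))},
    }
```

Kubernetes also accepts JSON manifests, so the output of json.dumps(data_trace_configmap(...)) can be saved to a file and applied with kubectl apply -f.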

Obtaining Performance Degradation Detection Data

  • Flushed data is classified by rank. The lightweight profiling data is written to the /user/cluster-info/profiling directory in the container.
  • If the pod has the environment variable MINDX_TASK_ID, the rank 0 data is stored in /user/cluster-info/profiling/$MINDX_TASK_ID/0 in the container.
    • If the environment variable does not exist, the data is flushed to a folder named default_task_id_timestamp by default.
    • When the size of /user/cluster-info/profiling reaches the upper limit (see 4.b), the files are aged. By default, each aging pass deletes the 20% of files with the earliest modification time. Aging deletes only the files whose names consist of digits in the rank folders of the profiling directory. Do not manually add other files to the profiling directory: TaskD does not delete them, but they still occupy space.
    • Lightweight profiling files are named by timestamp, and records are separated by newline characters. New records are appended to the latest file in the rank folder; when the latest file exceeds 10 MB, TaskD creates a new profiling file. With NFS or other network storage, slow data synchronization may cause a new file to be created before the current one reaches 10 MB.
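The aging rule above can be modeled as a selection over the files in the rank folders. The following Python sketch assumes that the 20% is rounded up to at least one file, which the text does not specify; files_to_age is a hypothetical helper that only chooses files and does not delete anything.

```python
import os


def files_to_age(entries, percent=20):
    """Return the paths an aging pass would delete, oldest first.

    entries: iterable of (path, mtime) pairs collected from the rank folders.
    Only files whose names consist of digits are candidates; any other file
    is never selected. Rounding up to at least one file is an assumption.
    """
    candidates = sorted(
        (mtime, path) for path, mtime in entries
        if os.path.basename(path).isdigit()
    )
    if not candidates:
        return []
    # Integer ceiling of len * percent / 100, avoiding float rounding errors.
    count = max(1, (len(candidates) * percent + 99) // 100)
    return [path for _, path in candidates[:count]]
```

The (path, mtime) pairs could be collected with os.walk over /user/cluster-info/profiling and os.path.getmtime on each file.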