PyTorch Scenario (Based on MindSpeed-LLM)

This section describes how to configure suspension and switchback of link failover communication. For details about its features, restrictions, supported products, and working principles, see Suspension and Switchback of Link Failover Communication.

Prerequisite

Procedure

  1. Modify the training script so that TaskD starts once the distributed environment is initialized and the global rank can be obtained. Start TaskD Manager in the training script and start TaskD Worker in the training process.
    1. Start TaskD Manager.
      1. Create a manager.py file as follows and save it to the directory where the training script is called.
        from taskd.api import init_taskd_manager, start_taskd_manager
        import os
         
        job_id=os.getenv("MINDX_TASK_ID")
        node_nums=XX         # Total number of nodes (replace XX with your value)
        proc_per_node=XX     # Number of training processes on each node (replace XX with your value)
         
        init_taskd_manager({"job_id":job_id, "node_nums": node_nums, "proc_per_node": proc_per_node})
        start_taskd_manager()

        For details about the parameters in the manager.py file, see def init_taskd_manager(config:dict) -> bool:.

      2. Add the following code to the training script to start TaskD Manager.
        sed -i '/import os/i import taskd.python.adaptor.patch' $(pip3 show torch | grep Location | awk -F ' ' '{print $2}')/torch/distributed/run.py
        export TASKD_PROCESS_ENABLE="on"
        if [[ "${RANK}" == 0 ]]; then
            export MASTER_ADDR=${POD_IP}
            python manager.py &            # Adjust the path to manager.py according to the current working directory.
        fi
            
        torchrun ...
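
The configuration passed to init_taskd_manager in manager.py can be checked before the manager starts. The following is a minimal sketch, not part of TaskD: build_manager_config is a hypothetical helper, and whether TaskD performs its own validation is not stated here. It only assumes the three keys shown in manager.py above.

```python
import os


def build_manager_config(node_nums, proc_per_node, env=None):
    """Build the dict passed to init_taskd_manager, failing early on bad values.

    Hypothetical helper for illustration; it is not a TaskD API.
    """
    env = os.environ if env is None else env
    job_id = env.get("MINDX_TASK_ID")
    if not job_id:
        raise ValueError("MINDX_TASK_ID is not set; manager.py reads the job ID from it")
    for name, value in (("node_nums", node_nums), ("proc_per_node", proc_per_node)):
        # bool is a subclass of int, so reject it explicitly.
        if isinstance(value, bool) or not isinstance(value, int) or value <= 0:
            raise ValueError(f"{name} must be a positive integer, got {value!r}")
    return {"job_id": job_id, "node_nums": node_nums, "proc_per_node": proc_per_node}
```

In manager.py, the result would be passed as init_taskd_manager(build_manager_config(node_nums, proc_per_node)), with node_nums and proc_per_node set for your cluster.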
    2. Start TaskD Worker.
      Modify the QWEN3_for_PyTorch_2.7_code/mindspeed_llm/training/training.py file and add the TaskD Worker startup code (the lines from import torch.distributed as dist through start_taskd_worker()) to the pretrain function.
      def pretrain(train_valid_test_dataset_provider,
                   model_provider,
                   model_type,
                   forward_step_func,
                   process_non_loss_data_func=None,
                   extra_args_provider=None,
                   args_defaults={}):
          print_rank_0('time to initialize megatron (seconds): {:.3f}'.format(
              time.time() - _TRAIN_START_TIME))
          print_datetime('after megatron is initialized')
          # Start TaskD Worker once the process group is initialized and the global rank is available.
          import torch.distributed as dist
          if dist.is_initialized():
              rank = dist.get_rank()
              from taskd.api.taskd_worker_api import init_taskd_worker
              from taskd.api.taskd_worker_api import start_taskd_worker
              init_taskd_worker(rank,5000,"pt")
              start_taskd_worker()
          app_metrics['app_model_init_finish_time'] = one_logger_utils.get_timestamp_in_ms()
          one_logger_utils.on_pretrain_start()

    If the error message "the libtaskd.so has not been loaded" is displayed during training, set the LD_PRELOAD environment variable in the training script. This environment variable makes the system load the specified .so files in advance. Example:

    export LD_PRELOAD=/usr/local/Ascend/cann/lib64/libmspti.so:/usr/local/lib/python3.10/dist-packages/taskd/python/cython_api/libs/libtaskd.so
    • libmspti.so: This .so file is provided by MindStudio and integrated in the CANN package. The default installation path is /usr/local/Ascend/cann/lib64/libmspti.so.
    • libtaskd.so: This .so file is provided by TaskD. After the whl package is installed, the path is <TaskD installation path>/taskd/python/cython_api/libs/libtaskd.so.

      You can run the following command to query the path where TaskD is located. The Location field in the command output is the target path.

      pip show taskd
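
The LD_PRELOAD value can be assembled from the Location field reported by pip show taskd. The following is a minimal sketch: build_ld_preload is a hypothetical helper, and the two path constants simply repeat the paths given above. Adjust them if your CANN or TaskD installation differs.

```python
import posixpath

# Relative path of libtaskd.so under the TaskD installation path, as given above.
LIBTASKD_REL = "taskd/python/cython_api/libs/libtaskd.so"
# Default libmspti.so path in the CANN package, as given above.
DEFAULT_LIBMSPTI = "/usr/local/Ascend/cann/lib64/libmspti.so"


def build_ld_preload(taskd_location, libmspti=DEFAULT_LIBMSPTI):
    """Return an LD_PRELOAD value for the given `pip show taskd` Location.

    Hypothetical helper for illustration; it does not check that the files exist.
    """
    libtaskd = posixpath.join(taskd_location, LIBTASKD_REL)
    # LD_PRELOAD entries are separated by colons.
    return f"{libmspti}:{libtaskd}"
```

For a Location of /usr/local/lib/python3.10/dist-packages, the function returns the same value as the export example above.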
  2. Modify the training framework code.
    1. Go to the mindcluster-deploy repository, switch to the branch that matches your version according to the mindcluster-deploy Version Description, obtain the train_start.sh file from the samples/train/resumable-training/fault-tolerance/without-ranktable/pytorch/Qwen3 directory, and create the following directory structure on the management node.
      root@ubuntu:/data/atlas_dls/public/code/QWEN3_for_PyTorch_2.7_code/scripts#
      scripts/
      └── train_start.sh
    2. Configure the train_start.sh script by adding the following lines to it.
      # Enable HCCL operator re-execution. Re-execution occurs when an SDMA or RDMA CQE error is reported during the execution of a communication operator. In this case, HCCL attempts to re-run the operator.
      export HCCL_OP_RETRY_ENABLE="L0:0, L1:1, L2:1"  
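
To confirm which levels have re-execution enabled, the value of HCCL_OP_RETRY_ENABLE can be parsed in a quick check. The following is a minimal sketch, not HCCL's own parser: parse_op_retry is a hypothetical helper that assumes only the comma-separated level:flag format seen in the example value, with flags 0 or 1. The meaning of each level is not covered here.

```python
def parse_op_retry(value):
    """Parse a string such as "L0:0, L1:1, L2:1" into {level: enabled}.

    Hypothetical helper for illustration; the format is inferred from the
    example value above.
    """
    levels = {}
    for item in value.split(","):
        item = item.strip()
        if not item:
            continue  # Tolerate a trailing comma or extra spaces.
        level, sep, flag = item.partition(":")
        level, flag = level.strip(), flag.strip()
        if not sep or not level or flag not in ("0", "1"):
            raise ValueError(f"unexpected entry {item!r} in HCCL_OP_RETRY_ENABLE")
        levels[level] = flag == "1"
    return levels
```

Inside the training container, the function could be called with os.environ["HCCL_OP_RETRY_ENABLE"] after train_start.sh has exported the variable.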
  3. Modify the job YAML file.
    Add the fault-scheduling label, the recover-strategy annotation, and the ports entries shown below to the job YAML file. They enable process-level online recovery and add port 9601 for TaskD communication to all pods.
    ...  
       labels:  
         ...  
         fault-scheduling: "force"
     ... 
    ...  
       annotations:  
         ...  
         recover-strategy: "retry"    # Recovery policy. retry indicates that process-level online recovery is enabled.
     ... 
    ...
    spec:
      replicaSpecs:
        Master:
          template:
            spec:
              containers:
              - name: ascend # do not modify
                ...
                args:
                  - | 
                    ... 
                    bash scripts/train_start.sh /job/code /job/output pretrain_gpt.py \
                      ...
                ports:
                    - containerPort: 9601
                      name: taskd-port
    ...
        Worker:
          template:
            spec:
              containers:
              - name: ascend # do not modify
                ...
                args:
                  - |
                    ...
                    bash scripts/train_start.sh /job/code /job/output pretrain_gpt.py \
                      ...
                ports:
                    - containerPort: 9601
                      name: taskd-port
    ...
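
Because the port must be added to both the Master and Worker replicas, it is easy to miss one. The following is a minimal sketch of a check over the job manifest after it has been loaded into a Python dict (for example with a YAML loader of your choice). replicas_missing_taskd_port is a hypothetical helper and assumes only the spec.replicaSpecs.<name>.template.spec.containers layout shown above.

```python
TASKD_PORT = 9601  # Port used for TaskD communication, as configured above.


def replicas_missing_taskd_port(job):
    """Return the names of replicas in which no container exposes port 9601.

    Hypothetical helper for illustration; `job` is the parsed job manifest.
    """
    missing = []
    replica_specs = job.get("spec", {}).get("replicaSpecs", {})
    for name, replica in replica_specs.items():
        containers = replica.get("template", {}).get("spec", {}).get("containers", [])
        has_port = any(
            port.get("containerPort") == TASKD_PORT
            for container in containers
            for port in (container.get("ports") or [])
        )
        if not has_port:
            missing.append(name)
    return missing
```

An empty list means every replica type exposes the port. Any names returned identify the replica sections that still need the ports entry.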