MindSpore Scenarios (Based on MindFormers)

This section describes how to configure online stress testing. For details about its features, restrictions, supported products, and working principles, see Online Stress Testing.

Prerequisite

Procedure

  1. After the distributed environment is initialized and the global rank can be obtained, modify the training script to start TaskD Manager, start TaskD Proxy in the management process, and start TaskD Worker in the training process.
    1. Start TaskD Manager.
      1. Create a manager.py file and save it to the directory where the training script is called. The content of the manager.py file is as follows:
        from taskd.api import init_taskd_manager, start_taskd_manager
        import os
        
        job_id=os.getenv("MINDX_TASK_ID")
        node_nums=XX         # Total number of nodes (set by yourself)
        proc_per_node=XX     # Number of training processes on each node (set by yourself)
        
        init_taskd_manager({"job_id":job_id, "node_nums": node_nums, "proc_per_node": proc_per_node})
        start_taskd_manager()

        For details about the parameters in the manager.py file, see the description of the init_taskd_manager(config: dict) -> bool API.
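The XX placeholders in manager.py have to be filled in for each job. As a minimal sketch, the configuration dictionary could be built from environment variables and validated before it is passed to init_taskd_manager. The helper name build_manager_config and the NODE_NUMS and PROC_PER_NODE variable names are assumptions made for illustration; only MINDX_TASK_ID and the three dictionary keys come from the example above.

```python
import os


def build_manager_config(env=None):
    """Build the dict expected by init_taskd_manager from environment variables."""
    env = os.environ if env is None else env

    job_id = env.get("MINDX_TASK_ID")
    if not job_id:
        raise ValueError("MINDX_TASK_ID is not set")

    # NODE_NUMS and PROC_PER_NODE are hypothetical names used for this sketch;
    # replace them with whatever your job actually exports.
    try:
        node_nums = int(env.get("NODE_NUMS", ""))
        proc_per_node = int(env.get("PROC_PER_NODE", ""))
    except ValueError:
        raise ValueError("NODE_NUMS and PROC_PER_NODE must be integers")

    if node_nums <= 0 or proc_per_node <= 0:
        raise ValueError("NODE_NUMS and PROC_PER_NODE must be positive")

    return {"job_id": job_id, "node_nums": node_nums, "proc_per_node": proc_per_node}
```

With such a helper, manager.py would call init_taskd_manager(build_manager_config()) followed by start_taskd_manager(), so a missing or malformed value fails early with a clear message.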

      2. Add the following code to the training script to start TaskD Manager.
        export TASKD_PROCESS_ENABLE="on"
        if [[ "${MS_SCHED_HOST}" == "${POD_IP}" ]]; then
            python manager.py &   # Adjust the path to manager.py based on the current working directory.
        fi
            
        msrun ...
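The shell condition above starts TaskD Manager only in the pod whose IP address matches the MindSpore scheduler host. The same decision can be sketched in Python, for example if the launch logic lives in a Python wrapper. The function name should_start_manager is hypothetical. Unlike the shell test, this sketch treats unset or empty variables as "no match" instead of comparing two empty strings as equal.

```python
import os


def should_start_manager(env=None):
    """Return True only in the pod that hosts the MindSpore scheduler."""
    env = os.environ if env is None else env
    sched_host = env.get("MS_SCHED_HOST", "")
    pod_ip = env.get("POD_IP", "")
    # Require both values to be present so that two empty strings do not match.
    return bool(sched_host) and sched_host == pod_ip
```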
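    2. Start TaskD Worker. In the ./mindformers/trainer/base_trainer.py file, add the try/except block shown below (it initializes and starts TaskD Worker) to the training_process method, immediately before the model.train call.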
          def training_process(
                  self,
                  config: Optional[Union[dict, MindFormerConfig, ConfigArguments, TrainingArguments]] = None,
                  network: Optional[Union[Cell, PreTrainedModel]] = None,
                  dataset: Optional[Union[BaseDataset, GeneratorDataset]] = None,
                  optimizer: Optional[Optimizer] = None,
                  callbacks: Optional[Union[Callback, List[Callback]]] = None,
                  compute_metrics: Optional[Union[dict, set]] = None,
                  **kwargs):
             …
             …
      
              logger.info(".........Starting Training Model..........")
              if get_real_rank() % 8 == 0:
                  pprint(config)
              logger.info(".........Model Compiling, Please Wait a Moment...........")
              try:
                  rank = get_rank()
                  from taskd.api.taskd_worker_api import init_taskd_worker
                  from taskd.api.taskd_worker_api import start_taskd_worker
                  init_taskd_worker(rank,5000,"ms")
                  start_taskd_worker()
              except Exception as e:
                  # Do not abort training if TaskD Worker fails to start; report the cause instead.
                  print(f"failed to call mindcluster taskd: {e}")
              model.train(config.runner_config.epochs, dataset,
                          callbacks=callbacks,
                          dataset_sink_mode=config.runner_config.sink_mode,
                          sink_size=config.runner_config.sink_size,
                          initial_epoch=config.runner_config.initial_epoch)
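The try/except block added to training_process can also be factored into a small helper, which keeps base_trainer.py readable and records the full traceback when TaskD Worker fails to start. This is a sketch only: the helper name start_taskd_worker_safely is hypothetical, and the arguments passed to init_taskd_worker are copied unchanged from the example above.

```python
import logging

logger = logging.getLogger(__name__)


def start_taskd_worker_safely(rank):
    """Try to start TaskD Worker; return True on success, False otherwise."""
    try:
        # Import lazily so that training still runs where taskd is not installed.
        from taskd.api.taskd_worker_api import init_taskd_worker, start_taskd_worker

        # Same positional arguments as in the example above.
        init_taskd_worker(rank, 5000, "ms")
        start_taskd_worker()
        return True
    except Exception:
        # Log the traceback but let training continue without TaskD Worker.
        logger.exception("failed to call mindcluster taskd")
        return False
```

In training_process, the call would be start_taskd_worker_safely(get_rank()) placed immediately before model.train.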
  2. Modify the training framework code to enable online stress testing.

    Edit the QWEN3_for_MS_code/scripts/msrun_launcher.sh file and add the following information to the file.

    export MS_ENABLE_TFT='{TTP:1,TSP:1}'          # Enable dying gasp and online stress testing.

    If the error message "the libtaskd.so has not been loaded" is displayed during training, set the LD_PRELOAD environment variable in the training script. This variable makes the system preload the specified .so files. Example:

    export LD_PRELOAD=/usr/local/Ascend/cann/lib64/libmspti.so:/usr/local/python3.10.5/lib/python3.10/site-packages/taskd/python/cython_api/libs/libtaskd.so
    • libmspti.so: This .so file is provided by MindStudio and integrated in the CANN package. The default installation path is /usr/local/Ascend/cann/lib64/libmspti.so.
    • libtaskd.so: This .so file is provided by TaskD. After the whl package is installed, the path is <TaskD installation path>/taskd/python/cython_api/libs/libtaskd.so.

      You can run the following command to query the path where TaskD is located. The Location field in the command output is the target path.

      pip show taskd
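The LD_PRELOAD value is the libmspti.so path and the libtaskd.so path joined by a colon, where the libtaskd.so path is derived from the Location field reported by pip show taskd. The following sketch assembles that value; the function names are hypothetical, and the default libmspti.so path is the default installation path stated above.

```python
import posixpath

DEFAULT_MSPTI = "/usr/local/Ascend/cann/lib64/libmspti.so"


def libtaskd_path(location):
    """Return the libtaskd.so path under the Location reported by pip show taskd."""
    return posixpath.join(location, "taskd", "python", "cython_api", "libs", "libtaskd.so")


def build_ld_preload(location, mspti=DEFAULT_MSPTI):
    """Join both library paths with ':' in the order used in the example above."""
    return ":".join([mspti, libtaskd_path(location)])
```

For a Location of /usr/local/python3.10.5/lib/python3.10/site-packages, build_ld_preload returns the same value as the export example above.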
  3. Modify the job YAML file.
    Add the labels, annotations, and ports fields shown below to the job YAML file. They enable process-level rescheduling and expose port 9601 on all pods for TaskD communication.
    ...  
       labels:  
         ...  
         fault-scheduling: "force"
     ... 
    ...  
       annotations:  
         ...  
         recover-strategy: "recover"   # Recovery policy. recover indicates that process-level rescheduling is enabled.
     ... 
    ...
    spec:
      replicaSpecs:
        Master:
          template:
            spec:
              containers:
              - name: ascend # do not modify
                ...
                command:                           # training command, which can be modified
                  - /bin/bash
                  - -c
                  - |
                   cd /job/code/;bash scripts/msrun_launcher.sh "run_mindformer.py --config configs/qwen3/pretrain_qwen3_32b_4k.yaml --auto_trans_ckpt False --use_parallel True --run_mode train"
                ports:
                    - containerPort: 9601          
                      name: taskd-port
    ...
        Worker:
          template:
            spec:
              containers:
              - name: ascend # do not modify
                ...
                command:                           # training command, which can be modified
                  - /bin/bash
                  - -c
                  - |
                   cd /job/code/;bash scripts/msrun_launcher.sh "run_mindformer.py --config configs/qwen3/pretrain_qwen3_32b_4k.yaml --auto_trans_ckpt False --use_parallel True --run_mode train"
                ports:
                    - containerPort: 9601          
                      name: taskd-port
    ...
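Because port 9601 must be added to every pod, it is easy to update the Master section and forget the Worker section. The following sketch checks a job definition that has already been loaded into a Python dictionary (for example with a YAML parser) and reports the replica types that do not expose the port. The function name replicas_missing_taskd_port is hypothetical; the field layout follows the YAML fragment above.

```python
def replicas_missing_taskd_port(job, port=9601):
    """Return the replica types (e.g. Master, Worker) with no container exposing port."""
    missing = []
    replica_specs = job.get("spec", {}).get("replicaSpecs", {})
    for name, replica in replica_specs.items():
        containers = replica.get("template", {}).get("spec", {}).get("containers", [])
        exposed = any(
            entry.get("containerPort") == port
            for container in containers
            for entry in (container.get("ports") or [])
        )
        if not exposed:
            missing.append(name)
    return missing
```

An empty list means that every replica type exposes the TaskD port.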