Using TaskD 7.1.RC1 or Later
MindCluster cluster scheduling components provide a diagnosis function for performance degradation (slow nodes) in a cluster, based on the profiling capability of MindStudio. The function supports dynamic dotting and data persistence. Dotting can be enabled or disabled in real time without restarting the job, so diagnosis does not interrupt training.
Table 1 describes the supported dotting data.
| Dotting Data Type | Supported Framework | Required Component |
|---|---|---|
| FP (forward propagation data) | PyTorch (only the single-operator scenario is supported) | mstx_torch_plugin |
| Step (step latency) | PyTorch, MindSpore | - |
| Communication (communication operator) | PyTorch, MindSpore | - |
| SaveCheckpoint (time consumed by SaveCheckpoint) | PyTorch, MindSpore | - |
| DataLoader (time consumed by DataLoader) | PyTorch, MindSpore | - |
Restrictions
- Currently, Step, SaveCheckpoint, FP, and DataLoader dotting are enabled together as a group. To disable these four types, you must also disable Communication dotting.
- Communication operator dotting can be enabled or disabled independently.
- Dynamic lightweight dotting and MindStudio full dotting cannot be enabled at the same time. If full dotting is enabled, data collection may fail because of performance deterioration.
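The restrictions above can be checked before a profilingSwitch value is written. The following is a minimal sketch based on one reading of those rules: Step, SaveCheckpoint, FP, and DataLoader share one value, the group can be turned off only when CommunicationOperator is also off, and CommunicationOperator can otherwise be toggled on its own. The key names come from the profilingSwitch examples in this section; check_profiling_switch is a hypothetical helper, not part of TaskD.

```python
import json
from typing import List

# Key names taken from the profilingSwitch examples in this document.
GROUPED_KEYS = ("Step", "SaveCheckpoint", "FP", "DataLoader")
COMM_KEY = "CommunicationOperator"
VALID_VALUES = {"on", "off"}


def check_profiling_switch(raw: str) -> List[str]:
    """Return the problems found in a profilingSwitch JSON string.

    Hypothetical helper encoding one reading of the restrictions; it is not
    TaskD's own validation logic. An empty list means no problem was found.
    """
    switch = json.loads(raw)
    # Every documented key must be present and set to "on" or "off".
    problems = [f"{key} must be 'on' or 'off'"
                for key in (COMM_KEY,) + GROUPED_KEYS
                if switch.get(key) not in VALID_VALUES]
    if problems:
        return problems
    grouped = {switch[key] for key in GROUPED_KEYS}
    if len(grouped) > 1:
        # Step, SaveCheckpoint, FP, and DataLoader are switched as one group.
        problems.append("Step, SaveCheckpoint, FP, and DataLoader must have the same value")
    elif grouped == {"off"} and switch[COMM_KEY] == "on":
        # The group can be disabled only when communication dotting is also off.
        problems.append("disable CommunicationOperator when disabling the other four types")
    return problems


if __name__ == "__main__":
    ok = '{"CommunicationOperator":"off","Step":"on","SaveCheckpoint":"on","FP":"on","DataLoader":"on"}'
    bad = '{"CommunicationOperator":"on","Step":"off","SaveCheckpoint":"off","FP":"off","DataLoader":"off"}'
    print(check_profiling_switch(ok))  # []
    print(check_profiling_switch(bad))
```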
Prerequisites
- (Optional) ClusterD, Ascend Device Plugin, and Volcano have been installed. The versions of these MindCluster components must match the TaskD version.
- The following software has been installed in the container: torch_npu 7.1.RC1 or later (required in the PyTorch scenario), MindSpore 2.7.0 or later (required in the MindSpore scenario), CANN 8.2.RC1 or later (mandatory), and TaskD (mandatory).
Obtaining the Software Package
Configuring Performance Degradation Detection
This solution applies only to TaskD 7.1.RC1 or later. If you use TaskD with a version earlier than 7.1.RC1, see Using TaskD of Other Versions.
- Select either of the following methods as required.
- Install mstx_torch_plugin in the container.
- If a non-native optimizer is used in the PyTorch scenario, or mstx_torch_plugin is not used, modify the training loop in the training script and add step dotting code to obtain the time consumed by each training step. The following uses the PyTorch-MindSpeed scenario as an example: add the step dotting lines (step_id, stream, range_id, and torch.npu.mstx.range_end) shown in the following code to the ./mindspeed_llm/training/training.py file. In the MindSpore scenario, modify the training script accordingly.
```python
def train(forward_step_func, model, optimizer, opt_param_scheduler,
          train_data_iterator, valid_data_iterator,
          process_non_loss_data_func, config):
    # Cache into one-logger for callback
    ...
    if is_profile_enabled():
        prof = get_profiler()
        prof.start()
    step_id = iteration
    while iteration < args.train_iters:
        stream = torch.npu.current_stream()  # Obtain the environment's execution stream to capture the time consumed by NPUs.
        range_id = torch.npu.mstx.range_start(f"step {step_id}", stream)  # Start of a training step.
        ...
        if args.manual_gc:
            if args.manual_gc_interval != 0 and iteration % args.manual_gc_interval == 0:
                gc.collect()
        if is_profile_enabled():
            prof.step()
        step_id += 1  # Increase the training step by 1 to identify the next step.
        torch.npu.mstx.range_end(range_id)  # End of a training step.
```
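The start/end pairing in the training loop can also be wrapped so that the range is always closed. This is a sketch, not part of MindSpeed or TaskD: step_range is a hypothetical helper that receives the mstx module as a parameter and relies only on the range_start(message, stream) and range_end(range_id) calls used in the code above. RecordingMstx is a stand-in that records calls so the order can be shown without an NPU.

```python
from contextlib import contextmanager
from typing import Any, Iterator, List, Tuple


@contextmanager
def step_range(mstx: Any, step_id: int, stream: Any) -> Iterator[None]:
    """Mark one training step as an mstx range (hypothetical helper).

    `mstx` is expected to behave like torch.npu.mstx as used above:
    range_start(message, stream) returns an id and range_end(id) closes it.
    The range is closed in `finally`, so it also ends if the step body
    raises or the loop body uses `continue`.
    """
    range_id = mstx.range_start(f"step {step_id}", stream)
    try:
        yield
    finally:
        mstx.range_end(range_id)


class RecordingMstx:
    """Stand-in for torch.npu.mstx that only records calls (illustration only)."""

    def __init__(self) -> None:
        self.calls: List[Tuple[Any, ...]] = []
        self._next_id = 0

    def range_start(self, message: str, stream: Any) -> int:
        self._next_id += 1
        self.calls.append(("start", message, self._next_id))
        return self._next_id

    def range_end(self, range_id: int) -> None:
        self.calls.append(("end", range_id))


if __name__ == "__main__":
    mstx = RecordingMstx()
    for step_id in range(2):
        with step_range(mstx, step_id, stream=None):
            pass  # forward, backward, and optimizer step would run here
    print(mstx.calls)
```

In the MindSpeed loop, the step body would then sit inside `with step_range(torch.npu.mstx, step_id, torch.npu.current_stream()):`, with `step_id += 1` kept at the end of each iteration. The design choice is only about robustness: an early `continue` or an exception can no longer leave a step range open.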
- In the container, log in to the environment as the running user of the CANN package and run the source ${install_path}/set_env.sh command to set environment variables. ${install_path} is the CANN installation directory. Example:
source /usr/local/Ascend/cann/set_env.sh
- Before starting the training, export the LD_PRELOAD environment variable in the training script so that the system loads the specified .so files in advance. Example:
export LD_PRELOAD=/usr/local/Ascend/cann/lib64/libmspti.so:/usr/local/python3.10.5/lib/python3.10/site-packages/taskd/python/cython_api/libs/libtaskd.so
- libmspti.so: This .so file is provided by MindStudio and integrated in the CANN package. The default installation path is /usr/local/Ascend/cann/lib64/libmspti.so.
- libtaskd.so: This .so file is provided by TaskD. After the TaskD whl package is installed, the file is located at <TaskD installation path>/taskd/python/cython_api/libs/libtaskd.so.
You can run the following command to query the TaskD installation path. The Location field in the command output is that path.
pip show taskd
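The LD_PRELOAD value can also be assembled in Python instead of copying paths by hand. This is a minimal sketch, assuming the default CANN path /usr/local/Ascend/cann and the libtaskd.so location relative to the TaskD installation path described above. It derives the Location directory with importlib rather than parsing `pip show taskd` output; taskd_location and build_ld_preload are hypothetical helpers.

```python
import importlib.util
import os
from typing import Optional

# Relative locations taken from the paths documented above.
MSPTI_REL = os.path.join("lib64", "libmspti.so")
TASKD_REL = os.path.join("taskd", "python", "cython_api", "libs", "libtaskd.so")
CANN_HOME = "/usr/local/Ascend/cann"  # default CANN path used in this document


def taskd_location() -> Optional[str]:
    """Return the directory containing the taskd package (what `pip show taskd`
    reports as Location), or None if taskd cannot be found."""
    spec = importlib.util.find_spec("taskd")
    if spec is None or not spec.submodule_search_locations:
        return None
    package_dir = list(spec.submodule_search_locations)[0]
    return os.path.dirname(package_dir)


def build_ld_preload(cann_home: str, location: str) -> str:
    """Join libmspti.so and libtaskd.so, in that order, into one LD_PRELOAD value."""
    libs = [os.path.join(cann_home, MSPTI_REL), os.path.join(location, TASKD_REL)]
    return ":".join(libs)


if __name__ == "__main__":
    loc = taskd_location()
    if loc is None:
        print("taskd is not installed in this Python environment")
    else:
        print(f"export LD_PRELOAD={build_ld_preload(CANN_HOME, loc)}")
```

Run with the same Python interpreter as the training job, the script prints an export line that can be pasted into the training script.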
- After the distributed environment is initialized and the global rank can be obtained, modify the training script to start TaskD Manager, start TaskD Proxy in the management process, and start TaskD Worker in the training process.
- (Optional) Start TaskD Manager and TaskD Proxy. Perform the following steps only if you use the gRPC interface to enable lightweight profiling and obtain the flushed data. If you use a ConfigMap, skip them.
- PyTorch
- Create a manager.py file in the directory from which the training script is run. The content of manager.py is as follows:
```python
from taskd.api import init_taskd_manager, start_taskd_manager
import os

job_id = os.getenv("MINDX_TASK_ID")
node_nums = XX      # Total number of nodes (set by yourself)
proc_per_node = XX  # Number of training processes on each node (set by yourself)

init_taskd_manager({"job_id": job_id, "node_nums": node_nums, "proc_per_node": proc_per_node})
start_taskd_manager()
```
For details about the parameters in manager.py, see def init_taskd_manager(config: dict) -> bool.
- Add the following code to the training script to start TaskD Manager and TaskD Proxy.
```shell
sed -i '/import os/i import taskd.python.adaptor.patch' $(pip3 show torch | grep Location | awk -F ' ' '{print $2}')/torch/distributed/run.py
if [[ "${RANK}" -eq 0 ]]; then
    export MASTER_ADDR=${POD_IP}
    python manager.py &
fi
torchrun ...
```
- MindSpore
- Create a manager.py file in the directory from which the training script is run. The content of manager.py is as follows:
```python
from taskd.api import init_taskd_manager, start_taskd_manager
import os

job_id = os.getenv("MINDX_TASK_ID")
node_nums = XX      # Total number of nodes (set by yourself)
proc_per_node = XX  # Number of training processes on each node (set by yourself)

init_taskd_manager({"job_id": job_id, "node_nums": node_nums, "proc_per_node": proc_per_node})
start_taskd_manager()
```
For details about the parameters in manager.py, see def init_taskd_manager(config: dict) -> bool.
- Add the following code to the training script to start TaskD Manager.
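```shell
# MS_SCHED_HOST and POD_IP are IP address strings, so compare them with ==
# (-eq performs an arithmetic comparison and fails on dotted addresses).
if [[ "${MS_SCHED_HOST}" == "${POD_IP}" ]]; then
    python manager.py &
fi
msrun ...
```
- Modify the mindspore/python/mindspore/parallel/cluster/process_entity/_api.py file to start TaskD Proxy. Example: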
```python
...
if ("TTP:1" in tft_env) or ("UCE:1" in tft_env) or ("ARF:1" in tft_env):
    try:
        from taskd.python.framework.agent.ms_mgr.msrun_plugin import MSRunPlugin
        from taskd.api.taskd_proxy_api import init_taskd_proxy
        from taskd.python.framework.common.type import CONFIG_UPSTREAMIP_KEY, LOCAL_HOST
        import threading
        # Start TaskD Proxy in a daemon thread, pointing it at the scheduler host.
        proxy = threading.Thread(target=init_taskd_proxy,
                                 args=({CONFIG_UPSTREAMIP_KEY: os.getenv("MS_SCHED_HOST", LOCAL_HOST)},))
        proxy.daemon = True
        proxy.start()
        self.msmgr = MSRunPlugin()
        self.msmgr.register_callbacks("KILL_WORKER", self.kill_workers)
        self.msmgr.register_callbacks("START_ALL_WORKER", self.start_all_workers)
        self.msmgr.register_callbacks("START_WORKER_LIST", self.start_worker_list)
        self.msmgr.register_callbacks("MONITOR", self.monitor_rank_status)
        self.enable_mindx = True
        os.environ["MS_ENABLE_RECOVERY"] = str(1)
    ...  # Remaining original code omitted.
```
- Start TaskD Worker.
- PyTorch-MindSpeed: Add the TaskD Worker code shown in the following example (from import torch.distributed as dist to start_taskd_worker()) to the QWEN3_for_PyTorch_2.7_code/mindspeed_llm/training/training.py file.
```python
def pretrain(train_valid_test_dataset_provider, model_provider, model_type,
             forward_step_func, process_non_loss_data_func=None,
             extra_args_provider=None, args_defaults={}):
    print_rank_0('time to initialize megatron (seconds): {:.3f}'.format(
        time.time() - _TRAIN_START_TIME))
    print_datetime('after megatron is initialized')
    import torch.distributed as dist
    if dist.is_initialized():
        rank = dist.get_rank()
        from taskd.api.taskd_worker_api import init_taskd_worker
        from taskd.api.taskd_worker_api import start_taskd_worker
        init_taskd_worker(rank, 5000)
        start_taskd_worker()
    app_metrics['app_model_init_finish_time'] = one_logger_utils.get_timestamp_in_ms()
    one_logger_utils.on_pretrain_start()
```
- MindSpore-MindFormers: Add the TaskD Worker code shown in the following example (the try/except block) to the ./mindformers/trainer/base_trainer.py file.
```python
def training_process(
        self,
        config: Optional[Union[dict, MindFormerConfig, ConfigArguments, TrainingArguments]] = None,
        network: Optional[Union[Cell, PreTrainedModel]] = None,
        dataset: Optional[Union[BaseDataset, GeneratorDataset]] = None,
        optimizer: Optional[Optimizer] = None,
        callbacks: Optional[Union[Callback, List[Callback]]] = None,
        compute_metrics: Optional[Union[dict, set]] = None,
        **kwargs):
    ...
    logger.info(".........Starting Training Model..........")
    if get_real_rank() % 8 == 0:
        pprint(config)
    logger.info(".........Model Compiling, Please Wait a Moment...........")
    try:
        rank = get_rank()
        from taskd.api.taskd_worker_api import init_taskd_worker
        from taskd.api.taskd_worker_api import start_taskd_worker
        init_taskd_worker(rank, 5000)
        start_taskd_worker()
    except Exception as e:
        # Training continues even if TaskD Worker cannot be started.
        print(f"failed to call mindcluster taskd: {e}")
    model.train(config.runner_config.epochs, dataset,
                callbacks=callbacks,
                dataset_sink_mode=config.runner_config.sink_mode,
                sink_size=config.runner_config.sink_size,
                initial_epoch=config.runner_config.initial_epoch)
```
In the preceding code, 5000 in init_taskd_worker(rank, 5000) is the upper limit, in MB, of the disk space used by /user/cluster-info/profiling. For details, see upper_limit_of_disk_in_mb in def init_taskd_worker(rank_id: int, upper_limit_of_disk_in_mb: int = 5000, framework: str = "pt") -> bool.
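Which pod launches manager.py can also be decided in Python, for example when the launcher is written in Python or the condition needs a unit test. This is a minimal sketch under the assumption that the conditions are exactly those in the shell snippets above: RANK equal to 0 for PyTorch, and MS_SCHED_HOST equal to POD_IP for MindSpore. should_start_manager is a hypothetical helper, not a TaskD API. Because MS_SCHED_HOST and POD_IP hold IP addresses, they are compared as strings.

```python
import os
from typing import Mapping


def should_start_manager(framework: str, env: Mapping[str, str]) -> bool:
    """Decide whether this pod should launch manager.py (hypothetical helper).

    - pytorch: the pod whose RANK is 0 starts TaskD Manager.
    - mindspore: the pod whose POD_IP equals MS_SCHED_HOST starts it.
    """
    if framework == "pytorch":
        rank = env.get("RANK", "")
        # RANK is an integer string; anything else means "not the first rank".
        return rank.strip().isdigit() and int(rank) == 0
    if framework == "mindspore":
        sched_host = env.get("MS_SCHED_HOST")
        pod_ip = env.get("POD_IP")
        # IP addresses are compared as plain strings, not numbers.
        return bool(sched_host) and sched_host == pod_ip
    raise ValueError(f"unsupported framework: {framework!r}")


if __name__ == "__main__":
    print(should_start_manager("pytorch", os.environ))
```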
- Modify the job YAML file.
- Add container port 9601, which is used for TaskD communication, to all pods.
```yaml
...
spec:
  ...
  containers:
  - ...
    ports:
    - containerPort: 9601
      name: taskd-port
...
```
- Mount files.
- Mount the lightweight profiling configuration file: The data-trace ConfigMap must be flushed to the /user/cluster-info/datatrace-config/<Namespace>.data-trace-<Job name>/ folder on the host. Mount this folder, which contains the profilingSwitch file, to /user/cluster-info/datatrace-config/ in the container.
- Mount the lightweight profiling output directory: Lightweight profiling data is written to /user/cluster-info/profiling in the container. To obtain the data on the host, mount this path to a host directory in the job YAML file.
- Example of volume mounts in the container:
```yaml
volumeMounts:
- name: profilingdata
  mountPath: /user/cluster-info/
- name: profileswitch
  mountPath: /user/cluster-info/datatrace-config
```
- Example of host volumes:
```yaml
volumes:
- name: profileswitch
  hostPath:
    path: /user/cluster-info/datatrace-config/default.data-trace-default-test-pytorch-fault-mixtral
- name: profilingdata
  hostPath:
    path: /home/profilingdatapath
```
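When job YAML files are generated by a script, the port and mount entries can be built from the namespace and job name. This is a sketch under the assumption that the host folder follows the documented pattern /user/cluster-info/datatrace-config/<Namespace>.data-trace-<Job name>; datatrace_host_dir and taskd_pod_fragments are hypothetical helpers, and merging their output into a full pod spec is left to the caller.

```python
import json
import posixpath
from typing import Any, Dict

# Used as both the host root and the container mount path in this document.
DATATRACE_ROOT = "/user/cluster-info/datatrace-config"
TASKD_PORT = 9601  # TaskD communication port from this document


def datatrace_host_dir(namespace: str, job_name: str) -> str:
    """Host folder of the flushed data-trace ConfigMap: <root>/<Namespace>.data-trace-<Job name>."""
    return posixpath.join(DATATRACE_ROOT, f"{namespace}.data-trace-{job_name}")


def taskd_pod_fragments(namespace: str, job_name: str, host_data_dir: str) -> Dict[str, Any]:
    """Build the port, volumeMounts, and volumes entries shown above.

    "container" entries belong under spec.containers[*]; "pod" entries belong
    under the pod spec. host_data_dir is where profiling data lands on the host.
    """
    return {
        "container": {
            "ports": [{"containerPort": TASKD_PORT, "name": "taskd-port"}],
            "volumeMounts": [
                {"name": "profilingdata", "mountPath": "/user/cluster-info/"},
                {"name": "profileswitch", "mountPath": DATATRACE_ROOT},
            ],
        },
        "pod": {
            "volumes": [
                {"name": "profileswitch",
                 "hostPath": {"path": datatrace_host_dir(namespace, job_name)}},
                {"name": "profilingdata", "hostPath": {"path": host_data_dir}},
            ],
        },
    }


if __name__ == "__main__":
    fragments = taskd_pod_fragments("default", "default-test-pytorch-fault-mixtral",
                                    "/home/profilingdatapath")
    print(json.dumps(fragments, indent=2))
```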
- Enable lightweight profiling to obtain flushed data using either of the following methods.
- Call the gRPC interface provided by ClusterD. Use this method if you started TaskD Manager and TaskD Proxy in the optional step above. For details about the interface, see ModifyTrainingDataTraceSwitch.
If you enable lightweight profiling through the ClusterD gRPC interface, the data-trace-<Job name> ConfigMap that the interface creates shares the lifecycle of the job and is deleted together with the job. If the job does not exist, the interface call fails.
- Modify the data-trace ConfigMap of the job. Use this method if you did not start TaskD Manager and TaskD Proxy. The following uses the default-test-pytorch-fault-mixtral job in the default namespace as an example to describe how to enable lightweight profiling and obtain flushed data by editing the ConfigMap.
- Run the following command on the master node to query the ConfigMap of the job.
kubectl get cm
- Create the ConfigMap file required by lightweight profiling for obtaining the flushed data.
- Add the following content to datacm.yaml.
```yaml
apiVersion: v1
kind: ConfigMap
metadata:
  name: data-trace-default-test-pytorch-fault-mixtral  # The name must be data-trace- followed by the job name.
  labels:
    reset: "true"
data:
  profilingSwitch: '{"CommunicationOperator":"off","Step":"on","SaveCheckpoint":"on","FP":"on","DataLoader":"on"}'
```
- Run the following command on the master node to create the ConfigMap.
kubectl apply -f datacm.yaml
If the following information is displayed, the ConfigMap is created successfully.
```
[root@master~]# kubectl apply -f datacm.yaml
configmap/data-trace-default-test-pytorch-fault-mixtral created
```
- Run the following command to edit the ConfigMap.
kubectl edit cm data-trace-default-test-pytorch-fault-mixtral
- To enable communication operator dotting, change the value of CommunicationOperator to on.
```yaml
apiVersion: v1
data:
  profilingSwitch: '{"CommunicationOperator":"on","Step":"on","SaveCheckpoint":"on","FP":"on","DataLoader":"on"}'
```
Enabling communication operator dotting may degrade training performance, so it is not recommended.
- Press Esc and enter :wq! to save the settings and exit.
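If the ConfigMap is produced by a script instead of being written by hand, the manifest can be built as a dictionary and saved as JSON, which kubectl apply -f also accepts. This is a minimal sketch, assuming the naming rule (data-trace- followed by the job name), the reset label, and the switch values from the datacm.yaml example; data_trace_configmap is a hypothetical helper, and the optional namespace field is added only when given.

```python
import json
from typing import Any, Dict, Optional

# Switch keys and default values taken from the datacm.yaml example above.
DEFAULT_SWITCH = {
    "CommunicationOperator": "off",
    "Step": "on",
    "SaveCheckpoint": "on",
    "FP": "on",
    "DataLoader": "on",
}


def data_trace_configmap(job_name: str, namespace: Optional[str] = None,
                         switch: Optional[Dict[str, str]] = None) -> Dict[str, Any]:
    """Return a data-trace ConfigMap manifest as a dict (hypothetical helper)."""
    metadata: Dict[str, Any] = {
        "name": f"data-trace-{job_name}",  # documented naming rule
        "labels": {"reset": "true"},
    }
    if namespace:
        metadata["namespace"] = namespace
    return {
        "apiVersion": "v1",
        "kind": "ConfigMap",
        "metadata": metadata,
        # Compact separators reproduce the single-line JSON string used above.
        "data": {"profilingSwitch": json.dumps(switch or DEFAULT_SWITCH, separators=(",", ":"))},
    }


if __name__ == "__main__":
    cm = data_trace_configmap("default-test-pytorch-fault-mixtral", namespace="default")
    # Save the output as datacm.json, then run: kubectl apply -f datacm.json
    print(json.dumps(cm, indent=2))
```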
Obtaining Performance Degradation Detection Data
- Flushed data is organized by rank. Lightweight profiling data is written to the /user/cluster-info/profiling directory in the container.
- If the pod has the environment variable MINDX_TASK_ID, the data of rank 0 is stored in /user/cluster-info/profiling/$MINDX_TASK_ID/0 in the container.
- If the environment variable does not exist, the data is flushed to the folder named default_task_id_timestamp by default.
- When the size of /user/cluster-info/profiling reaches the upper limit (the upper_limit_of_disk_in_mb parameter of init_taskd_worker), TaskD ages the files: by default, each aging pass deletes the 20% of files with the earliest modification time. Only the digit-named files in the rank folders of the profiling directory are deleted. Do not manually add other files to the profiling directory: TaskD does not delete them, so they keep occupying space.
- Lightweight profiling files are named by timestamp, and records are separated by newline characters. New records are appended to the latest file in the rank folder. When that file exceeds 10 MB, TaskD creates a new profiling file. If NFS or another network storage is used and data synchronization is slow, a new file may be created before the current file reaches 10 MB.
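The layout above is enough to read one rank's records in order. This is a sketch under stated assumptions: file names are digit-only timestamps (so numeric order is chronological), records are text lines, and MINDX_TASK_ID is set; the default folder used without MINDX_TASK_ID is not derived. rank_dir, profiling_files, and read_records are hypothetical helpers, and no record format is assumed, so records are returned as raw strings.

```python
import os
from typing import Iterator, List, Optional

PROFILING_ROOT = "/user/cluster-info/profiling"  # container path from this document


def rank_dir(rank: int, task_id: Optional[str] = None, root: str = PROFILING_ROOT) -> str:
    """Return <root>/<MINDX_TASK_ID>/<rank> for one rank.

    Falls back to the MINDX_TASK_ID environment variable. The default folder
    used when MINDX_TASK_ID is unset is not derived here.
    """
    task_id = task_id or os.environ.get("MINDX_TASK_ID")
    if not task_id:
        raise ValueError("MINDX_TASK_ID is not set; locate the default folder manually")
    return os.path.join(root, task_id, str(rank))


def profiling_files(directory: str) -> List[str]:
    """Digit-named (timestamp) files of one rank folder, oldest name first.

    Other files are skipped, like the files TaskD leaves alone during aging.
    """
    names = [name for name in os.listdir(directory)
             if name.isdigit() and os.path.isfile(os.path.join(directory, name))]
    return [os.path.join(directory, name) for name in sorted(names, key=int)]


def read_records(directory: str) -> Iterator[str]:
    """Yield the newline-separated records of one rank in file-name order."""
    for path in profiling_files(directory):
        with open(path, encoding="utf-8", errors="replace") as handle:
            for line in handle:
                record = line.rstrip("\n")
                if record:
                    yield record


if __name__ == "__main__":
    if os.environ.get("MINDX_TASK_ID"):
        directory = rank_dir(0)
        if os.path.isdir(directory):
            for record in read_records(directory):
                print(record)
```

Because TaskD may age the oldest files while a reader is running, a long-lived reader should expect files to disappear between listing and opening.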