The MindCluster cluster scheduling component, combined with the profiling capability provided by MindStudio, diagnoses performance-degradation faults (slow nodes) in a cluster. The feature supports dynamic instrumentation and persistence of the instrumentation data, and instrumentation of a training job can be started and stopped on the fly, so diagnosis requires no job restart and adds no overhead to training.
The instrumentation data currently supported is listed in Table 1.
Table 1 Supported instrumentation data

| Instrumentation data type | Supported AI frameworks | Supporting component |
| --- | --- | --- |
| FP (marks forward propagation) | PyTorch (only the single-operator scenario is supported) | mstx_torch_plugin |
| Step (marks step latency) | PyTorch, MindSpore | |
| Communication (marks communication operators) | PyTorch, MindSpore | |
| SaveCheckpoint (marks save_checkpoint duration) | PyTorch, MindSpore | |
| DataLoader (marks dataloader duration) | PyTorch, MindSpore | |
| Software package | Mandatory | Description | How to obtain | Usage scenario |
| --- | --- | --- | --- | --- |
| mstx_torch_plugin | No | The msproftx data collection and parsing feature of the Ascend PyTorch Profiler already has built-in instrumentation for communication operators. To let users obtain timing data for more key stages without modifying service code, mstx_torch_plugin adds built-in instrumentation for four key stage functions on top of the Ascend PyTorch Profiler: dataloader, forward, step, and save_checkpoint. | | PyTorch |
pip install mstx_torch_plugin-1.0-py3-none-any.whl
Make sure mstx_torch_plugin is imported after import torch and import torch_npu, as shown in the following example.
```python
import torch
import torch_npu
import mstx_torch_plugin
```
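Besides the four stages that the plugin instruments automatically, additional stages can be wrapped manually with the mstx range API, which is the same pattern the step-latency patch below uses. The following is a minimal sketch; the run_validation function and the "validation" range name are illustrative assumptions, not part of the original document.

```python
import torch
import torch_npu
import mstx_torch_plugin  # keep the import order: after torch and torch_npu

def run_validation(model, batch):
    stream = torch.npu.current_stream()                           # current NPU execution stream, used for device-side timing
    range_id = torch.npu.mstx.range_start("validation", stream)   # open a named range for this stage
    output = model(batch)                                         # the stage being measured
    torch.npu.mstx.range_end(range_id)                            # close the range
    return output
```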
```python
def train(forward_step_func, model, optimizer, opt_param_scheduler,
          train_data_iterator, valid_data_iterator,
          process_non_loss_data_func, config):
    # Cache into one-logger for callback
    ……
    ……
    if is_profile_enabled():
        prof = get_profiler()
        prof.start()
    step_id = iteration
    while iteration < args.train_iters:
        stream = torch.npu.current_stream()  # Get the current execution stream, used to obtain NPU-side timing
        range_id = torch.npu.mstx.range_start(f"step {step_id}", stream)  # Mark the start of the current training step
        ……
        ……
        if args.manual_gc:
            if args.manual_gc_interval != 0 and iteration % args.manual_gc_interval == 0:
                gc.collect()
        if is_profile_enabled():
            prof.step()
        step_id += 1  # Increment the step counter to identify the next step
        torch.npu.mstx.range_end(range_id)  # Mark the end of the current training step
```
source /usr/local/Ascend/ascend-toolkit/set_env.sh
export LD_PRELOAD=/usr/local/Ascend/ascend-toolkit/latest/lib64/libmspti.so:/usr/local/python3.8.17/lib/python3.8/site-packages/taskd/python/cython_api/libs/libtaskd.so
The installation path of TaskD can be queried with the following command; the Location field in the output is the TaskD path.
pip show taskd
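When the LD_PRELOAD value is composed in a script, the library location can also be derived programmatically. The following is a minimal sketch; it only assumes the relative path taskd/python/cython_api/libs/libtaskd.so shown in the export example above.

```python
# Locate libtaskd.so under the installed taskd package so LD_PRELOAD can be
# built without hard-coding the site-packages path.
import importlib.util
import os

spec = importlib.util.find_spec("taskd")
taskd_root = os.path.dirname(spec.origin)  # e.g. .../site-packages/taskd
libtaskd = os.path.join(taskd_root, "python", "cython_api", "libs", "libtaskd.so")
print(libtaskd)  # append this path to LD_PRELOAD together with libmspti.so
```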
Modify the ./MindSpeed-LLM/mindspeed_llm/training/training.py file and add the highlighted lines shown below (the added TaskD lines are marked with a comment).
```python
def pretrain(train_valid_test_dataset_provider,
             model_provider,
             model_type,
             forward_step_func,
             process_non_loss_data_func=None,
             extra_args_provider=None,
             args_defaults={}):
    print_rank_0('time to initialize megatron (seconds): {:.3f}'.format(
        time.time() - _TRAIN_START_TIME))
    print_datetime('after megatron is initialized')

    # Added: initialize and start the MindCluster TaskD worker
    import torch.distributed as dist
    if dist.is_initialized():
        rank = dist.get_rank()
        from taskd.api.taskd_worker_api import init_taskd_worker
        from taskd.api.taskd_worker_api import start_taskd_worker
        init_taskd_worker(rank, 5000)
        start_taskd_worker()

    app_metrics['app_model_init_finish_time'] = one_logger_utils.get_timestamp_in_ms()
    one_logger_utils.on_pretrain_start()
```
Modify the ./mindformers/trainer/base_trainer.py file and add the highlighted lines shown below (the added TaskD lines are marked with a comment).
```python
def training_process(
        self,
        config: Optional[Union[dict, MindFormerConfig, ConfigArguments, TrainingArguments]] = None,
        network: Optional[Union[Cell, PreTrainedModel]] = None,
        dataset: Optional[Union[BaseDataset, GeneratorDataset]] = None,
        optimizer: Optional[Optimizer] = None,
        callbacks: Optional[Union[Callback, List[Callback]]] = None,
        compute_metrics: Optional[Union[dict, set]] = None,
        **kwargs):
    ……
    ……
    logger.info(".........Starting Training Model..........")
    if get_real_rank() % 8 == 0:
        pprint(config)
    logger.info(".........Model Compiling, Please Wait a Moment...........")

    # Added: initialize and start the MindCluster TaskD worker
    try:
        rank = get_rank()
        from taskd.api.taskd_worker_api import init_taskd_worker
        from taskd.api.taskd_worker_api import start_taskd_worker
        init_taskd_worker(rank, 5000)
        start_taskd_worker()
    except Exception as e:
        print("failed to call mindcluster taskd")

    model.train(config.runner_config.epochs, dataset,
                callbacks=callbacks,
                dataset_sink_mode=config.runner_config.sink_mode,
                sink_size=config.runner_config.sink_size,
                initial_epoch=config.runner_config.initial_epoch)
```
```yaml
volumeMounts:
  - name: profilingdata
    mountPath: /user/cluster-info/
  - name: profileswitch
    mountPath: /user/cluster-info/datatrace-config
```
```yaml
volumes:
  - name: profileswitch
    hostPath:
      path: /user/cluster-info/datatrace-config/default.data-trace-default-test-pytorch-fault-mixtral
  - name: profilingdata
    hostPath:
      path: /home/profilingdatapath
```
kubectl get cm
```yaml
apiVersion: v1
kind: ConfigMap
metadata:
  name: data-trace-default-test-pytorch-fault-mixtral   # The ConfigMap name must be the data-trace prefix followed by the task name
  labels:
    reset: "true"
data:
  profilingSwitch: '{"CommunicationOperator":"off","Step":"on","SaveCheckpoint":"on","FP":"on","DataLoader":"on"}'
```
kubectl apply -f datacm.yaml
If the following output is displayed, the ConfigMap has been created successfully.
```
[root@master ~]# kubectl apply -f datacm.yaml
configmap/data-trace-default-test-pytorch-fault-mixtral created
```
kubectl edit cm data-trace-default-test-pytorch-fault-mixtral
```yaml
apiVersion: v1
data:
  profilingSwitch: '{"CommunicationOperator":"on","Step":"on","SaveCheckpoint":"on","FP":"on","DataLoader":"on"}'
```
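The switch can also be flipped without opening an interactive editor by patching the ConfigMap. The following is a minimal sketch; it assumes kubectl is configured for the target cluster and reuses the ConfigMap name from the example above.

```python
# Patch the data-trace ConfigMap to turn communication-operator instrumentation on.
import json
import subprocess

switches = {"CommunicationOperator": "on", "Step": "on", "SaveCheckpoint": "on",
            "FP": "on", "DataLoader": "on"}
patch = {"data": {"profilingSwitch": json.dumps(switches)}}
subprocess.run(
    ["kubectl", "patch", "cm", "data-trace-default-test-pytorch-fault-mixtral",
     "--type", "merge", "-p", json.dumps(patch)],
    check=True,
)
```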
Enabling communication-operator instrumentation may degrade training performance, so it is not recommended to keep it enabled all the time.