使用7.1.RC1及以上版本TaskD
MindCluster集群调度组件结合MindStudio提供的profiling能力,对集群中的性能劣化故障(慢节点)提供诊断功能。该功能提供动态打点和打点数据持久化功能、可动态启停训练任务打点功能,无需重启任务进行诊断,对训练无损耗。
当前支持的打点数据如表1所示。
打点数据的类型 |
支持的AI框架 |
提供支持的组件 |
---|---|---|
FP (标识前向传播数据) |
PyTorch 说明:
仅支持单算子场景。 |
mstx_torch_plugin |
Step (标识Step时延) |
PyTorch、MindSpore |
|
Communication (标识通信算子) |
PyTorch、MindSpore |
|
SaveCheckpoint (标识SaveCheckpoint耗时) |
PyTorch、MindSpore |
|
DataLoader (标识DataLoader耗时) |
PyTorch、MindSpore |
|
使用约束
- 当前Step、SaveCheckpoint、FP、DataLoader仅支持同步开启。如需关闭以上四类打点数据,需同时关闭Communication。
- Communication通信算子数据支持单独开启、关闭。
- 动态轻量打点功能与MindStudio的全量打点功能不可同时开启,开启全量打点功能会导致性能劣化故障不能正常采集数据。
前提条件
准备软件包
配置性能劣化故障检测
本方案仅针对7.1.RC1及以上版本的TaskD组件。如使用7.1.RC1以下版本的组件请参考使用其他版本TaskD章节进行操作。
- (可选)在容器内安装mstx_torch_plugin。
- (可选)在PyTorch场景下,非原生优化器或不使用mstx_torch_plugin的情况下,为获取训练的Step耗时数据需修改训练脚本中的训练迭代循环,需增加Step打点代码。以下示例为PyTorch-MindSpeed场景,需修改./mindspeed_llm/training/training.py文件增加如下加粗字段。MindSpore场景请根据实际情况修改。
def train(forward_step_func, model, optimizer, opt_param_scheduler, train_data_iterator, valid_data_iterator, process_non_loss_data_func, config): # Cache into one-logger for callback …… …… if is_profile_enabled(): prof = get_profiler() prof.start() step_id = iteration while iteration < args.train_iters: stream = torch.npu.current_stream() # 获取当前环境的执行流,用于获取NPU侧时间 range_id = torch.npu.mstx.range_start(f"step {step_id}", stream) # 标识当前训练step的开始 …… …… if args.manual_gc: if args.manual_gc_interval != 0 and iteration % args.manual_gc_interval == 0: gc.collect() if is_profile_enabled(): prof.step() step_id +=1 # 训练step加一,用于标识下一step torch.npu.mstx.range_end(range_id) # 标识当前训练step的结束
- 在容器内,以CANN软件包的运行用户登录环境,执行source ${install_path}/set_env.sh命令设置环境变量。其中${install_path}为CANN软件的安装目录。示例如下。
source /usr/local/Ascend/ascend-toolkit/set_env.sh
- 训练启动前,在训练脚本中导入LD_PRELOAD环境变量。该环境变量允许系统提前加载指定的so文件。示例如下。
export LD_PRELOAD=/usr/local/Ascend/ascend-toolkit/latest/lib64/libmspti.so:/usr/local/python3.8.17/lib/python3.8/site-packages/taskd/python/cython_api/libs/libtaskd.so
- libmspti.so:该so由MindStudio提供,集成在CANN包内。当使用默认安装路径时,路径为:/usr/local/Ascend/ascend-toolkit/latest/lib64/libmspti.so。
- libtaskd.so:该so由TaskD组件提供,安装该whl包后,路径为: TaskD所在路径/taskd/python/cython_api/libs/libtaskd.so。
TaskD所在路径可通过以下命令进行查询。回显中的Location字段即为TaskD所在路径。
pip show taskd
- 在分布式环境初始化完成,能够获取到全局rank之后,修改训练脚本,在训练脚本中拉起TaskD Manager,在管理进程中拉起TaskD Proxy,在训练进程内部拉起TaskD Worker。
- 拉起TaskD Manager和TaskD Proxy。
- PyTorch场景
- 创建manager.py文件,放在训练脚本同一目录下。manager.py文件内容如下所示。
from taskd.api import init_taskd_manager, start_taskd_manager import os job_id=os.getenv("MINDX_TASK_ID") node_nums=XX # 用户填入任务节点总数 proc_per_node=XX # 用户填入任务每个节点的训练进程数量 init_taskd_manager({"job_id":job_id, "node_nums": node_nums, "proc_per_node": proc_per_node}) start_taskd_manager()
- 在训练脚本中增加执行以下代码,拉起TaskD Manager和TaskD Proxy。
sed -i '/import logging/i import taskd.python.adaptor.patch' $(pip3 show torch | grep Location | awk -F ' ' '{print $2}')/torch/distributed/run.py if [[ "${RANK}" -eq 0 ]]; then export MASTER_ADDR=${POD_IP} python manager.py & fi torchrun ...
- 创建manager.py文件,放在训练脚本同一目录下。manager.py文件内容如下所示。
- MindSpore场景
- 创建manager.py文件,放在和训练脚本同一目录下,manager.py文件内容如下所示。
from taskd.api import init_taskd_manager, start_taskd_manager import os job_id=os.getenv("MINDX_TASK_ID") node_nums=XX # 用户填入任务节点总数 proc_per_node=XX # 用户填入任务每个节点的训练进程数量 init_taskd_manager({"job_id":job_id, "node_nums": node_nums, "proc_per_node": proc_per_node}) start_taskd_manager()
- 在训练脚本中增加执行以下代码拉起TaskD Manager。
if [[ "${MS_SCHED_HOST}" -eq "${POD_IP}" ]]; then python manager.py & fi msrun ...
- 修改mindspore/python/mindspore/parallel/cluster/process_entity/_api.py文件,拉起TaskD Proxy。示例如下。
... if ("TTP:1" in tft_env) or ("UCE:1" in tft_env) or ("ARF:1" in tft_env): try: from taskd.python.framework.agent.ms_mgr.msrun_plugin import MSRunPlugin from taskd .api.taskd_proxy_api import init_taskd_proxy from taskd.python.framework.common.type import CONFIG_UPSTREAMIP_KEY, LOCAL_HOST import threading proxy = threading.Thread(target=init_taskd_proxy, args=({CONFIG_UPSTREAMIP_KEY : os.getenv("MS_SCHED_HOST", LOCAL_HOST)},)) proxy.daemon = True proxy.start() self.msmgr = MSRunPlugin() self.msmgr.register_callbacks("KILL_WORKER", self.kill_workers) self.msmgr.register_callbacks("START_ALL_WORKER", self.start_all_workers) self.msmgr.register_callbacks("START_WORKER_LIST", self.start_worker_list) self.msmgr.register_callbacks("MONITOR", self.monitor_rank_status) self.enable_mindx = True os.environ["MS_ENABLE_RECOVERY"] = str(1) ...
- 创建manager.py文件,放在和训练脚本同一目录下,manager.py文件内容如下所示。
- PyTorch场景
- 拉起TaskD Worker。
- PyTorch-MindSpeed场景:修改./MindSpeed-LLM/mindspeed_llm/training/training.py文件,在代码中增加如下加粗字段。
def pretrain(train_valid_test_dataset_provider, model_provider, model_type, forward_step_func, process_non_loss_data_func=None, extra_args_provider=None, args_defaults={}): print_rank_0('time to initialize megatron (seconds): {:.3f}'.format( time.time() - _TRAIN_START_TIME)) print_datetime('after megatron is initialized') import torch.distributed as dist if dist.is_initialized(): rank = dist.get_rank() from taskd.api.taskd_worker_api import init_taskd_worker from taskd.api.taskd_worker_api import start_taskd_worker init_taskd_worker(rank,5000) start_taskd_worker() app_metrics['app_model_init_finish_time'] = one_logger_utils.get_timestamp_in_ms() one_logger_utils.on_pretrain_start()
- MindSpore-MindFormers场景:修改./mindformers/trainer/base_trainer.py文件,在代码中增加如下加粗字段。
def training_process( self, config: Optional[Union[dict, MindFormerConfig, ConfigArguments, TrainingArguments]] = None, network: Optional[Union[Cell, PreTrainedModel]] = None, dataset: Optional[Union[BaseDataset, GeneratorDataset]] = None, optimizer: Optional[Optimizer] = None, callbacks: Optional[Union[Callback, List[Callback]]] = None, compute_metrics: Optional[Union[dict, set]] = None, **kwargs): …… …… logger.info(".........Starting Training Model..........") if get_real_rank() % 8 == 0: pprint(config) logger.info(".........Model Compiling, Please Wait a Moment...........") try: rank = get_rank() from taskd.api.taskd_worker_api import init_taskd_worker from taskd.api.taskd_worker_api import start_taskd_worker init_taskd_worker(rank,5000) start_taskd_worker() except Exception as e: print("failed to call mindcluster taskd") model.train(config.runner_config.epochs, dataset, callbacks=callbacks, dataset_sink_mode=config.runner_config.sink_mode, sink_size=config.runner_config.sink_size, initial_epoch=config.runner_config.initial_epoch)
- PyTorch-MindSpeed场景:修改./MindSpeed-LLM/mindspeed_llm/training/training.py文件,在代码中增加如下加粗字段。
- 拉起TaskD Manager和TaskD Proxy。
- 修改任务YAML。
- 修改容器暴露端口,在所有的Pod下增加TaskD通信使用的端口9601。
ports: - containerPort: 9601 name: taskd-port
- 挂载轻量profiling落盘文件:轻量profiling数据写在容器内的/user/cluster-info/profiling路径下。如需在宿主机获取,请修改任务YAML,将该路径挂出。
- 容器内YAML挂载示例如下。
volumeMounts: - name: profilingdata mountPath: /user/cluster-info/
- 宿主机内YAML挂载示例如下。
volumes: - name: profileswitch hostPath: path: /user/cluster-info/datatrace-config/default.data-trace-default-test-pytorch-fault-mixtral
- 容器内YAML挂载示例如下。
- 修改容器暴露端口,在所有的Pod下增加TaskD通信使用的端口9601。
- 开启轻量profiling获取落盘数据。修改任务对应的data-trace ConfigMap或ClusterD提供的gRPC接口,接口信息见ModifyTrainingDataTraceSwitch,动态开启或关闭轻量profiling能力。
- 如果通过ClusterD提供的gRPC接口这种方式开启或修改轻量profiling获取落盘数据,创建的data-trace-<任务名称> ConfigMap的生命周期会随着任务的删除而删除。当任务不存在的时候,该接口会调用失败。
- 如果通过修改任务对应data-trace ConfigMap这种方式开启轻量profiling获取落盘数据,建议在创建ConfigMap时,指定OwnerReference为相应的任务。
- 以上2种方式不建议同时使用。
以default命名空间下的名为default-test-pytorch-fault-mixtral的任务为例,以编辑ConfigMap的方式开启轻量profiling获取落盘数据,示例如下。- 在master节点执行以下命令查询该任务对应的配置ConfigMap。
kubectl get cm
- 执行以下命令,创建配置轻量profiling获取落盘数据所需ConfigMap文件。
- 将以下内容写入datacm.yaml。
apiVersion: v1 kind: ConfigMap metadata: name: data-trace-default-test-pytorch-fault-mixtral # cm的名字需以data-trace为前缀+任务名 labels: reset: "true" data: profilingSwitch: '{"CommunicationOperator":"off","Step":"on","SaveCheckpoint":"on","FP":"on","DataLoader":"on"}'
- 在master节点执行以下命令,创建ConfigMap。
kubectl apply -f datacm.yaml
回显如下所示,表示ConfigMap创建成功。
[root@master~]# kubectl apply -f datacm.yaml configmap/data-trace-default-test-pytorch-fault-mixtral created
- 将以下内容写入datacm.yaml。
- 执行以下命令编辑ConfigMap文件。
kubectl edit cm data-trace-default-test-pytorch-fault-mixtral
- 如需开启通信算子,请将CommunicationOperator字段的取值改为“on”。
apiVersion: v1 data: profilingSwitch: '{"CommunicationOperator":"on","Step":"on","SaveCheckpoint":"on","FP":"on","DataLoader":"on"}'
开启通信算子后可能造成训练性能下降,不建议常态开启通信算子。
- 按“Esc”键,输入:wq!保存并退出。
获取性能劣化故障检测数据
- 落盘数据按rank进行分类,轻量profiling数据写在容器内的/user/cluster-info/profiling路径。
- 对于存在环境变量MINDX_TASK_ID的Pod,rank 0数据在容器内的路径为/user/cluster-info/profiling/$MINDX_TASK_ID/0。
- 如无该环境变量,默认会落盘到名为default_task_id_时间戳的文件夹内。
- /user/cluster-info/profiling达到配置的上限大小后,将进行文件老化,默认每次删除修改时间最早的20%个文件。老化过程中仅删除profiling目录下rank文件夹中的以数字命名的文件,建议不手动添加其他文件到profiling文件夹下。如果用户手动添加其他文件,TaskD不会将该文件删除,但该文件会占用空间。
- 轻量profiling文件以时间戳命名,各条记录以换行分割,每次追加写入rank下最新文件。最新文件大小超过10MB时,TaskD会新建profiling文件。如果使用NFS等网络存储方式,当数据同步较慢时,可能存在文件大小未达到10MB即创建新文件的情况。
父主题: 性能劣化故障