开发者
资源

平台适配

重调度模式

MindCluster Volcano中重调度代码示例如下:

func (tp *module) PreStartAction(ssn *framework.Session) error {
 moduleFullName := util.NPUCardName + util.ModuleAcceleratorType
 klog.V(util.LogInfoLev).Infof("Entering PreStartAction of %s...", moduleFullName)
 defer klog.V(util.LogInfoLev).Infof("Leaving PreStartAction of %s", moduleFullName)
 if tp == nil || ssn == nil || tp.FrameAttr.KubeClient == nil {
  return fmt.Errorf("%s handler not enabled or ssn is nil: %s", moduleFullName, util.ArgumentError)
 }
 tp.reHandle = rescheduling.New(&tp.ScheduleEnv, rescheduling.CmFaultJobKind)
 if tp.reHandle == nil {
  klog.V(util.LogErrorLev).Infof("create new fault handler failed.")
  return fmt.Errorf("%s reSchedule not enabled: %s", moduleFullName, util.ArgumentError)
 }
 tp.reHandle.NewReScheduler()
 tp.reHandle.SynCacheFaultNodeWithSession(util.NPUCardName)
 tp.reHandle.AddFaultNodeWithSession(util.NPUCardName)
 tp.reHandle.SynCacheFaultJobWithSession(ssn, util.NPUCardName, util.NPUCardNamePre)
 tp.reHandle.SynCacheNodeRankOccMapWithSession(ssn)
 // 1. restart Fault Jobs that are recorded in cache
 if restartErr := tp.reHandle.RestartNeedForceDeleteJobs(ssn); restartErr != nil {
  klog.V(util.LogErrorLev).Infof("%s RestartNeedForceDeleteJobs: %s", moduleFullName, restartErr.Error())
 }
 // 2. get all the new jobs in session
 runningJobs, getRunErr := tp.reHandle.GetRunningJobs(ssn, util.NPUCardName, util.ModuleAcceleratorType)
 if getRunErr != nil {
  klog.V(util.LogInfoLev).Infof("%s GetRunningJobs: %s", moduleFullName, getRunErr.Error())
 }
 // 3. get nodes of session and fault jobs 
 err := tp.reHandle.AddFaultJobWithSession(runningJobs, util.NPUCardName, util.NPUCardNamePre)
 if err != nil {
  klog.V(util.LogErrorLev).Infof("%s AddFaultJobWithSession", moduleFullName)
 }
 // 4. restart the fault jobs
 if restartErr := tp.reHandle.RestartFaultJobs(ssn); restartErr != nil {
  klog.V(util.LogErrorLev).Infof("%s RestartFaultJobs: %s", moduleFullName, restartErr.Error())
  return restartErr
 }
 // 5. save structure for later allocation process
 tp.reHandle.GenerateNodeRankIndexTaskMap()
 return nil
}

故障包括了内部的节点故障、芯片故障、参数面网络故障、业务面故障,将作为对外的信息放在K8s的ConfigMap中,以供外部查询和使用。查询命令为kubectl describe cm -n volcano-system vcjob-fault-npu-cm。

表1 回显参数说明

参数名

描述

fault-node

节点维度的故障信息

NodeName

节点名称

FaultDeviceList

故障列表

- fault_type

故障类型对象,对象包含fault_type、npu_name、large_model_fault_level、fault_level、fault_handling和fault_code等6个字段

  • NodeUnhealthy:节点故障
  • CardUnhealthy:芯片故障
  • CardNetworkUnhealthy:参数面网络故障(芯片网络相关故障)

- npu_name

故障的芯片名称,节点故障时为空

- large_model_fault_level

故障处理类型,节点故障时取值为空

  • NotHandleFault:不做处理
  • RestartRequest:推理场景需要重新执行推理请求,训练场景重新执行训练业务
  • RestartBusiness:需要重新执行业务
  • FreeRestartNPU:直接复位芯片并重新执行业务
  • RestartNPU:直接复位芯片并重新执行业务
  • SeparateNPU:隔离芯片
  • PreSeparateNPU:预隔离芯片,根据训练任务实际运行情况判断是否重调度
说明:

large_model_fault_level、fault_level和fault_handling参数功能一致,推荐使用fault_handling。

- fault_level

- fault_handling

- fault_code

故障码,由英文逗号拼接而成的字符串

  • Disconnected:芯片网络不连通故障
  • heartbeatTimeOut:节点心跳丢失故障

FaultTasks

任务维度的故障信息列表,包含Reason字段

- Reason

故障原因,字段就是故障列表下的五个字段组成的字符串

  • 故障类型为NodeUnhealthy,即节点故障时,会直接重新执行训练或推理任务,并隔离故障节点,将任务重新调度到其他节点执行任务,如果没有冗余的节点,则任务会持续处于Pending状态。
  • 故障类型为CardUnhealthy和CardNetworkUnhealthy,即芯片故障和参数面网络故障时,会根据具体故障类型决定是否重新执行任务、隔离芯片、复位芯片等,具体参考“fault_handling”字段的执行策略。
  • 故障类型为业务面故障时,MindCluster Volcano会检测是否开启无条件重试功能,开启后会重新调度到本节点并重新执行训练或推理任务,重试次数减1;当重试次数为0或者没有开启无条件重试功能时,不会对业务容器故障进行处理。

优雅容错模式

优雅容错模式基于故障重调度模式,集成优雅容错模式前请先完成故障重调度模式的集成。

  • PyTorch框架的模型,需要集群调度组件提供管理进程样例,用户可直接在训练脚本中执行该样例启动管理进程,以下为执行参考样例。
    训练脚本启动管理进程reset_process示例:
    # 单机多卡和分布式
    if [ $# == 6 ]; then
        export DEVICE_NUM=$1
        export SERVER_NUM=$2
        export RANK_SIZE=$1
        export RANK_TABLE_FILE=$3
     
        export SERVER_ID=$4
        device_each_server=$((DEVICE_NUM / SERVER_NUM))
        rank_start=$((${device_each_server} * SERVER_ID))
     
        DATA_PATH=$5
        CONFIG_PATH=$6
     
        # 先启动后台任务,最后留一个前台任务查看日志输出
        for((i=$((${device_each_server}-1)); i>=0; i--))
        do
            rankid=$((rank_start + i))
            export DEVICE_ID=${i}
            export RANK_ID=${rankid}
            rm -rf ${ROOT_PATH}/train_parallel${rankid}
            mkdir ${ROOT_PATH}/train_parallel${rankid}
            cp ${ROOT_PATH}/../*.py ${ROOT_PATH}/train_parallel${rankid}
            cp ${ROOT_PATH}/*.sh ${ROOT_PATH}/train_parallel${rankid}
            cp -r ${ROOT_PATH}/../src ${ROOT_PATH}/train_parallel${rankid}
            cd ${ROOT_PATH}/train_parallel${rankid} || exit
            echo "start training for rank $RANK_ID, device $DEVICE_ID"
            env > env.log
            python ${ROOT_PATH}/../train.py --run_distribute=True --device_num=${RANK_SIZE} --data_path=${DATA_PATH} --config_path=${CONFIG_PATH} &> log &
            train_pids[$i]=$!
        done
    else
        echo "Invalid input parameter, usage: main.sh device_count server_count rank_table_file server_id dataset config_file_path" | tee -a log
        exit 1
    fi
    python -u ${ROOT_PATH}/reset_process.py -p "${train_pids[@]}"
    wait
    reset_process基于Python语言编写,其中关键能力模块示例如下,用户可参考关键模块能力实现该管理进程能力。
    # Reset_Process中关键模块示例如下,用户可参考不同模块实现自己的管理进程
    # 获取故障卡RANK和恢复卡RANK
        def get_fault_ranks(self):
            fault_rank_list = self._get_ranks_from_cm(self.reset_cm_path, "unrecovered")
            if len(fault_rank_list) != 0:
                self.fault_rank_list = fault_rank_list
            return fault_rank_list
     
        def get_recover_ranks(self):
            recover_rank_list = self._get_ranks_from_cm(self.reset_cm_path, "recovered")
            if len(recover_rank_list) != 0:
                self.recover_rank_list = recover_rank_list
            return recover_rank_list
    # 停止训练进程
    def _kill_abnormal_process(self, abnormal_rank_list: list):
            if self.killed_abnormal:
                return
            try:
                logger.info(f"to kill abnormal rank {abnormal_rank_list}")
                self._process_manager.kill_fault_process(abnormal_rank_list)
                self.killed_abnormal = True
            except Exception as e:
                logger.error(f"an unexpected error {e} occur when kill abnormal process")
                self.exit_recover_process()
                         
        def _kill_normal_process(self, normal_rank_list: list):
            if self.killed_normal:
                return
            try:
                logger.info(f"to kill normal rank {normal_rank_list}")
                self._process_manager.kill_fault_process(normal_rank_list)
                self.killed_normal = True
            except Exception as e:
                logger.error(f"an unexpected error {e} occur when kill normal process")
                self.exit_recover_process()
    # 重启训练进程
    def restore_train_process(self):
            """
            Recover all target processes in this node
            """
            new_pid_list = multiprocessing.Manager().list()
            if not self.all_stopped() or self._restart:
                return new_pid_list
     
            process = []
            for rank in self._cmd_dict:
                command = self._cmd_dict[rank]
                pwd_path = self._env_dict[rank]['PWD']
                env_info = self._env_dict[rank]
                p = multiprocessing.Process(target=_run_recover, args=(command, pwd_path, env_info, new_pid_list))
                process.append(p)
                p.start()
     
            for p in process:
                p.join()
            self._restart = True
            logger.info(f"new pids are:{new_pid_list}")
       return new_pid_list
  • PyTorch框架的模型,安装集群调度组件断点续训whl包(mindx_elastic-0.0.1-py3-none-linux_{arch}.whl)后,可以通过将启动训练的python -m torch.distributed.launch xxx命令替换成python -m mindx_elastic.api.run xxx,从而实现PyTorch进程管理能力的拓展。