
Platform Adaptation

Rescheduling Mode

An example of the rescheduling code in MindCluster Volcano is shown below:

func (tp *module) PreStartAction(ssn *framework.Session) error {
    moduleFullName := util.NPUCardName + util.ModuleAcceleratorType
    klog.V(util.LogInfoLev).Infof("Entering PreStartAction of %s...", moduleFullName)
    defer klog.V(util.LogInfoLev).Infof("Leaving PreStartAction of %s", moduleFullName)
    if tp == nil || ssn == nil || tp.FrameAttr.KubeClient == nil {
        return fmt.Errorf("%s handler not enabled or ssn is nil: %s", moduleFullName, util.ArgumentError)
    }
    // create the re-scheduler and synchronize the fault caches with the current session
    tp.reHandle = rescheduling.New(&tp.ScheduleEnv, rescheduling.CmFaultJobKind)
    if tp.reHandle == nil {
        klog.V(util.LogErrorLev).Infof("create new fault handler failed.")
        return fmt.Errorf("%s reSchedule not enabled: %s", moduleFullName, util.ArgumentError)
    }
    tp.reHandle.NewReScheduler()
    tp.reHandle.SynCacheFaultNodeWithSession(util.NPUCardName)
    tp.reHandle.AddFaultNodeWithSession(util.NPUCardName)
    tp.reHandle.SynCacheFaultJobWithSession(ssn, util.NPUCardName, util.NPUCardNamePre)
    tp.reHandle.SynCacheNodeRankOccMapWithSession(ssn)
    // 1. restart fault jobs that are recorded in the cache
    if restartErr := tp.reHandle.RestartNeedForceDeleteJobs(ssn); restartErr != nil {
        klog.V(util.LogErrorLev).Infof("%s RestartNeedForceDeleteJobs: %s", moduleFullName, restartErr.Error())
    }
    // 2. get all the new jobs in the session
    runningJobs, getRunErr := tp.reHandle.GetRunningJobs(ssn, util.NPUCardName, util.ModuleAcceleratorType)
    if getRunErr != nil {
        klog.V(util.LogInfoLev).Infof("%s GetRunningJobs: %s", moduleFullName, getRunErr.Error())
    }
    // 3. get the nodes of the session and the fault jobs
    err := tp.reHandle.AddFaultJobWithSession(runningJobs, util.NPUCardName, util.NPUCardNamePre)
    if err != nil {
        klog.V(util.LogErrorLev).Infof("%s AddFaultJobWithSession: %s", moduleFullName, err.Error())
    }
    // 4. restart the fault jobs
    if restartErr := tp.reHandle.RestartFaultJobs(ssn); restartErr != nil {
        klog.V(util.LogErrorLev).Infof("%s RestartFaultJobs: %s", moduleFullName, restartErr.Error())
        return restartErr
    }
    // 5. save the structure for the later allocation process
    tp.reHandle.GenerateNodeRankIndexTaskMap()
    return nil
}

Faults cover internal node faults, chip faults, parameter-plane network faults, and service-plane faults. This information is exposed externally in a Kubernetes ConfigMap so that external components can query and use it. The query command is kubectl describe cm -n volcano-system vcjob-fault-npu-cm; the parameters in the command output are described in Table 1, and a minimal programmatic query sketch follows the table.

Table 1 Description of the output parameters

fault-node: node-level fault information.
  NodeName: name of the node.
  FaultDeviceList: list of faults. Each entry is a fault object containing six fields: fault_type, npu_name, large_model_fault_level, fault_level, fault_handling, and fault_code.
    - fault_type: fault type.
      • NodeUnhealthy: node fault
      • CardUnhealthy: chip fault
      • CardNetworkUnhealthy: chip network fault
    - npu_name: name of the faulty chip; empty for a node fault.
    - large_model_fault_level: fault handling type; empty for a node fault.
      • NotHandleFault: take no action
      • RestartRequest: re-execute the inference request in inference scenarios, or re-execute the training job in training scenarios
      • RestartBusiness: re-execute the business
      • FreeRestartNPU: reset the chip directly and re-execute the business
      • RestartNPU: reset the chip directly and re-execute the business
      • SeparateNPU: isolate the chip
      • PreSeparateNPU: pre-isolate the chip; whether to reschedule is decided from the actual running state of the training job
      Note: large_model_fault_level, fault_level, and fault_handling are functionally identical; fault_handling is recommended.
    - fault_level: see large_model_fault_level.
    - fault_handling: see large_model_fault_level.
    - fault_code: fault codes, concatenated into a comma-separated string.
      • Disconnected: chip network disconnection fault
      • heartbeatTimeOut: node heartbeat loss fault
FaultTasks: task-level fault information list, containing the Reason field.
  - Reason: fault reason; a string composed of the five fields under the fault list above.

  • When the fault type is NodeUnhealthy (a node fault), the training or inference job is re-executed directly: the faulty node is isolated and the job is rescheduled to other nodes. If no spare node is available, the job stays in the Pending state.
  • When the fault type is CardUnhealthy or CardNetworkUnhealthy (a chip fault or parameter-plane network fault), the concrete fault type decides whether to re-execute the job, isolate the chip, reset the chip, and so on; see the strategies listed under the fault_handling field.
  • When the fault is a service-plane fault, MindCluster Volcano checks whether the unconditional retry function is enabled. If it is enabled, the job is rescheduled to the same node, the training or inference job is re-executed, and the retry count is decreased by 1. When the retry count reaches 0 or unconditional retry is not enabled, faults in the business container are not handled.
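
The fault information above can also be consumed programmatically. The following is a minimal sketch, assuming the official kubernetes Python client is available and that the fault-node entry of vcjob-fault-npu-cm holds a JSON list shaped like the fields in Table 1; the exact payload layout is an assumption, so adapt the parsing to the real ConfigMap content.

# Minimal sketch: read fault information from the vcjob-fault-npu-cm ConfigMap.
# Assumptions: the kubernetes Python client is installed and configured, and the
# "fault-node" key holds a JSON list of node fault records as described in Table 1.
import json

from kubernetes import client, config


def read_fault_nodes():
    config.load_kube_config()  # use config.load_incluster_config() when running in a pod
    v1 = client.CoreV1Api()
    cm = v1.read_namespaced_config_map(name="vcjob-fault-npu-cm", namespace="volcano-system")
    raw = (cm.data or {}).get("fault-node", "[]")  # key name from Table 1; layout is an assumption
    return json.loads(raw)


def planned_action(fault):
    # Map the fault_handling field to the strategies described in Table 1.
    actions = {
        "NotHandleFault": "take no action",
        "RestartRequest": "re-execute the inference request or training job",
        "RestartBusiness": "re-execute the business",
        "FreeRestartNPU": "reset the chip and re-execute the business",
        "RestartNPU": "reset the chip and re-execute the business",
        "SeparateNPU": "isolate the chip",
        "PreSeparateNPU": "pre-isolate the chip and decide on rescheduling later",
    }
    return actions.get(fault.get("fault_handling", ""), "unknown handling type")


if __name__ == "__main__":
    for node in read_fault_nodes():
        for fault in node.get("FaultDeviceList", []):
            print(node.get("NodeName"), fault.get("fault_type"), planned_action(fault))

Because large_model_fault_level, fault_level, and fault_handling are functionally identical, the sketch keys its decision on fault_handling only, as the table recommends.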

Graceful Fault Tolerance Mode

The graceful fault tolerance mode is built on top of the fault rescheduling mode; complete the fault rescheduling integration before integrating graceful fault tolerance. The cluster scheduling component provides a sample management process, which users can launch directly from the training script. A reference example follows.
Example of starting the management process reset_process from a training script:
# Single-node multi-card and distributed
if [ $# == 6 ]; then
    export DEVICE_NUM=$1
    export SERVER_NUM=$2
    export RANK_SIZE=$1
    export RANK_TABLE_FILE=$3
 
    export SERVER_ID=$4
    device_each_server=$((DEVICE_NUM / SERVER_NUM))
    rank_start=$((${device_each_server} * SERVER_ID))
 
    DATA_PATH=$5
    CONFIG_PATH=$6
 
    # Start the background tasks first, and keep one foreground task at the end to view the log output
    for((i=$((${device_each_server}-1)); i>=0; i--))
    do
        rankid=$((rank_start + i))
        export DEVICE_ID=${i}
        export RANK_ID=${rankid}
        rm -rf ${ROOT_PATH}/train_parallel${rankid}
        mkdir ${ROOT_PATH}/train_parallel${rankid}
        cp ${ROOT_PATH}/../*.py ${ROOT_PATH}/train_parallel${rankid}
        cp ${ROOT_PATH}/*.sh ${ROOT_PATH}/train_parallel${rankid}
        cp -r ${ROOT_PATH}/../src ${ROOT_PATH}/train_parallel${rankid}
        cd ${ROOT_PATH}/train_parallel${rankid} || exit
        echo "start training for rank $RANK_ID, device $DEVICE_ID"
        env > env.log
        python ${ROOT_PATH}/../train.py --run_distribute=True --device_num=${RANK_SIZE} --data_path=${DATA_PATH} --config_path=${CONFIG_PATH} &> log &
        train_pids[$i]=$!
    done
else
    echo "Invalid input parameter, usage: main.sh device_count server_count rank_table_file server_id dataset config_file_path" | tee -a log
    exit 1
fi
python -u ${ROOT_PATH}/reset_process.py -p "${train_pids[@]}"
wait
Reset_Process is written in Python. Examples of its key capability modules are shown below; users can refer to these modules to implement the management process capabilities.
# Examples of the key modules in Reset_Process; users can refer to the different modules to implement their own management process
# Get the RANKs of faulty cards and of recovered cards
    def get_fault_ranks(self):
        fault_rank_list = self._get_ranks_from_cm(self.reset_cm_path, "unrecovered")
        if len(fault_rank_list) != 0:
            self.fault_rank_list = fault_rank_list
        return fault_rank_list
 
    def get_recover_ranks(self):
        recover_rank_list = self._get_ranks_from_cm(self.reset_cm_path, "recovered")
        if len(recover_rank_list) != 0:
            self.recover_rank_list = recover_rank_list
        return recover_rank_list
# Stop training processes
    def _kill_abnormal_process(self, abnormal_rank_list: list):
        if self.killed_abnormal:
            return
        try:
            logger.info(f"to kill abnormal rank {abnormal_rank_list}")
            self._process_manager.kill_fault_process(abnormal_rank_list)
            self.killed_abnormal = True
        except Exception as e:
            logger.error(f"an unexpected error {e} occurred when killing abnormal process")
            self.exit_recover_process()

    def _kill_normal_process(self, normal_rank_list: list):
        if self.killed_normal:
            return
        try:
            logger.info(f"to kill normal rank {normal_rank_list}")
            self._process_manager.kill_fault_process(normal_rank_list)
            self.killed_normal = True
        except Exception as e:
            logger.error(f"an unexpected error {e} occurred when killing normal process")
            self.exit_recover_process()
# Restart training processes
    def restore_train_process(self):
        """
        Recover all target processes in this node
        """
        new_pid_list = multiprocessing.Manager().list()
        if not self.all_stopped() or self._restart:
            return new_pid_list

        process = []
        for rank in self._cmd_dict:
            command = self._cmd_dict[rank]
            pwd_path = self._env_dict[rank]['PWD']
            env_info = self._env_dict[rank]
            p = multiprocessing.Process(target=_run_recover, args=(command, pwd_path, env_info, new_pid_list))
            process.append(p)
            p.start()

        for p in process:
            p.join()
        self._restart = True
        logger.info(f"new pids are:{new_pid_list}")
        return new_pid_list
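
The restart snippet above references a module-level _run_recover helper that is not shown in the sample. The following is a minimal sketch of what such a helper could look like; the name and call signature come from the snippet above, but the body is an assumption rather than the shipped implementation.

# Minimal sketch of the _run_recover helper (an assumption, not the shipped code):
# restart one training process with its original command line, working
# directory, and environment, and record the new pid in the shared list.
import subprocess


def _run_recover(command, pwd_path, env_info, new_pid_list):
    # command is assumed to be the original shell command string; env_info is
    # assumed to map environment variable names to string values.
    proc = subprocess.Popen(command, shell=True, cwd=pwd_path, env=dict(env_info))
    new_pid_list.append(proc.pid)  # new_pid_list is a multiprocessing.Manager().list() proxy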