昇腾社区首页
中文
注册

集成示例

断点续训提供的故障发现、任务重调度等功能,当K8s集群中执行分布式训练任务发生故障时,可自动重调度和进行训练重启恢复。通过集成相关组件,可为第三方AI平台提供断点续训的能力。本文集成示例以Go编程语言作为实例,创建训练任务。

表1 断点续训通过平台集成方案使用流程和说明

关键流程

操作步骤

说明

平台集成

安装部署

在云平台的K8s集群中安装并配置好集群调度组件或自行适配可替换的组件。

平台二次开发

  • 平台将断点续训yaml示例转换成K8s提供的API(推荐使用Go编程语言)对象。
  • 平台使用K8s提供的API创建任务。
  • 平台使用K8s提供的API获取任务运行结果。

准备任务

准备镜像

准备训练镜像。

适配脚本

完成训练脚本适配。

启动训练

创建任务

通过平台配置并创建任务。

运行任务

通过平台查看任务运行结果。

前提条件

  • 平台训练集群已完成K8s纳管。
  • 平台需具备基本任务创建、下发以及执行和显示结果的能力。
  • 平台已完成前提条件的准备。
  • 平台已实现断点续训的故障发现故障处理训练重启功能。
  • 提前了解准备任务yaml章节提供的yaml示例和yaml参数,可以更好地帮助用户进行接下来的操作。

平台集成

  • 以下操作步骤中仅给出了部分示例代码,该代码不能直接使用。
  • 以下操作步骤的整体逻辑,可以参见组件调用流程章节。
  • 使用MindCluster HCCL Controller的用户需要创建Volcano Job对象;使用MindCluster Ascend Operator的用户需要创建Ascend Job对象。
  1. 安装部署。
    • 在云平台的K8s集群中安装并配置好集群调度中的MindCluster VolcanoMindCluster NodeDMindCluster Ascend Device PluginMindCluster HCCL ControllerMindCluster Ascend Operator组件。
    • 若云平台使用自己的调度器,未部署MindCluster Volcano组件,可参考MindCluster Volcano实现逻辑(MindX DL 亲和性调度方案说明)对平台的调度器进行适配。
  2. 创建ConfigMap,示例如下。该ConfigMap“rings-config-”开头,用于生成hccl配置信息,后面需要带上任务名,否则MindCluster Volcano无法正确识别该ConfigMap
    import v1 "k8s.io/api/core/v1"
    func newConfigMap(name string) *v1.ConfigMap {
           cm := &v1.ConfigMap{}
           cm.Name = name
           cm.Labels = map[string]string{
                  "ring-controller.atlas": "ascend-{xxx}b",    # 用于识别Atlas A2 训练系列产品Atlas 训练系列产品Atlas 训练系列产品不需要该参数
           }
           cm.Data = map[string]string{
                  "hccl.json": `{"status": "initializing"}`,
           }
           return cm
    }
    clientset.CoreV1().ConfigMaps(job.Namespace).Create(context.TODO(), newConfigMap("rings-config-"+job.Name), metav1.CreateOptions{})                      
  3. 创建Job对象。开启重调度需要配置metedate.labels.fault-scheduling为force,示例如下:
    • 创建Volcano Job对象。以vcjob的yaml示例,开启重调度需要配置metedate.labels.fault-scheduling为force,编码示例如下。
      import (
         "k8s.io/api/core/v1"
         metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
         "k8s.io/client-go/kubernetes"
         "k8s.io/client-go/tools/clientcmd"
         "volcano.sh/apis/pkg/apis/batch/v1alpha1"
         "volcano.sh/apis/pkg/client/clientset/versioned"
      )
      
      func initJob() v1alpha1.Job {
         job := newJobBuilder().
            initNameSpace("vcjob").       // 初始化命名空间
            initName("mindx-dls-test").   // 初始化任务名
            initLabels(map[string]string{ // 初始化任务标签
               "ring-controller.atlas": "ascend-{xxx}b", // 固定字段,根据实际情况填写取值
               "fault-scheduling":      "force",       // 开启重调度模式,force为故障强制删除任务
               "fault-retry-times":   "3",  // 开启业务面故障无条件重试能力,'3'为最大重试次数;同时需要将容器重启策略设置为Never
            }).
            initMinAvailable(2).          // minAvailable表示运行该job所要运行的最少Pod数量。只有当job中处于running状态的Pod数量不小于minAvailable时,才认为该job运行正常
            initSchedulerName("volcano"). // 使用volcano作为调度,用户可根据实际情况修改
            initPolicies([]v1alpha1.LifecyclePolicy{{Event: "PodEvicted", Action: "RestartJob"}}).
            initPlugins(map[string][]string{"ssh": {}, "env": {}, "svc": {}}). // 初始化调度插件
            initMaxRetry(3).                                                   // maxRetry表示当该job可以进行的最大重启次数,取值需要大于或等于fault-retry-times的取值
            initQueue("default" ).                                              // queue表示该job所属的队列
            addTask(v1alpha1.TaskSpec(newTaskSpec().
               initTaskName("default-test").       // 初始化task名
               initTaskReplicas(2).                // 初始化task副本数
               initTaskAffinity("mindx-dls-test"). // 初始化任务反亲和性字段,输入值与任务名相同
               initTaskLabels(map[string]string{
                  "app":                   "mindspore",   // 固定字段
                  "ring-controller.atlas": "ascend-{xxx}b", //  固定字段,根据实际情况填写
               }).
               initTaskNodeSelector(map[string]string{
                  "host-arch":        "huawei-x86",     // 用户根据实际架构配置,arm架构取值为huawei-arm
                  "accelerator-type": "module-{xxx}b-8", // 用户根据服务器类型进行配置,value值可以参考yaml参数
               }).
               initTaskVolumes(). // 初始化挂载项
               addTaskContainers(v1.Container(newContainer().
                  initContainerName("mindspore").                                             // 初始化容器名
                  initContainerImage("ms-arm:b120").                                          // 初始化镜像名
                  initContainerImagePullPolicy("IfNotPresent").                               // 初始化镜像拉取策略
                  initContainerLimits("huawei.com/Ascend910", "8").                           // 初始化任务资源
                  initContainerRequests("huawei.com/Ascend910", "8").                         // 初始化任务资源
                  initContainerVolumeMounts().                                                // 初始化容器挂载项
                  initContainerEnv("MindSpore").                                              // 初始化容器环境变量
                  initContainerCommand([]string{"/bin/bash", "-c", "bash train_start.sh ..."}))))) // 初始化容器启动命令,具体参数参考示例yaml
                  initContainerRestartPolicy("Never").                                   // 初始化容器重启策略,开启无条件重试时需要设置为Never
         return v1alpha1.Job(job)
      }
      
      type vcJob v1alpha1.Job
      type vcTask v1alpha1.TaskSpec
      type container v1.Container
      
      // 初始化任务名
      func (job *vcJob) initName(n string) *vcJob {
         job.Name = n
         return job
      }
      ...
      // 初始化task名
      func (task vcTask) initTaskName(tn string) vcTask {
         task.Name = tn
         return task
      }
      ...
      // 初始化容器环境变量
      func (ct container) initContainerEnv(framework string) container {
         ct.Env = []v1.EnvVar{
            {
               Name: "mindx-dls-test", // 任务名称
               ValueFrom: &v1.EnvVarSource{
                  FieldRef: &v1.ObjectFieldSelector{FieldPath: "metadata.name"},
               },
            },
            {
               Name: "XDL_IP", // 固定字段
               ValueFrom: &v1.EnvVarSource{
                  FieldRef: &v1.ObjectFieldSelector{FieldPath: "status.hostIP"},
               },
            },
            {
               Name:  "framework",
               Value: framework, // 使用的训练框架名称,如MindSpore
            },
         }
         return ct
      }

      需要在vcjob类型任务的yaml中配置失败重试机制:"maxRetry"

    • 创建Ascend Job对象。以acjob的yaml为例,开启重调度模式需要配置metedate.labels.fault-scheduling为force,编码示例如下。
      import (
         v1 "ascend-operator-apis/pkg/apis/batch/v1"
         "ascend-operator-apis/pkg/client/clientset/versioned"
         commonv1 "github.com/kubeflow/common/pkg/apis/common/v1"
         corev1 "k8s.io/api/core/v1"
         "k8s.io/apimachinery/pkg/api/resource"
         metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
         "k8s.io/client-go/tools/clientcmd"
      )
      
      func initAcJob() v1.AscendJob {
         job := newAcJob().
            initName("default-test-pytorch"). // 初始化任务名
            initNameSpace("default").         // 初始化命名空间
            initLabels(map[string]string{     // 初始化任务标签
               "ring-controller.atlas": "ascend-{xxx}b",   // 固定字段,使用Atlas 训练系列产品时不需要该字段
               "fault-scheduling":      "force",         // 开启重调度模式,force为故障强制删除任务
               "fault-retry-times":   "3",  // 开启业务面故障的无条件重试能力,'3'为最大重试次数;同时需要将容器重启策略设置为Never
               "framework":             "pytorch",       // 使用的训练框架名,根据实际框架类型填写取值
               "tor-affinity":          "normal-schema", // 是否使用交换亲和性调度,large-model-schema表示使用大模型调度模式,normal-schema表示使用普通任务调度模式,null表示关闭交换机亲和性调度
            }).
            initSchedulerName("volcano"). // 初始化调度器名
            initRunPolicy(&maNum).        // 初始化RunPolicy
            initSuccessPolicy().
            addReplicaSpecs("Master", newReplica(). // 初始化Master副本
                           initRcReplicas(&rcNum).                           // 初始化Pod副本数
                           initRcRestartPolicy(commonv1.RestartPolicyNever). // 初始化容器重启策略
                           initRcLabels(map[string]string{                   // 初始化Master的标签
                  "ring-controller.atlas": "ascend-{xxx}b", // 固定字段,根据实际情况填写取值
               }).                                   //
               initRcNodeSelector(map[string]string{ // 初始化Master的NodeSelector
                  "host-arch":        "huawei-x86",     // 用户根据实际架构配置,arm架构取值为huawei-arm
                  "accelerator-type": "module-{xxx}b-8", // 用户根据服务器类型进行配置,取值可以参考yaml参数
                           }).
                           initRcVolumes(). // 初始化挂载项
                           addRcContainers(newContainer().
                              initContainerName("ascend").                                             // 初始化容器名
                              initContainerImage("pt-arm:b120").                                       // 初始化镜像名
                              initContainerImagePullPolicy(corev1.PullIfNotPresent).                   // 初始化镜像拉取策略
                              initContainerEnv().                                                      // 初始化容器环境变量
                              initContainerCommand([]string{"/bin/bash", "-c", "bash train_start.sh ..."}).  // 初始化容器启动命令,具体参数参考示例yaml
                              initContainerArgs([]string{"/bin/bash", "-c", "bash train_start.sh ..."}).  // 初始化容器启动命令,具体参数参考示例yaml
                              initContainerPorts(2222).                                                // 初始化容器端口
                              initContainerLimits("huawei.com/Ascend910", "8").                        // 初始化任务资源
                              initContainerRequests("huawei.com/Ascend910", "8").                      // 初始化任务资源
                              initContainerVolumeMounts()).                                            // 初始化容器挂载项
                           initReplica()).
            addReplicaSpecs("Worker", newReplica(). // 初始化Worker副本
                           initRcReplicas(&rcNum).                           // 初始化pod副本数
                           initRcRestartPolicy(commonv1.RestartPolicyNever). // 初始化容器重启策略
                           initRcLabels(map[string]string{                   // 初始化Worker的标签
                  "ring-controller.atlas": "ascend-{xxx}b", // 固定字段,使用Atlas 训练系列产品时不需要该字段
               }).
               initRcAffinity("default-test-pytorch"). // 初始化Worker的反亲和性字段
               initRcNodeSelector(map[string]string{   // 初始化Worker的NodeSelector
                  "host-arch":        "huawei-x86",     // 用户根据实际架构配置,arm架构取值为huawei-arm
                  "accelerator-type": "module-{xxx}b-16", // 用户根据服务器类型进行配置,value值可以参考yaml参数
               }).
               initRcVolumes().
               addRcContainers(newContainer().
                  initContainerName("ascend").                                                  // 初始化容器名
                  initContainerImage("pt-arm:b120").                                            // 初始化镜像名
                  initContainerImagePullPolicy(corev1.PullIfNotPresent).                        // 初始化镜像拉取策略
                  initContainerEnv().                                                           // 初始化容器环境变量
                  initContainerCommand([]string{"/bin/bash", "-c", "bash train_start.sh ..."}). // 初始化容器启动命令,具体参数参考示例yaml
                  initContainerArgs([]string{"/bin/bash", "-c", "bash train_start.sh ..."}).    // 初始化容器启动命令,具体参数参考示例yaml
                  initContainerPorts(2222).                                                     // 初始化容器端口
                  initContainerLimits("huawei.com/Ascend910", "8").                             // 初始化任务资源
                  initContainerRequests("huawei.com/Ascend910", "8").                           // 初始化任务资源
                  initContainerVolumeMounts()).
                  initContainerRestartPolicy("Never").                                   // 初始化容器重启策略,开启无条件重试时需要设置为Never
               initReplica())
         return v1.AscendJob(job)
      }
      
      type acJob v1.AscendJob
      type Replica commonv1.ReplicaSpec
      type container corev1.Container
      
      func (job acJob) initRunPolicy(n *int32) acJob {
         job.Spec.RunPolicy = commonv1.RunPolicy{SchedulingPolicy: &commonv1.SchedulingPolicy{MinAvailable: n, Queue: "default"}}
         return job
      }
      ...
      func (rc Replica) initRcReplicas(rs *int32) Replica {
         rc.Replicas = rs
         return rc
      }
      ...
      func (ct container) initContainerEnv() container {
         ct.Env = []corev1.EnvVar{
            {
               Name: "XDL_IP",
               ValueFrom: &corev1.EnvVarSource{
                  FieldRef: &corev1.ObjectFieldSelector{FieldPath: "status.hostIP"},
               },
            },
         }
         return ct
      }
  4. 平台实现通过API创建配置任务。
    1. 创建ConfigMap
      clientset.CoreV1().ConfigMaps(job.Namespace).Create(context.TODO(), newConfigMap(configMapName), metav1.CreateOptions{})
    2. 下发训练任务。
      • Volcano Job 示例:
        vcjobClient.BatchV1alpha1().Jobs("vcjob").Create(context.TODO(), &job, metav1.CreateOptions{})
      • Ascend Job 示例:
        acjobClient.BatchV1().Jobs("default").Create(context.TODO(), &job, metav1.CreateOptions{})
  5. 查询任务信息。
    • Volcano Job 示例:
      vcjobClient.BatchV1alpha1().Jobs("vcjob").Get(context.TODO(), job.Name, metav1.GetOptions{})
    • Ascend Job 示例:
      acjobClient.BatchV1().Jobs("default").Get(context.TODO(), job.Name, metav1.GetOptions{})

集成后使用

平台用户在平台集成后使用断点续训的操作和示例可参考通过平台使用