集成后使用

本章节以整卡调度特性为例,介绍如何将整卡调度特性集成在AI平台上的关键操作步骤。在下发训练任务时,平台需要实现获取认证文件,创建客户端,创建Job对象,创建命名空间和调用接口下发训练任务等,将整卡调度特性提供的示例YAML转换成K8s提供的Go编程语言的API对象。

集成前说明

集成操作中会涉及到很多接口,请用户根据实际情况去相关官网了解接口的详细信息,本文档不再进行二次说明。

集成操作

  1. 获取K8s认证文件。

    用户根据实际情况选择合适的集群认证方式,创建相应的集群配置。使用ServiceAccount创建集群配置(InCluster模式)示例代码如下。

           // 使用ServiceAccount创建集群配置(InCluster模式)
           if config, err = rest.InClusterConfig(); err != nil {
                  // 使用KubeConfig文件创建集群配置
                  if config, err = clientcmd.BuildConfigFromFlags("", *kubeconfig); err != nil {
                         panic(err.Error())
                  }
           }

  2. 创建客户端clientset。

    Client, err := NewForConfig(cfg)

    NewForConfig(cfg)的函数原型为NewForConfig(c *rest.Config)(*Clientset, error)。

    参数说明如下:

    • *rest.Config:客户端配置文件,由K8s提供的接口生成;包括cluster host、证书等信息。
    • *Clientset:Client集合,包括AscendJob client(或VolcanoJob client)和discovery client。
    • error:错误信息。

  3. 创建Job对象。通过环境变量配置资源信息的用户需要创建Ascend Job对象;通过文件配置资源信息的用户需要创建Volcano Job对象。

    在进行本步骤操作之前,建议用户详细阅读准备任务YAML章节,了解示例YAML实现逻辑和关键字段说明,可以更好地帮助用户进行接下来的操作。

    • 创建Ascend Job对象。

      创建Ascend Job对象,初始化Ascend Job相关字段,示例如下。

      import (
         v1 "ascend-operator-apis/pkg/apis/batch/v1"
         "ascend-operator-apis/pkg/client/clientset/versioned"
         commonv1 "github.com/kubeflow/common/pkg/apis/common/v1"
         corev1 "k8s.io/api/core/v1"
         "k8s.io/apimachinery/pkg/api/resource"
         metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
         "k8s.io/client-go/tools/clientcmd"
      )
      
      func initAcJob() v1.AscendJob {
         job := newAcJob().
            initName("default-test-pytorch"). // 初始化任务名
            initNameSpace("default").         // 初始化命名空间
            initLabels(map[string]string{     // 初始化任务标签
               "ring-controller.atlas": "ascend-{xxx}b",   // 标识任务使用的芯片的产品类型
               "framework":             "pytorch",       // 使用的训练框架名
               "tor-affinity":          "normal-schema", // 是否使用交换亲和性调度。value为large-model-schema,表示使用大模型调度模式;value为normal-schema,使用普通任务调度模式;value为null,表示关闭交换机亲和性调度
            }).
            initSchedulerName("volcano"). // 初始化调度器名
            initRunPolicy(&maNum).        // 初始化RunPolicy
            initSuccessPolicy().
            addReplicaSpecs("Master", newReplica(). // 初始化Master副本
                           initRcReplicas(&rcNum).                           // 初始化pod副本数
                           initRcRestartPolicy(commonv1.RestartPolicyNever). // 初始化容器重启策略
                           initRcLabels(map[string]string{                   // 初始化Master的标签
                  "ring-controller.atlas": "ascend-{xxx}b", // 标识任务使用的芯片的产品类型
               }).                                   //
               initRcNodeSelector(map[string]string{ // 初始化Master的NodeSelector
                  "host-arch":        "huawei-x86",     // 可选字段,用户根据实际需求进行配置
                  "accelerator-type": "module-{xxx}b-16", // 用户根据服务器类型进行配置,取值可以参考YAML参数
                           }).
                           initRcVolumes(). // 初始化挂载项
                           addRcContainers(newContainer().
                              initContainerName("ascend").                                             // 初始化容器名
                              initContainerImage("pt-arm:b120").                                       // 初始化镜像名
                              initContainerImagePullPolicy(corev1.PullIfNotPresent).                   // 初始化镜像拉取策略
                              initContainerEnv().                                                      // 初始化容器环境变量
                              initContainerCommand([]string{"/bin/bash", "-c", "bash train_start.sh ..."}).  // 初始化容器启动命令,具体参数参考示例YAML
                              initContainerArgs([]string{"/bin/bash", "-c", "bash train_start.sh ..."}).  // 初始化容器启动命令,具体参数参考示例YAML
                              initContainerPorts(2222).                                                // 初始化容器端口
                              initContainerLimits("huawei.com/Ascend910", "8").                        // 初始化任务资源
                              initContainerRequests("huawei.com/Ascend910", "8").                      // 初始化任务资源
                              initContainerVolumeMounts()).                                            // 初始化容器挂载项
                           initReplica()).
            addReplicaSpecs("Worker", newReplica(). // 初始化Worker副本
                           initRcReplicas(&rcNum).                           // 初始化pod副本数
                           initRcRestartPolicy(commonv1.RestartPolicyNever). // 初始化容器重启策略
                           initRcLabels(map[string]string{                   // 初始化Worker的标签
                  "ring-controller.atlas": "ascend-{xxx}b", // 标识任务使用的芯片的产品类型
               }).
               initRcAffinity("default-test-pytorch"). // 初始化Worker的反亲和性字段
               initRcNodeSelector(map[string]string{   // 初始化Worker的NodeSelector
                  "host-arch":        "huawei-x86",     // 用户根据实际架构配置arm架构value为huawei-arm
                  "accelerator-type": "module-{xxx}b-8", // 用户根据服务器类型进行配置,value值可以参考YAML参数
               }).
               initRcVolumes().
               addRcContainers(newContainer().
                  initContainerName("ascend").                                                  // 初始化容器名
                  initContainerImage("pt-arm:b120").                                            // 初始化镜像名
                  initContainerImagePullPolicy(corev1.PullIfNotPresent).                        // 初始化镜像拉取策略
                  initContainerEnv().                                                           // 初始化容器环境变量
                  initContainerCommand([]string{"/bin/bash", "-c", "bash train_start.sh ..."}). // 初始化容器启动命令,具体参数参考示例YAML
                  initContainerArgs([]string{"/bin/bash", "-c", "bash train_start.sh ..."}).    // 初始化容器启动命令,具体参数参考示例YAML
                  initContainerPorts(2222).                                                     // 初始化容器端口
                  initContainerLimits("huawei.com/Ascend910", "8").                             // 初始化任务资源
                  initContainerRequests("huawei.com/Ascend910", "8").                           // 初始化任务资源
                  initContainerVolumeMounts()).
               initReplica())
         return v1.AscendJob(job)
      }
      
      type acJob v1.AscendJob
      type Replica commonv1.ReplicaSpec
      type container corev1.Container
      
      func (job acJob) initRunPolicy(n *int32) acJob {
         job.Spec.RunPolicy = commonv1.RunPolicy{SchedulingPolicy: &commonv1.SchedulingPolicy{MinAvailable: n, Queue: "default"}}
         return job
      }
      ...
      func (rc Replica) initRcReplicas(rs *int32) Replica {
         rc.Replicas = rs
         return rc
      }
      ...
      func (ct container) initContainerEnv() container {
         ct.Env = []corev1.EnvVar{
            {
               Name: "XDL_IP",
               ValueFrom: &corev1.EnvVarSource{
                  FieldRef: &corev1.ObjectFieldSelector{FieldPath: "status.hostIP"},
               },
            },
         }
         return ct
      }
    • 创建Volcano Job对象。
      1. 初始化Volcano Job挂载的ConfigMap。初始化ConfigMap相关字段,示例如下。
        import "k8s.io/api/core/v1"                                                 
        func newConfigMap(name string) *v1.ConfigMap {
               cm := &v1.ConfigMap{}                           
               cm.Name = name
               cm.Labels = map[string]string{
                      "ring-controller.atlas": "ascend-{xxx}b",  # 标识任务使用的芯片的产品类型
               }
               cm.Data = map[string]string{
                      "hccl.json": `{"status": "initializing"}`,  
               }
               return cm
        }
      2. 初始化Volcano Job。创建Volcano Job对象,初始化Volcano Job相关字段,示例如下。
        import (
           "k8s.io/api/core/v1"
           metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
           "k8s.io/client-go/kubernetes"
           "k8s.io/client-go/tools/clientcmd"
           "volcano.sh/apis/pkg/apis/batch/v1alpha1"
           "volcano.sh/apis/pkg/client/clientset/versioned"
        )
        
        func initJob() v1alpha1.Job {
           job := newJobBuilder().
              initNameSpace("vcjob").       // 初始化命名空间
              initName("mindx-dls-test").   // 初始化任务名
              initLabels(map[string]string{ // 初始化任务标签
                 "ring-controller.atlas": "ascend-{xxx}b", // 标识任务使用的芯片的产品类型
              }).
              initMinAvailable(2).          // minAvailable表示运行该job所要运行的最少Pod数量。只有当job中处于running状态的Pod数量不小于minAvailable时,才认为该job运行正常
              initSchedulerName("volcano"). // 使用Volcano作为调度,用户可根据实际情况修改
              initPolicies([]v1alpha1.LifecyclePolicy{{Event: "PodEvicted", Action: "RestartJob"}}).
              initPlugins(map[string][]string{"ssh": {}, "env": {}, "svc": {}}). // 初始化调度插件
              initMaxRetry(3).                                                   // maxRetry表示该job可以进行的最大重启次数
              initQueue("default" ).                                              // queue表示该job所属的队列
              addTask(v1alpha1.TaskSpec(newTaskSpec().
                 initTaskName("default-test").       // 初始化task名
                 initTaskReplicas(2).                // 初始化task副本数
                 initTaskAffinity("mindx-dls-test"). // 初始化任务反亲和性字段,输入值与任务名相同
                 initTaskLabels(map[string]string{
                    "app":                   "mindspore",   // 固定字段
                    "ring-controller.atlas": "ascend-{xxx}b", //  标识任务使用的芯片的产品类型
                 }).
                 initTaskNodeSelector(map[string]string{
                    "host-arch":        "huawei-x86",     // 可选字段,用户根据实际需求配置
                    "accelerator-type": "module-{xxx}b-8", // 用户根据服务器类型进行配置
                 }).
                 initTaskVolumes(). // 初始化挂载项
                 addTaskContainers(v1.Container(newContainer().
                    initContainerName("mindspore").                                             // 初始化容器名
                    initContainerImage("ms-arm:b120").                                          // 初始化镜像名
                    initContainerImagePullPolicy("IfNotPresent").                               // 初始化镜像拉取策略
                    initContainerLimits("huawei.com/Ascend910", "8").                           // 初始化任务资源
                    initContainerRequests("huawei.com/Ascend910", "8").                         // 初始化任务资源
                    initContainerVolumeMounts().                                                // 初始化容器挂载项
                    initContainerEnv("MindSpore").                                              // 初始化容器环境变量
                    initContainerCommand([]string{"/bin/bash", "-c", "bash train_start.sh ..."}))))) // 初始化容器启动命令,具体参数参考示例YAML
           return v1alpha1.Job(job)
        }
        
        type vcJob v1alpha1.Job
        type vcTask v1alpha1.TaskSpec
        type container v1.Container
        
        // 初始化任务名
        func (job *vcJob) initName(n string) *vcJob {
           job.Name = n
           return job
        }
        ...
        // 初始化task名
        func (task vcTask) initTaskName(tn string) vcTask {
           task.Name = tn
           return task
        }
        ...
        // 初始化容器环境变量
        func (ct container) initContainerEnv(framework string) container {
           ct.Env = []v1.EnvVar{
              {
                 Name: "mindx-dls-test", // 任务名称
                 ValueFrom: &v1.EnvVarSource{
                    FieldRef: &v1.ObjectFieldSelector{FieldPath: "metadata.name"},
                 },
              },
              {
                 Name: "XDL_IP", // 固定字段
                 ValueFrom: &v1.EnvVarSource{
                    FieldRef: &v1.ObjectFieldSelector{FieldPath: "status.hostIP"},
                 },
              },
              {
                 Name:  "framework",
                 Value: framework, // 使用的训练框架名称。支持MindSpore、PyTorch和Tensorflow
              },
           }
           return ct
        }

  4. 创建命名空间,以vcjob为例,示例如下。

    clientset.CoreV1().Namespaces().Create(context.TODO(), newNameSpace("vcjob"), metav1.CreateOptions{})

  5. (可选)如果通过文件配置资源信息,还需要创建ranktable的ConfigMap,示例如下。

    clientset.CoreV1().ConfigMaps(job.Namespace).Create(context.TODO(), newConfigMap("rings-config-"+job.Name), metav1.CreateOptions{})

  6. 调用Create接口,下发训练任务。

    • Ascend Job
      acjobClient.BatchV1().Jobs("default").Create(context.TODO(), &job, metav1.CreateOptions{})
    • Volcano Job
      vcjobClient.BatchV1alpha1().Jobs("vcjob").Create(context.TODO(), &job, metav1.CreateOptions{})

  7. 查看任务进程。调用Get接口查看job是否创建成功。

    • Ascend Job
      acjobClient.BatchV1().Jobs("default").Get(context.TODO(), job.Name, metav1.GetOptions{})
    • Volcano Job
      vcjobClient.BatchV1alpha1().Jobs("vcjob").Get(context.TODO(), job.Name, metav1.GetOptions{})

  8. 删除任务。调用Delete接口删除任务。

    • Ascend Job
      acjobClient.BatchV1().Jobs("default").Delete(context.TODO(), job.Name, metav1.DeleteOptions{})
    • Volcano Job
      vcjobClient.BatchV1alpha1().Jobs("vcjob").Delete(context.TODO(), job.Name, metav1.DeleteOptions{})

集成后使用

  1. 制作相应的镜像,可参考制作镜像章节进行操作。
  2. 完成相应的脚本适配,可参考脚本适配章节进行操作。
  3. 创建任务。
  4. 运行训练任务。可通过平台配置并创建训练任务,下发任务后查看结果。