本章节以整卡调度特性为例,介绍如何将整卡调度特性集成在AI平台上的关键操作步骤。在下发训练任务时,平台需要实现获取认证文件,创建客户端,创建Job对象,创建命名空间和调用接口下发训练任务等,将整卡调度特性提供的示例YAML转换成K8s提供的Go编程语言的API对象。
集成操作中会涉及到很多接口,请用户根据实际情况去相关官网了解接口的详细信息,本文档不再进行二次说明。
用户根据实际情况选择合适的集群认证方式,创建相应的集群配置。使用ServiceAccount创建集群配置(InCluster模式)示例代码如下。
// 使用ServiceAccount创建集群配置(InCluster模式) if config, err = rest.InClusterConfig(); err != nil { // 使用KubeConfig文件创建集群配置 if config, err = clientcmd.BuildConfigFromFlags("", *kubeconfig); err != nil { panic(err.Error()) } }
Client, err := NewForConfig(cfg)
NewForConfig(cfg)的函数原型为NewForConfig(c *rest.Config)(*Clientset, error)。
参数说明如下:
创建Ascend Job对象,初始化Ascend Job相关字段,示例如下。
import ( v1 "ascend-operator-apis/pkg/apis/batch/v1" "ascend-operator-apis/pkg/client/clientset/versioned" commonv1 "github.com/kubeflow/common/pkg/apis/common/v1" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/tools/clientcmd" ) func initAcJob() v1.AscendJob { job := newAcJob(). initName("default-test-pytorch"). // 初始化任务名 initNameSpace("default"). // 初始化命名空间 initLabels(map[string]string{ // 初始化任务标签 "ring-controller.atlas": "ascend-{xxx}b", // 标识任务使用的芯片的产品类型 "framework": "pytorch", // 使用的训练框架名 "tor-affinity": "normal-schema", // 是否使用交换亲和性调度。value为large-model-schema,表示使用大模型调度模式;value为normal-schema,使用普通任务调度模式;value为null,表示关闭交换机亲和性调度 }). initSchedulerName("volcano"). // 初始化调度器名 initRunPolicy(&maNum). // 初始化RunPolicy initSuccessPolicy(). addReplicaSpecs("Master", newReplica(). // 初始化Master副本 initRcReplicas(&rcNum). // 初始化pod副本数 initRcRestartPolicy(commonv1.RestartPolicyNever). // 初始化容器重启策略 initRcLabels(map[string]string{ // 初始化Master的标签 "ring-controller.atlas": "ascend-{xxx}b", // 标识任务使用的芯片的产品类型 }). // initRcNodeSelector(map[string]string{ // 初始化Master的NodeSelector "host-arch": "huawei-x86", // 可选字段,用户根据实际需求进行配置 "accelerator-type": "module-{xxx}b-16", // 用户根据服务器类型进行配置,取值可以参考YAML参数 }). initRcVolumes(). // 初始化挂载项 addRcContainers(newContainer(). initContainerName("ascend"). // 初始化容器名 initContainerImage("pt-arm:b120"). // 初始化镜像名 initContainerImagePullPolicy(corev1.PullIfNotPresent). // 初始化镜像拉取策略 initContainerEnv(). // 初始化容器环境变量 initContainerCommand([]string{"/bin/bash", "-c", "bash train_start.sh ..."}). // 初始化容器启动命令,具体参数参考示例YAML initContainerArgs([]string{"/bin/bash", "-c", "bash train_start.sh ..."}). // 初始化容器启动命令,具体参数参考示例YAML initContainerPorts(2222). // 初始化容器端口 initContainerLimits("huawei.com/Ascend910", "8"). // 初始化任务资源 initContainerRequests("huawei.com/Ascend910", "8"). // 初始化任务资源 initContainerVolumeMounts()). // 初始化容器挂载项 initReplica()). addReplicaSpecs("Worker", newReplica(). // 初始化Worker副本 initRcReplicas(&rcNum). // 初始化pod副本数 initRcRestartPolicy(commonv1.RestartPolicyNever). // 初始化容器重启策略 initRcLabels(map[string]string{ // 初始化Worker的标签 "ring-controller.atlas": "ascend-{xxx}b", // 标识任务使用的芯片的产品类型 }). initRcAffinity("default-test-pytorch"). // 初始化Worker的反亲和性字段 initRcNodeSelector(map[string]string{ // 初始化Worker的NodeSelector "host-arch": "huawei-x86", // 用户根据实际架构配置arm架构value为huawei-arm "accelerator-type": "module-{xxx}b-8", // 用户根据服务器类型进行配置,value值可以参考YAML参数 }). initRcVolumes(). addRcContainers(newContainer(). initContainerName("ascend"). // 初始化容器名 initContainerImage("pt-arm:b120"). // 初始化镜像名 initContainerImagePullPolicy(corev1.PullIfNotPresent). // 初始化镜像拉取策略 initContainerEnv(). // 初始化容器环境变量 initContainerCommand([]string{"/bin/bash", "-c", "bash train_start.sh ..."}). // 初始化容器启动命令,具体参数参考示例YAML initContainerArgs([]string{"/bin/bash", "-c", "bash train_start.sh ..."}). // 初始化容器启动命令,具体参数参考示例YAML initContainerPorts(2222). // 初始化容器端口 initContainerLimits("huawei.com/Ascend910", "8"). // 初始化任务资源 initContainerRequests("huawei.com/Ascend910", "8"). // 初始化任务资源 initContainerVolumeMounts()). initReplica()) return v1.AscendJob(job) } type acJob v1.AscendJob type Replica commonv1.ReplicaSpec type container corev1.Container func (job acJob) initRunPolicy(n *int32) acJob { job.Spec.RunPolicy = commonv1.RunPolicy{SchedulingPolicy: &commonv1.SchedulingPolicy{MinAvailable: n, Queue: "default"}} return job } ... func (rc Replica) initRcReplicas(rs *int32) Replica { rc.Replicas = rs return rc } ... func (ct container) initContainerEnv() container { ct.Env = []corev1.EnvVar{ { Name: "XDL_IP", ValueFrom: &corev1.EnvVarSource{ FieldRef: &corev1.ObjectFieldSelector{FieldPath: "status.hostIP"}, }, }, } return ct }
import "k8s.io/api/core/v1"
func newConfigMap(name string) *v1.ConfigMap {
cm := &v1.ConfigMap{}
cm.Name = name
cm.Labels = map[string]string{
"ring-controller.atlas": "ascend-{xxx}b", # 标识任务使用的芯片的产品类型
}
cm.Data = map[string]string{
"hccl.json": `{"status": "initializing"}`,
}
return cm
}
import ( "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/kubernetes" "k8s.io/client-go/tools/clientcmd" "volcano.sh/apis/pkg/apis/batch/v1alpha1" "volcano.sh/apis/pkg/client/clientset/versioned" ) func initJob() v1alpha1.Job { job := newJobBuilder(). initNameSpace("vcjob"). // 初始化命名空间 initName("mindx-dls-test"). // 初始化任务名 initLabels(map[string]string{ // 初始化任务标签 "ring-controller.atlas": "ascend-{xxx}b", // 标识任务使用的芯片的产品类型 }). initMinAvailable(2). // minAvailable表示运行该job所要运行的最少Pod数量。只有当job中处于running状态的Pod数量不小于minAvailable时,才认为该job运行正常 initSchedulerName("volcano"). // 使用Volcano作为调度,用户可根据实际情况修改 initPolicies([]v1alpha1.LifecyclePolicy{{Event: "PodEvicted", Action: "RestartJob"}}). initPlugins(map[string][]string{"ssh": {}, "env": {}, "svc": {}}). // 初始化调度插件 initMaxRetry(3). // maxRetry表示该job可以进行的最大重启次数 initQueue("default" ). // queue表示该job所属的队列 addTask(v1alpha1.TaskSpec(newTaskSpec(). initTaskName("default-test"). // 初始化task名 initTaskReplicas(2). // 初始化task副本数 initTaskAffinity("mindx-dls-test"). // 初始化任务反亲和性字段,输入值与任务名相同 initTaskLabels(map[string]string{ "app": "mindspore", // 固定字段 "ring-controller.atlas": "ascend-{xxx}b", // 标识任务使用的芯片的产品类型 }). initTaskNodeSelector(map[string]string{ "host-arch": "huawei-x86", // 可选字段,用户根据实际需求配置 "accelerator-type": "module-{xxx}b-8", // 用户根据服务器类型进行配置 }). initTaskVolumes(). // 初始化挂载项 addTaskContainers(v1.Container(newContainer(). initContainerName("mindspore"). // 初始化容器名 initContainerImage("ms-arm:b120"). // 初始化镜像名 initContainerImagePullPolicy("IfNotPresent"). // 初始化镜像拉取策略 initContainerLimits("huawei.com/Ascend910", "8"). // 初始化任务资源 initContainerRequests("huawei.com/Ascend910", "8"). // 初始化任务资源 initContainerVolumeMounts(). // 初始化容器挂载项 initContainerEnv("MindSpore"). // 初始化容器环境变量 initContainerCommand([]string{"/bin/bash", "-c", "bash train_start.sh ..."}))))) // 初始化容器启动命令,具体参数参考示例YAML return v1alpha1.Job(job) } type vcJob v1alpha1.Job type vcTask v1alpha1.TaskSpec type container v1.Container // 初始化任务名 func (job *vcJob) initName(n string) *vcJob { job.Name = n return job } ... // 初始化task名 func (task vcTask) initTaskName(tn string) vcTask { task.Name = tn return task } ... // 初始化容器环境变量 func (ct container) initContainerEnv(framework string) container { ct.Env = []v1.EnvVar{ { Name: "mindx-dls-test", // 任务名称 ValueFrom: &v1.EnvVarSource{ FieldRef: &v1.ObjectFieldSelector{FieldPath: "metadata.name"}, }, }, { Name: "XDL_IP", // 固定字段 ValueFrom: &v1.EnvVarSource{ FieldRef: &v1.ObjectFieldSelector{FieldPath: "status.hostIP"}, }, }, { Name: "framework", Value: framework, // 使用的训练框架名称。支持MindSpore、PyTorch和Tensorflow }, } return ct }
clientset.CoreV1().Namespaces().Create(context.TODO(), newNameSpace("vcjob"), metav1.CreateOptions{})
clientset.CoreV1().ConfigMaps(job.Namespace).Create(context.TODO(), newConfigMap("rings-config-"+job.Name), metav1.CreateOptions{})
acjobClient.BatchV1().Jobs("default").Create(context.TODO(), &job, metav1.CreateOptions{})
vcjobClient.BatchV1alpha1().Jobs("vcjob").Create(context.TODO(), &job, metav1.CreateOptions{})
acjobClient.BatchV1().Jobs("default").Get(context.TODO(), job.Name, metav1.GetOptions{})
vcjobClient.BatchV1alpha1().Jobs("vcjob").Get(context.TODO(), job.Name, metav1.GetOptions{})
acjobClient.BatchV1().Jobs("default").Delete(context.TODO(), job.Name, metav1.DeleteOptions{})
vcjobClient.BatchV1alpha1().Jobs("vcjob").Delete(context.TODO(), job.Name, metav1.DeleteOptions{})