Use After Integration
This section uses the full NPU scheduling feature as an example to describe how to integrate the feature on an AI platform. When delivering a training job, the platform needs to obtain the authentication file, create a client, create a job object, create a namespace, and call the corresponding job delivery API. The example YAML file provided for full NPU scheduling will be converted into an API object of the Go programming language provided by Kubernetes.
Description Before Integration
There are many APIs involved in the integration operations. You can refer to related official websites for detailed information on the APIs as required. This document will not provide further explanation.
- For details about Kubernetes APIs, see the official API library of Kubernetes based on the programming language.
- For details about Ascend Job parameters, see Table 1.
- For details about Volcano Job APIs, see Creating a Volcano Job in Cloud Container Instance API Reference.
- You can run the npu-smi info command to query the number in the chip model name, which is indicated by the Name field in the returned message. As an example below, the value of {xxx} is 910.
Integration Operations
- Obtain the Kubernetes authentication file.
You can select a proper cluster authentication mode as required and create the corresponding cluster configuration using ServiceAccount in InCluster mode. The sample code is as follows:
// Use ServiceAccount to configure a cluster (InCluster mode). if config, err = rest.InClusterConfig(); err != nil { // Use the KubeConfig file to configure a cluster. if config, err = clientcmd.BuildConfigFromFlags("", *kubeconfig); err != nil { panic(err.Error()) } } - Create a client set.
Client, err := NewForConfig(cfg)
The function prototype of NewForConfig(cfg) is NewForConfig(c *rest.Config)(*Clientset, error).
Parameters are described as follows:
- *rest.Config: client configuration file, which is generated by the API provided by Kubernetes. The file contains information such as the cluster host and certificate.
- *Clientset: client set, including the Ascend Job client (or Volcano Job client) and discovery client.
- error: error information.
- Create a job object. To configure resource information using the environment variables, you need to create an Ascend Job object. To configure resource information using a file, you need to create a Volcano Job object.
Before performing this step, you are advised to read Preparation of Job YAML Files carefully to understand the implementation logic and key fields of the example YAML file, which can better assist you in the following operations.
- Create an Ascend Job object.
Create an Ascend Job object and initialize Ascend Job fields. The following is an example:
import ( v1 "ascend-operator-apis/pkg/apis/batch/v1" "ascend-operator-apis/pkg/client/clientset/versioned" commonv1 "github.com/kubeflow/common/pkg/apis/common/v1" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/tools/clientcmd" ) func initAcJob() v1.AscendJob { job := newAcJob(). initName("default-test-pytorch"). // Initialize the job name. initNameSpace("default"). // Initialize the namespace. initLabels(map[string]string{ // Initialize the job label. "ring-controller.atlas": "ascend-{xxx}b", // Identify the processor type used by a job. "framework": "pytorch", // Name of the training framework in use. "tor-affinity": "normal-schema", // Whether to use switch affinity scheduling. The value large-model-schema indicates that the foundation model scheduling mode is used. The value normal-schema indicates that the common job scheduling mode is used. The value null indicates that switch affinity scheduling is disabled. "podgroup-sched-enable": "true", // Configured only when the openFuyao-customized Kubernetes and volcano-ext are used in the cluster. If the value is true, batch scheduling is enabled. If another value is used or this parameter is not set, batch scheduling is disabled and common scheduling is used. }). initSchedulerName("volcano"). // Initialize the scheduler name. initRunPolicy(&maNum). // Initialize RunPolicy. initSuccessPolicy(). addReplicaSpecs("Master", newReplica(). // Initialize the Master replica. initRcReplicas(&rcNum). // Initialize the number of pod replicas. initRcRestartPolicy(commonv1.RestartPolicyNever). // Initialize the container restart policy. initRcLabels(map[string]string{ // Initialize the Master label. "ring-controller.atlas": "ascend-{xxx}b", // Identify the processor type used by a job. }). // initRcNodeSelector(map[string]string{ // Initialize the NodeSelector of the Master. "host-arch": "huawei-x86", // (Optional) Set it as required. "accelerator-type": "module-{xxx}b-16", // Set this parameter based on the server type. For details about the value, see the YAML parameters. }). initRcVolumes(). // Initialize the mounting options. addRcContainers(newContainer(). initContainerName("ascend"). // Initialize the container name. initContainerImage("pt-arm:b120"). // Initialize the image name. initContainerImagePullPolicy(corev1.PullIfNotPresent). // Initialize the image pulling policy. initContainerEnv(). // Initialize container environment variables. initContainerCommand([]string{"/bin/bash", "-c", "bash train_start.sh ..."}). // Initialize the container startup command. For details about the parameters, see the example YAML file. initContainerArgs([]string{"/bin/bash", "-c", "bash train_start.sh ..."}). // Initialize the container startup command. For details about the parameters, see the example YAML file. initContainerPorts(2222). // Initialize the container port. initContainerLimits("huawei.com/Ascend910", "8"). // Initialize job resources. initContainerRequests("huawei.com/Ascend910", "8"). // Initialize job resources. initContainerVolumeMounts()). // Initialize container mounting options. initReplica()). addReplicaSpecs("Worker", newReplica(). // Initialize the Worker replica. initRcReplicas(&rcNum). // Initialize the number of pod replicas. initRcRestartPolicy(commonv1.RestartPolicyNever). // Initialize the container restart policy. initRcLabels(map[string]string{ // Initialize the Worker label. "ring-controller.atlas": "ascend-{xxx}b", // Identify the processor type used by a job. }). initRcAffinity("default-test-pytorch"). // Initialize the anti-affinity field of the Worker. initRcNodeSelector(map[string]string{ // Initialize the NodeSelector of the Worker. "host-arch": "huawei-x86", // Set this parameter based on the actual architecture. If the architecture is Arm, set the value to huawei-arm. "accelerator-type": "module-{xxx}b-8", // Set this parameter based on the server type. For details about the value, see the YAML parameters. }). initRcVolumes(). addRcContainers(newContainer(). initContainerName("ascend"). // Initialize the container name. initContainerImage("pt-arm:b120"). // Initialize the image name. initContainerImagePullPolicy(corev1.PullIfNotPresent). // Initialize the image pulling policy. initContainerEnv(). // Initialize container environment variables. initContainerCommand([]string{"/bin/bash", "-c", "bash train_start.sh ..."}). // Initialize the container startup command. For details about the parameters, see the example YAML file. initContainerArgs([]string{"/bin/bash", "-c", "bash train_start.sh ..."}). // Initialize the container startup command. For details about the parameters, see the example YAML file. initContainerPorts(2222). // Initialize the container port. initContainerLimits("huawei.com/Ascend910", "8"). // Initialize job resources. initContainerRequests("huawei.com/Ascend910", "8"). // Initialize job resources. initContainerVolumeMounts()). initReplica()) return v1.AscendJob(job) } type acJob v1.AscendJob type Replica commonv1.ReplicaSpec type container corev1.Container func (job acJob) initRunPolicy(n *int32) acJob { job.Spec.RunPolicy = commonv1.RunPolicy{SchedulingPolicy: &commonv1.SchedulingPolicy{MinAvailable: n, Queue: "default"}} return job } ... func (rc Replica) initRcReplicas(rs *int32) Replica { rc.Replicas = rs return rc } ... func (ct container) initContainerEnv() container { ct.Env = []corev1.EnvVar{ { Name: "XDL_IP", ValueFrom: &corev1.EnvVarSource{ FieldRef: &corev1.ObjectFieldSelector{FieldPath: "status.hostIP"}, }, }, } return ct } - Create a Volcano Job object.
- Initialize the ConfigMap mounted to a Volcano Job. Specifically, initialize ConfigMap fields. The following is an example:
import "k8s.io/api/core/v1" func newConfigMap(name string) *v1.ConfigMap { cm := &v1.ConfigMap{} cm.Name = name cm.Labels = map[string]string{ "ring-controller.atlas": "ascend-{xxx}b", # Identify the processor type used by a job. } cm.Data = map[string]string{ "hccl.json": `{"status": "initializing"}`, } return cm } - Initialize the Volcano Job. Create a Volcano Job object and initialize Volcano Job fields. The following is an example:
import ( "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/kubernetes" "k8s.io/client-go/tools/clientcmd" "volcano.sh/apis/pkg/apis/batch/v1alpha1" "volcano.sh/apis/pkg/client/clientset/versioned" ) func initJob() v1alpha1.Job { job := newJobBuilder(). initNameSpace("vcjob"). // Initialize the namespace. initName("mindx-dls-test"). // Initialize the job name. initLabels(map[string]string{ // Initialize the job label. "ring-controller.atlas": "ascend-{xxx}b", // Identify the processor type used by a job. }). initMinAvailable(2). // minAvailable indicates the minimum number of pods required for running the job. Only when the number of pods in the running state in the job is not less than minAvailable, the job is considered to be running normally. initSchedulerName("volcano"). // Use Volcano for scheduling. You can change the value based on the actual requirements. initPolicies([]v1alpha1.LifecyclePolicy{{Event: "PodEvicted", Action: "RestartJob"}}). initPlugins(map[string][]string{"ssh": {}, "env": {}, "svc": {}}). // Initialize the scheduling plugin. initMaxRetry(3). // maxRetry indicates the maximum number of times that the job can be restarted. initQueue("default" ). // queue indicates the queue to which the job belongs. addTask(v1alpha1.TaskSpec(newTaskSpec(). initTaskName("default-test"). // Initialize the task name. initTaskReplicas(2). // Initialize the number of task replicas. initTaskAffinity("mindx-dls-test"). // Initialize the task anti-affinity field. The input value is the same as the task name. initTaskLabels(map[string]string{ "app": "mindspore", // Fixed field "ring-controller.atlas": "ascend-{xxx}b", // Identify the processor type used by a job. }). initTaskNodeSelector(map[string]string{ "host-arch": "huawei-x86", // (Optional) Set it as required. "accelerator-type": "module-{xxx}b-8", // Set this parameter based on the server type. }). initTaskVolumes(). // Initialize the mounting options. addTaskContainers(v1.Container(newContainer(). initContainerName("mindspore"). // Initialize the container name. initContainerImage("ms-arm:b120"). // Initialize the image name. initContainerImagePullPolicy("IfNotPresent"). // Initialize the image pulling policy. initContainerLimits("huawei.com/Ascend910", "8"). // Initialize job resources. initContainerRequests("huawei.com/Ascend910", "8"). // Initialize job resources. initContainerVolumeMounts(). // Initialize container mounting options. initContainerEnv("MindSpore"). // Initialize container environment variables. initContainerCommand([]string{"/bin/bash", "-c", "bash train_start.sh ..."}))))) // Initialize the container startup command. For details about the parameters, see the example YAML file. return v1alpha1.Job(job) } type vcJob v1alpha1.Job type vcTask v1alpha1.TaskSpec type container v1.Container // Initialize the job name. func (job *vcJob) initName(n string) *vcJob { job.Name = n return job } ... // Initialize the task name. func (task vcTask) initTaskName(tn string) vcTask { task.Name = tn return task } ... // Initialize container environment variables. func (ct container) initContainerEnv(framework string) container { ct.Env = []v1.EnvVar{ { Name: "mindx-dls-test", // Job name ValueFrom: &v1.EnvVarSource{ FieldRef: &v1.ObjectFieldSelector{FieldPath: "metadata.name"}, }, }, { Name: "XDL_IP", // Fixed field ValueFrom: &v1.EnvVarSource{ FieldRef: &v1.ObjectFieldSelector{FieldPath: "status.hostIP"}, }, }, { Name: "framework", Value: framework, // Name of the training framework in use, which can be MindSpore, PyTorch, or TensorFlow. }, } return ct }
- Initialize the ConfigMap mounted to a Volcano Job. Specifically, initialize ConfigMap fields. The following is an example:
- Create an Ascend Job object.
- Create a namespace. The following uses vcjob as an example:
clientset.CoreV1().Namespaces().Create(context.TODO(), newNameSpace("vcjob"), metav1.CreateOptions{}) - (Optional) If resource information is configured using a file, you need to create a ConfigMap of RankTable. The following is an example:
clientset.CoreV1().ConfigMaps(job.Namespace).Create(context.TODO(), newConfigMap("rings-config-"+job.Name), metav1.CreateOptions{}) - Call the Create API to deliver a training job.
- AscendJob
acjobClient.BatchV1().Jobs("default").Create(context.TODO(), &job, metav1.CreateOptions{}) - VolcanoJob
vcjobClient.BatchV1alpha1().Jobs("vcjob").Create(context.TODO(), &job, metav1.CreateOptions{})
- AscendJob
- View the job progress. Call the Get API to check whether the job is successfully created.
- AscendJob
acjobClient.BatchV1().Jobs("default").Get(context.TODO(), job.Name, metav1.GetOptions{}) - VolcanoJob
vcjobClient.BatchV1alpha1().Jobs("vcjob").Get(context.TODO(), job.Name, metav1.GetOptions{})
- AscendJob
- Delete a job. Call the Delete API to delete the job.
- AscendJob
acjobClient.BatchV1().Jobs("default").Delete(context.TODO(), job.Name, metav1.DeleteOptions{}) - VolcanoJob
vcjobClient.BatchV1alpha1().Jobs("vcjob").Delete(context.TODO(), job.Name, metav1.DeleteOptions{})
- AscendJob
Use After Integration
- Create a corresponding image. For details, see Creating an Image.
- Complete script adaptation. For details, see Script Adaptation.
- Create a job
- Run the job. The platform allows you to configure and create a training job, deliver the job, and view the result.