开发者
资源
NPU虚拟化软切分参考实践

NPU虚拟化软切分参考实践

昇腾部署算力切分

发表于 2026/03/10

非商用声明

该文档提供的内容为参考实践,仅供用户参考使用,用户可参考文档构建自己的软件,产品化方案需进行安全、可靠性加固,不能直接将相关Demo或镜像文件集成到商用产品中。

1 方案介绍

1.1 背景

在教育AI实训场景中,学生通过各类模型训练和推理实验,掌握NLP、CV、LLM等模型知识。在该场景下,对性能要求相对较低,但是对硬件成本比较敏感。需要同时支持多个学生进行上机操作,通常单卡需要同时运行不少于8个实训任务。

针对该场景,采用软切分方案,基于时分复用的算力分配方案实现更细粒度的算力分配,可支持单卡同时运行≥8个实训任务,大幅降低AI教学的成本,并显著提升硬件资源利用率。

1.2 方案简介

本软切分方案主要基于时间片轮转实现算力资源控制,支持多种资源调度模式。方案示意图如下:

图1 软切分方案示意图

1.3 关键规格

1、支持硬件:本切分方案支持Atlas 300I A2、Atlas 800I A2。

2、支持场景:软切分特性支持推理、训练场景。

3、AI Core切分粒度不低于1%。

4、显存切分最小粒度1MB。

5、支持模型:已验证模型支持情况(支持模型不限于下表,如超出本表范围,可按需自行验证)

模型

类型

Atlas 300I A2

Atlas 800I A2

YOLOv8

训练、推理

YOLOv12

训练、推理

ResNet50

训练、推理

Bert+ERNIE

训练、推理

BiLstm+Crf

训练、推理

TextRCNN

训练、推理

FaceNet

训练、推理

Transformer

训练、推理

U-Net

训练、推理

BERT

训练、推理

MobileNet

训练、推理

Qwen3-VL-8B

推理

DeepSeek-R1-Distill-Qwen-7B

推理

bge-m3

推理

Qwen3-32B-w8a8

推理

Qwen3-0.6B

推理

DeepSeek-R1-1.5B

推理

2 使用指导

2.1 软件配置

本参考实践使用的软件配套版本如下:

软件/镜像

版本

说明

Ascend HDK

25.5.0

NPU驱动固件

CANN

8.5.0

昇腾异构计算架构

Docker

18.09.0

用于部署容器,支持18.09.x及以上版本

Kubernetes

1.17.x~1.34.x,推荐使用1.19.x及以上版本。

用于容器调度、管理(使用Docker部署则不需要)

OS

openEuler release 22.03 (LTS-SP4)

物理机操作系统(请使用昇腾官方兼容的OS)

vCANN-RT

ubs-virt/ubs-virt-enpu/README.md · openEuler/ubs-virt - AtomGit | GitCode

软切分相关组件,用于算力和显存资源控制

Volcano

26.0.0

用于在Kubernetes集群场景下将容器调度到合适的计算节点(使用Docker部署则不需要)

Ascend Device Plugin

26.0.0

用于在Kubernetes集群场景下采集NPU资源信息,生成资源配置文件(使用Docker部署则不需要)

Ascend Operator

26.0.0

用于在Kubernetes集群场景下做任务下发(使用Docker部署则不需要)

ClusterD26.0.0用于在Kubernetes集群场景下做NPU资源调度(使用Docker部署则不需要)

2.2 使用约束

1、每个容器分配的资源总和不能超过单卡资源。

2、一个容器最多只能使用一个物理NPU。

3、NPU需开启共享模式。

4、仅支持普通容器,不支持特权容器。

5、由于vCANN-RT解决方案使用了共享内存,需确保是可信且被授权的用户使用。

6、单个物理NPU卡支持的最大容器数量为100个,单个软切分容器内支持的最大进程数为128。

7、若用户自己编译软切分动态库,则需要保证宿主机上CANN的版本,跟镜像中CANN版本保持一致。

2.3 使用流程

本软切分方案使用流程图如下所示。

图1

2.3.1 准备工作

编译机:用于编译软切分库;请确保编译机与执行机的编译器版本、OS版本、系统内核保持一致,避免出现兼容性问题。

执行机:进行软切分业务的昇腾服务器

2.3.1.1 编译机任务

请在编译机上完成以下操作:

1. 从vCANN-RT仓库拉取源码:

git clone https://gitcode.com/openeuler/ubs-virt.git

2. 设置CANN环境变量:

source /usr/local/Ascend/ascend-toolkit/set_env.sh

3. 执行编译脚本(若当前编译环境安装的CANN版本不是8.5.0,则不需要增加编译参数 “8.5.0” ):

cd ./ubs-virt/ubs-virt-enpu/vcann-rt/
bash make_build.sh 8.5.0

4. 编译完成之后,会在build目录下面产生相应的编译产物

2.3.1.2 执行机任务

请在执行机上进行如下操作,安装软切分库:

1. 将编译产物拷贝到所有执行机的对应目录下。

cp libvruntime.so /opt/enpu/vcann-rt/lib/
cp enpu-monitor /opt/enpu/vcann-rt/tools/

2.(k8s部署必选)创建preload文件,创建方法如下:

1)创建ld.so.preload文件。

vim ld.so.preload

2)写入libvruntime.so的固定安装路径,用于后面K8s挂载vCANN-RT到容器

/opt/enpu/vcann-rt/lib/libvruntime.so

3.(可选)若编译时出现 libboundscheck 相关报错,需要先编译安装libboundscheck.so。

git clone https://gitcode.com/openeuler/libboundscheck.git
cd libboundscheck/
make CC=gcc
cp lib/libboundscheck.so /usr/lib64/libboundscheck

仓库链接:libboundscheck

4. 对某张NPU卡使能软切分前,需要在执行机上开启容器共享模式,确保NPU卡上无进程占用后,执行以下命令完成开启:

#设置指定设备的所有芯片的容器共享模式
npu-smi set -t device-share -i ${id} -d ${value}
#设置指定设备的指定芯片的容器共享模式
npu-smi set -t device-share -i ${id} -c ${chip_id} -d ${value}
#查看对应设备当前容器共享状态
npu-smi info -t device-share -i ${id}

若使用k8s部署,全部芯片必须开启容器共享模式:

for i in {0..7}; do npu-smi set -t device-share -i ${i} -d 1 ; done

参数说明

id

设备id。通过npu-smi info -l 命令查出的NPU ID即为设备id。

chip_id

芯片id。通过npu-smi info -m 命令查出的Chip ID即为芯片id。

value

容器共享模式使能状态,默认禁用。


    0:禁用


    1:使能

参考链接:ubs-virt/ubs-virt-enpu/README.md

2.3.2 部署方式1:基于Kubernetes部署

Kubernetes软切分方案各组件调用流程图如下。

图1 调用流程图

2.3.2.1 前置条件

1、执行机已安装Kubernetes

2、执行机已安装Docker

3、执行机已按2.3.1章节完成准备工作

2.3.2.2 安装MindCluster组件

1. 下载所需软件包:MindCluster 26.0.0

1)Ascend-mindxdl-device-plugin_26.0.0_linux-aarch64.zip

2)Ascend-mindxdl-volcano_26.0.0_linux-aarch64.zip

3)Ascend-mindxdl-ascend-operator_26.0.0_linux-aarch64.zip

4)Ascend-mindxdl-clusterd_26.0.0_linux-aarch64.zip

2. 安装部署:链接

1) 安装前准备:参考安装前准备,准备好各组件的镜像,以及安装所需的环境(节点标签、用户、日志目录、命名空间等)。

2) 给当前节点添加软切分所需标签:

kubectl label node ${host_name} huawei.com/scheduler.chip1softsharedev.enable=true --overwrite

说明:

${host_name}可通过 kubectl get node -A命令查询;

之后可通过 kubectl get node --show-labels 命令查询标签是否添加成功。

3) 安装device-plugin:参考安装device-plugin,启动device-plugin组件服务。

注意:必须下边的device-plugin-volcano-v26.0.0.yaml文件来启动device-plugin组件。

apiVersion: v1
kind: ServiceAccount
metadata:
  name: ascend-device-plugin-sa-910
  namespace: kube-system
---
kind: ClusterRole
apiVersion: rbac.authorization.k8s.io/v1
metadata:
  name: pods-node-ascend-device-plugin-role-910
rules:
  - apiGroups: [""]
    resources: ["pods"]
    verbs: ["get", "list", "update", "watch", "patch"]
  - apiGroups: [""]
    resources: ["nodes"]
    verbs: ["get", "patch"]
  - apiGroups: [ "" ]
    resources: [ "nodes/proxy" ]
    verbs: [ "get" ]
  - apiGroups: [""]
    resources: ["nodes/status"]
    verbs: ["get", "patch", "update"]
  - apiGroups: [""]
    resources: ["configmaps"]
    verbs: ["get", "create", "update", "list", "watch"]
  - apiGroups: [ "" ]
    resources: [ "events" ]
    verbs: [ "create" ]
---
kind: ClusterRoleBinding
apiVersion: rbac.authorization.k8s.io/v1
metadata:
  name: pods-node-ascend-device-plugin-rolebinding-910
subjects:
  - kind: ServiceAccount
    name: ascend-device-plugin-sa-910
    namespace: kube-system
roleRef:
  kind: ClusterRole
  name: pods-node-ascend-device-plugin-role-910
  apiGroup: rbac.authorization.k8s.io
---
apiVersion: apps/v1
kind: DaemonSet
metadata:
  name: ascend-device-plugin-daemonset-910
  namespace: kube-system
spec:
  selector:
    matchLabels:
      name: ascend-device-plugin-ds
  updateStrategy:
    type: RollingUpdate
  template:
    metadata:
      ##### For Kubernetes versions lower than 1.19, seccomp is used with annotations.
      annotations:
        scheduler.alpha.kubernetes.io/critical-pod: ""
        seccomp.security.alpha.kubernetes.io/pod: runtime/default
      labels:
        name: ascend-device-plugin-ds
    spec:
      ##### For Kubernetes version 1.19 and above, seccomp is used with securityContext:seccompProfile
#      securityContext:
#        seccompProfile:
#          type: RuntimeDefault
      tolerations:
        - key: CriticalAddonsOnly
          operator: Exists
        - key: huawei.com/Ascend910
          operator: Exists
          effect: NoSchedule
        - key: "device-plugin"
          operator: "Equal"
          value: "v2"
          effect: NoSchedule
      priorityClassName: "system-node-critical"
      nodeSelector:
        accelerator: huawei-Ascend910
      serviceAccountName: ascend-device-plugin-sa-910
      containers:
      - image: ascend-k8sdeviceplugin:v26.0.0
        name: device-plugin-01
        resources:
          requests:
            memory: 500Mi
            cpu: 500m
          limits:
            memory: 500Mi
            cpu: 500m
        command: [ "/bin/bash", "-c", "--"]
        args: [ "device-plugin  -useAscendDocker=true -volcanoType=true -presetVirtualDevice=true
                 -logFile=/var/log/mindx-dl/devicePlugin/devicePlugin.log -logLevel=0 -shareDevCount=100 -softShareDevConfigDir=/share_device/" ]
        securityContext:
          privileged: true
          readOnlyRootFilesystem: true
        imagePullPolicy: Never
        volumeMounts:
          - name: device-plugin
            mountPath: /var/lib/kubelet/device-plugins
          - name: vnpucfg
            mountPath: /etc/vnpu.cfg
          - name: vnpu-cut
            mountPath: /run/vnpu_cfg_lock
          - name: pod-resource
            mountPath: /var/lib/kubelet/pod-resources
          - name: hiai-driver
            mountPath: /usr/local/Ascend/driver
            readOnly: true
          - name: log-path
            mountPath: /var/log/mindx-dl/devicePlugin
          - name: tmp
            mountPath: /tmp
          - name: reset-config-dir
            mountPath: /user/restore/reset
          - name: lingqu-log
            mountPath: /var/log/lingqu
          - name: data-trace-file-dir
            mountPath: /user/cluster-info/datatrace-config
          - name: dpu-config # dpu config for A5
            mountPath: /user/mindx-dl/dpu
          - name: localtime
            mountPath: /etc/localtime
            readOnly: true
          - name: enpu-config-dir
            mountPath: /etc/enpu/
          - name: share-device-config-dir
            mountPath: /share_device/


        env:
          - name: NODE_NAME
            valueFrom:
              fieldRef:
                fieldPath: spec.nodeName
          - name: HOST_IP
            valueFrom:
              fieldRef:
                fieldPath: status.hostIP
      volumes:
        - name: device-plugin
          hostPath:
            path: /var/lib/kubelet/device-plugins
        - name: pod-resource
          hostPath:
            path: /var/lib/kubelet/pod-resources
        - name: hiai-driver
          hostPath:
            path: /usr/local/Ascend/driver
        - name: log-path
          hostPath:
            path: /var/log/mindx-dl/devicePlugin
            type: Directory
        - name: reset-config-dir
          hostPath:
            path: /user/restore/reset
            type: DirectoryOrCreate
        - name: data-trace-file-dir
          hostPath:
            path: /user/cluster-info/datatrace-config
            type: DirectoryOrCreate
        - name: vnpu-cut
          hostPath:
            path: /run/vnpu_cfg_lock
            type: DirectoryOrCreate
        - name: tmp
          hostPath:
            path: /tmp
        - name: vnpucfg
          hostPath:
            path: /etc/vnpu.cfg
            type: File
        - name: lingqu-log
          hostPath:
            path: /var/log/lingqu
            type: DirectoryOrCreate
        - name: dpu-config # dpu config for A5
          hostPath:
            path: /user/mindx-dl/dpu-config
            type: DirectoryOrCreate
        - name: localtime
          hostPath:
            path: /etc/localtime
        - name: enpu-config-dir
          hostPath:
            path: /etc/enpu/
        - name: share-device-config-dir
          hostPath:
            path: /share_device/
            type: DirectoryOrCreate

其中这三部分代码为该文件涉及修改处。

       args: [ "device-plugin  -useAscendDocker=true -volcanoType=true -presetVirtualDevice=true
                 -logFile=/var/log/mindx-dl/devicePlugin/devicePlugin.log -logLevel=0 -shareDevCount=100 -softShareDevConfigDir=/share_device/" ]
         - name: enpu-config-dir
           mountPath: /etc/enpu/
         - name: share-device-config-dir
           mountPath: /share_device/
        - name: enpu-config-dir
          hostPath:
            path: /etc/enpu/
        - name: share-device-config-dir
          hostPath:
            path: /share_device/
            type: DirectoryOrCreate

4) 安装volcano:参考安装volcano,启动volcano组件服务。

5) 安装clusterd:参考 安装clusterd ,启动clusterd组件服务。

6) 安装ascend-operator:参考安装ascend-operator,启动ascend-operator组件服务。

警告:

建议严格按照 3 -> 4 -> 5 -> 6 的顺序来启动MindCluster组件,否则可能会导致k8s出现异常。

2.3.2.3 启动业务容器

1. 创建vnpu-base.yaml文件。yaml文件内容可从 README 处获取

说明

可根据实际业务进行参数配置:

1)name: 容器名

2)namespace: 命名空间名称

3) huawei.com/scheduler.softShareDev.policy: 调度策略,默认为弹性模式。1:固定配额模式(fixed-share), 2: 弹性模式(elastic), 3: 争抢模式(best-effort)。

       - 固定配额模式(fixed-shared mode):各容器按配比严格执行时间片。

             a)若有暂未分配的NPU资源,则在最后一个容器分配到的时间片消耗完后,睡眠一段时间。此时NPU将闲置空转。

             b)某容器无任务运行,仍会给定比例消耗完该分配的时间片。

       - 弹性模式(elastic mode):由于固定配额模式可能出现NPU资源浪费,在弹性模式下,在各容器按配比执行时间片的基础上,优化了调度逻辑。

             a)最后一个容器分配的时间片消耗完,马上将NPU使用权释放给下一个容器,跳过睡眠逻辑。

             b)若某容器无任务运行,则直接跳过未消耗完的时间片,切换至另一个容器。

       - 争抢模式(best-effort mode):容器之间自行争抢NPU资源,该模式的资源利用率最高,但无法保证容器的QoS。

4)huawei.com/scheduler.softShareDev.aicoreQuota:算力AI Core配额,取值必须为1~100之间的整数。单位:(%)(注意:单卡绑定的所有容器分配的资源之和不能超过单卡的总资源)

5)huawei.com/scheduler.softShareDev.hbmQuota:显存配额,取值必须为1的整数倍。单位:(GB)(注意:单卡绑定的所有容器分配的资源之和不能超过单卡的总资源)

6)image: 镜像名

7)resources.limits.huawei.com/Ascend910:与aicoreQuota保持一致

8)resources.requests.huawei.com/Ascend910:与aicoreQuota保持一致

注意:

a) 虚拟化环境检测工具准备:用户需要保证使用的镜像中/usr/bin目录内包含systemd-detect-virt命令行工具。

该工具用于检测系统的运行环境是否为虚拟化环境和虚拟化方式。若容器内未安装systemd-detect-virt工具,软切分动态库在调用dcmi接口时会出现故障。

b)

若在使用mindie镜像时出现如下报错:

请在vnpu-base.yaml中添加以下mindie相关挂载目录:

2. 根据上述yaml文件的metadata.namespace值,创建命名空间vnpu。

kubectl create namespace vnpu

3. 启动容器。

kubectl apply -f vnpu-base.yaml

4. 启动AI任务前,可通过环境变量配置日志级别:

export ENPU_LOG_LEVEL=1

说明:

日志级别由高到低分别是FATAL(0), ERROR(1), WARN(2), INFO(3), DEBUG(4)

默认日志级别为INFO。

为获得良好使用体验,避免使用过程中大量日志打屏,推荐先设为ERROR。

AI任务启动时,会自动拉起软切分算力控制和显存控制服务。回显内容“global init Success”,表示软切分相关服务启动成功。

5. k8s拉起容器后,进入容器运行AI任务。

# 查看容器
kubectl get pods -n ${namespace}

6. 此外,在容器内可通过查询配置文件获取NPU资源配额等信息:

cat /etc/enpu/npu_info.config

7. 在容器内可通过监测工具查询NPU资源配额和内存使用情况等信息:

/opt/enpu/vcann-rt/tools/enpu-monitor

2.3.3 部署方式2:基于纯Docker部署

Docker软切分方案架构图如下所示:

图1 Docker软切分方案架构图

当不依赖k8s组件使用vCANN-RT时,需要用户在启动容器时自行挂载软切分相关动态库、文件和设备。例如软件分动态库、配置文件、共享内存设备和物理NPU设备。具体步骤见下文。

2.3.3.1 前置条件

1、执行机已安装Docker

2、执行机已安装软切分库

3、执行机已按2.3.1章节完成准备工作

2.3.3.2 创建npu_info配置文件

针对每个容器,在主机侧任意处创建一个配置文件npu_info.config,用于映射到容器中。

mkdir -p /data/vnpu1
vim /data/vnpu1/npu_info.config

说明:

由于不同容器的配置内容不同,需确保每个容器的配置文件在主机侧独立存储明确区分

可通过文件名后缀路径区分,例如:/data/vnpu1/npu_info.config,/data/vnpu2/npu_info.config

配置文件npu_info.config的格式和字段示例如下:

physical-npu-id=0
virtual-npu-id=0
aicore-quota=20memory-quota=1024
shm-id=7426Exx-xxxxxxx-xxxxxxx-xxxxxxx-x00301E3
scheduling-policy=1

最后,设置文件为644权限:

chmod 644 ./npu_info.config

注意:

当用户错误配置文件时,只有在容器业务拉起后才会报错。例如,若用户配置aicore-quota=120,算力资源配额已经超出100%。在拉起容器时不会报错,只有当容器内使用NPU的进程启动后,才会返回报错。此时,用户需要在宿主机上修改配置文件,并重新拉起容器。

说明:

1. physical-npu-id:物理NPU id。physical-npu-id=0 表示当前容器使用第0张物理NPU。

   - 可通过执行ls /dev/davinci*命令获取芯片的物理id。/dev/davinci0,表示芯片的物理id为0。

2. virtual-npu-id:vNPU id。vNPU id需要从0开始配置。且同一个物理NPU下的vNPU id不允许重复,若多个容器配置相同的vNPU id,代表着这几个容器将抢占同一个时间片。virtual-npu-id=0 表示当前容器使用的是第1个NPU时间片。vNPU id取值范围为0-99。

3. aicore-quota:AI Core资源配额,单位为 %,表示算力使用时间比例。当前每个time slice默认为100ms,暂不支持动态配置。假设用户申请了20%的算力资源,那么该用户有20ms的NPU使用权。

4. memory-quota:显存资源配额,单位为 MB,表示显存资源使用容量。当前容器内所有进程使用的显存数量不能超过显存资源配额。

5. shm-id:共享内存文件名称。该文件名称采用物理NPU对应的VDie ID,每个物理NPU的VDie ID可保证全局唯一。

   - 通过npu-smi info -t board -i ${id} -c ${chip_id}命令,查询VDie ID。每个NPU对应不同的id和chip_id。获取方式参考2.3.1.2节的 参数说明。

     

   - 配置共享内存文件名称:查询到VDie ID后,使用"-"连接数字。例如,shm-id=7426Exx-xxxxxxx-xxxxxxx-xxxxxxx-x00301E3。

     

6. scheduling-policy:调度策略,推荐配置为2。1 表示 fixed-share mode,2 表示 elastic mode,3 表示 best-effort mode。三种模式的具体含义参考 2.3.1.5节 的说明。

2.3.3.3 启动业务容器

1 .创建docker容器,挂载相关路径。加粗部分为涉及修改处

docker run -it --name=container_name \
 --device=/dev/davinci0:/dev/davinci0 \
 --device=/dev/davinci_manager \
 --device=/dev/hisi_hdc:/dev/hisi_hdc \
 -v /usr/local/sbin:/usr/local/sbin \
 -v /usr/local/Ascend/driver:/usr/local/Ascend/driver \
 -v /dev/shm:/dev/shm \
 -v /opt/enpu/vcann-rt/lib/libvruntime.so:/opt/enpu/vcann-rt/lib/libvruntime.so \
 -v /opt/enpu/vcann-rt/tools/enpu-monitor:/opt/enpu/vcann-rt/tools/enpu-monitor \
 -v ${config_path}/npu_info.config:/etc/enpu/vcann-rt/npu_info.config \
 -v ${preload_path}/ld.so.preload:/etc/ld.so.preload \
 image_name /bin/bash

说明:

${config_path}: 主机侧npu_info配置文件所在路径,可存放在自定义路径。容器内为固定路径。

${preload_path}:主机侧预加载动态库文件所在路径,可存放在自定义路径。容器内为固定路径。

注意:

虚拟化环境检测工具准备:用户需要保证使用的镜像中/usr/bin目录内包含systemd-detect-virt命令行工具。

该工具用于检测系统的运行环境是否为虚拟化环境和虚拟化方式。若容器内未安装systemd-detect-virt工具,软切分动态库在调用dcmi接口时会出现故障。

2. 拉起容器后,进入容器运行AI任务。启动任务前,可通过环境变量配置日志级别,例如:

export ENPU_LOG_LEVEL=1

说明:

日志级别由高到低分别是FATAL(0), ERROR(1), WARN(2), INFO(3), DEBUG(4)。

默认日志级别为INFO。

为获得良好使用体验,避免使用过程中大量日志打屏,推荐先设为ERROR(1)。

AI任务启动时,会自动拉起vCANN-RT算力控制和显存控制服务。回显内容“global init Success”,表示vCANN-RT服务启动成功。

3. 此外,在容器内可通过查询配置文件获取NPU资源配额等信息:

cat /etc/enpu/npu_info.config

4. 在容器内可通过监测工具查询NPU资源配额和内存使用情况等信息:

/opt/enpu/vcann-rt/tools/enpu-monitor

3 典型场景参考实践

当前的高校AI教学实训课程一个班有数十名学生同时上实验课,通过虚拟化软切分方案,只需一台4卡 Atlas 300I A2(64G)的服务器即可满足该实训要求。

以医疗命名实体识别实验这一典型实训场景为例,本章将围绕该场景下的典型实践,从场景介绍、部署方案及部署步骤三个部分展开,以说明切分和部署的设计思路和操作方法。通过采用软切分方案满足实训场景下的多模型同时训练的需求。该AI教学实训课程场景具备一定的通用性,也可为其他场景提供参考;用户可根据自身模型需求,对部署方案进行相应调整和验证。

3.1 场景介绍

医疗文本通常包含大量临床案例与医学知识,蕴含丰富的医学价值,但多以非结构化形式存在(即无固定格式、无统一字段的自然语言文本,如门诊病历、出院小结、医学文献等纯文字内容,无法被计算机直接识别、解析与利用)。为高效挖掘其中的医学知识,命名实体识别(NER)是从非结构化文本中提取疾病、症状、药物等关键医学实体的核心基础技术。

本实验基于医疗相关教科书和研究论文,利用已标注的医学语料,构建并训练 BiLstm+Crf 模型,实现对医疗命名实体的自动识别。

本实验所需文件获取方式如下:

表1 文件下载方式表

文件目录名称

下载地址

作用

dataset

wget https://staticfile.eduplus.net/dataSet/systemLib/5f9714775ecd4854bedea362f4dd4e0d.zip

实验数据集

scripts

wget https://staticfile.eduplus.net/dataSet/systemLib/81c5f0240e4d48ffb9ad899b6a4913b0.zip

conlleval_rev.pl:该文件为perl语言脚本文件,其功能是对模型预测的实体结果与真实结果进行统计,并得到分类报告。

eval.py:该文件为调用conlleval_rev.pl脚本的代码。

const.py:该文件中保存的是本实验中涉及的所有实体标注标签类别字典,由于内容较多,所以预先给出。

best_checkpoints

wget https://staticfile.eduplus.net/dataSet/systemLib/33057dc817344334a00f2277d5ec8fa1.zip

模型的预训练参数文件lstm_crf.pth和训练完的模型文件lstm_crf_model.pth。

checkpoints

用户自行创建该文件夹

用于保存训练好的模型文件

result

用户自行创建该文件夹

用于保存模型训练过程中的验证结果

data.py

【附录文件】获取

数据预处理脚本

data_loader.py

【附录文件】获取

数据加载脚本

model.py

【附录文件】获取

BiLSTM+CRF模型文件

main.py

【附录文件】获取

模型训练脚本

predict.py

【附录文件】获取

推理脚本

3.2 部署方案

使用一张300I A2(64G)单卡,通过软切分控制每个容器分配到10%的AICORE资源和6GB的显存资源。

每名学生使用一个容器,完成BiLstm+Crf 模型的训练,最后使用模型实现对医疗实体的自动识别。达成单卡满足十名学生同时进行AI教学实训任务的目标。

3.3 部署步骤

3.3.1 部署方式1:Kubernetes部署

  1. 执行命令 npu-smi info 确认服务器上所有NPU卡空闲(如下图所示0号卡上无任何进程)。

执行以下命令,开启所有NPU卡的容器共享模式。

for i in {0..7}; do npu-smi set -t device-share -i ${i} -d 1 ; done

看到回显如下,代表开启成功。

2. 安装vCANN-RT:参考2.3.1节,完成vCANN-RT安装。

3. 参考2.3.2.2节,确保三个k8s组件完成安装。

4. 在当前路径下创建yaml目录, 进入该目录并创建vnpu-container1.yaml 文件,该yaml文件内容可参考2.3.2.3节获取,并根据实际需求做相应修改。

5. 参考上述步骤,创建其余九个容器的yaml文件(vnpu-container1.yaml ~ vnpu-container10.yaml)。

6. 根据vnpu-container1.yaml 中 metadata.namespace 的值 ,执行 kubectl create namespace vnpu 创建命名空间。

7. 执行命令 kubectl apply -f ./ ,拉起所有的pod。

执行 kubectl get pods -n vnpu 查看到命名空间中创建的pod。

执行 kubectl exec -it vnpu-sglang1-default0-0 -n vnpu bash 进入容器。

在容器中执行 cat /etc/enpu/npu_info.config 查看npu_info.config的内容是否符合对应yaml的预设值。

执行 /opt/enpu/vcann-rt/tools/enpu-monitor 查询NPU资源配额和内存使用情况等信息。

8. 参考 3.1节中的文件下载方式表。下载所有实验文件到当前容器中同一目录下。

9. 在容器中执行训练脚本main.py,待训练完成后,可在checkpoints文件夹下得到训练好的模型文件lstm_crf_model.pth。注意调整合适的batchsize,保证训练最大显存占用始终不超过预设值6GB上限。若batchsize设置的过大,显存占用可能会超过预设值6GB上限,从而导致训练终止。

10. 修改推理脚本predict.py中的model_path为训练好的模型文件路径。

11. 执行推理脚本predict.py,完成推理。

3.3.2 部署方式2:Docker部署

  1. 执行命令 npu-smi info 查询空闲NPU卡(如下图所示0号卡上无任何进程),可对其进行切分。

2. 执行命令 npu-smi set -t device-share -i 0 -d 1 开启该设备的容器共享模式。看到回显如下,设置成功。

3. 参考2.3.1节,完成vCANN-RT安装。

4. 参考2.3.3.2节,准备好每个容器的配置文件npu_info.config

5. 参考2.3.3.3节, 拉起并进入容器。

执行 cat /etc/enpu/npu_info.config 查看 npu_info.config 是否符合预设值。

执行 /opt/enpu/vcann-rt/tools/enpu-monitor 查询NPU资源配额和内存使用情况等信息。

6. 参考 3.1节中的文件下载方式表。下载所有实验文件到当前容器中同一目录下。

7. 在容器中执行训练脚本main.py,待训练完成后,可在checkpoints文件夹下得到训练好的模型文件lstm_crf_model.pth。注意调整合适的batchsize,保证训练最大显存占用始终不超过预设值6GB上限。若batchsize设置的过大,显存占用可能会超过预设值6GB上限,从而导致训练终止。

8. 修改推理脚本predict.py中的model_path为之前训练得到的模型文件路径。

9. 在容器中执行推理脚本 predict.py,完成推理。

4 附录

4.1 data.py

import pickle, os
from scripts.const import *
def read_corpus(corpus_path, label_path, is_train=True):
    sents = []#保存所有句子
    labels = []#保存所有标签
    data = []#保存所有数据
    with open(corpus_path, encoding='utf-8') as fr:#打开文本数据文件
        lines_co = fr.readlines()#读取所有行
    with open(label_path) as fl:#打开标签文件
        lines_lb = fl.readlines()#读取所有行
    if not is_train:#如果不是训练,就加载已经创建好的词典文件
        word2id = read_dictionary('./dataset/vocab.pkl')
    else:#如果是训练,则需要重新创建词典文件
        word2id = {}
    for line_co, line_lb in zip(lines_co, lines_lb):
        sent_,tag_ = line_co.strip().split(),line_lb.strip().split()#去空格
        if len(sent_)==0:#句子长度为0 就不处理
            continue
        data.append((sent_, tag_))#原数据放入data
        sentence_id,label_ = [],[]#保存词id构成的序列和标签id构成的序列
        for word in sent_:
            word = word.lower()
            if word.isdigit():#判断是数字就转换成<NUM>
                word = '<NUM>'
            if is_train:
                if word not in word2id:
                    word2id[word] = len(word2id)+1#加入词典
            else:
                if word not in word2id:#不在词典内就是未知词
                    word = '<UNK>'
            sentence_id.append(word2id[word])#转换成id加入列表
        for tag in tag_:
            label = tag2label[tag]
            label_.append(label)
        sents.append(sentence_id)#保存到所有句子中
        labels.append(label_)#保存到所有标签序列中
    if is_train:
        word2id['<UNK>'] = len(word2id)+1
        word2id['<PAD>'] = PAD
        print('vocabulary length:', len(word2id))
        with open('./dataset/vocab.pkl', 'wb') as fw:
            pickle.dump(word2id, fw)
    return sents, labels, data
def read_dictionary(vocab_path):
    vocab_path = os.path.join(vocab_path)#拼接路径
    with open(vocab_path, 'rb') as fr:
        word2id = pickle.load(fr)#加载词典
    return word2id#返回词典
def get_vector(path):
    with open(path,'r',encoding='utf-8') as f:
        vectors={}#空字典 用来保存读取的词向量
        for line in f.readlines():
            line=line.strip().split(' ')#空格分隔
            word,vector = line[0],line[1:]#第一个是字,后面的是向量的值
            vector=[float(i) for i in vector]#将数值字符串转换成小数
            if len(vector)==100:#长度为100才对
                vectors[word]=vector#放入字典
    return vectors#返回

4.2 data_loader.py

import numpy as np
import torch
from scripts.const import *
# 先尝试导入torch_npu(与main.py保持一致,确保NPU环境可用)
try:
    import torch_npu
except ImportError:
    raise ImportError("未找到torch_npu依赖,请安装匹配版本的昇腾PyTorch适配包")
class DataLoader(object):
    # 1. 修改初始化参数:cuda → npu,默认值适配main.py的NPU配置
    def __init__(self, sents,labels, npu=True, batch_size=64, shuffle=False, evaluation=False):
        self.npu = npu  # 替换原self.cuda为self.npu,记录是否使用NPU
        self.sents = sents
        self.labels = labels
        self.evaluation = evaluation
        self._batch_size = batch_size
        if not evaluation:
            self._augment()#实现数据增强
        self.sents,self.labels = np.asarray(self.sents),np.asarray(self.labels)
        if not evaluation:
            self._clean()#实现数据清理
        self.sents_size = len(self.sents)
        self._step = 0#用来记录迭代时的批次下标
        self.num_batchs = self.sents_size // batch_size#计算批次数量
        self.divided = True#记录是否整除
        if self.sents_size % batch_size != 0:#不能整除
            self.divided = False
            self.num_batchs += 1#批次数量+1
        if shuffle:
            self._shuffle()#打乱数据
        elif not evaluation:
            self._sorted()#排序数据
    def _augment(self):
        # 前后两句拼接成一个句子(原有逻辑不变,无需修改)
        self.sents = self.sents + [self.sents[i] + self.sents[i+1] for i  in range(len(self.sents)-1)]
        self.labels = self.labels + [self.labels[i] + self.labels[i + 1] for i in range(len(self.labels) - 1)]
    
    def _clean(self):
        # 删除标签全部是O的句子,就是没有实体的句子(原有逻辑不变,无需修改)
        sums = [len(set(item))>1 for item in self.labels]#计算句子中非O的个数
        indices = np.where(sums)[0]#筛选出非全O的句子(修正原注释表述误差)
        self.sents = self.sents[indices]#布尔索引实现文本筛选
        self.labels = self.labels[indices]#对应的标签也要去掉
    def _shuffle(self):
        # 打乱数据(原有逻辑不变,无需修改)
        indices = np.random.permutation(self.sents_size)#获取乱序下标
        self.sents = self.sents[indices]#文本数据重新索引
        self.labels = self.labels[indices]#标签数据重新索引
    def _sorted(self):
        # 按句子长度排序(原有逻辑不变,无需修改)
        lens = np.array([len(sent) for sent in self.sents])#计算句子长度
        indices = np.argsort(lens)#排序
        self.sents = self.sents[indices]#重新索引
        self.labels = self.labels[indices]#重新索引
    def __iter__(self):
        return self
    def __next__(self):
        # 批次迭代逻辑(原有逻辑不变,无需修改)
        if self._step == self.num_batchs:
            self._step = 0
            raise StopIteration()
        _start = self._step * self._batch_size
        if (self._step == self.num_batchs-1 and self.divided == False):
            _sp = self.sents_size % self._batch_size
        else:
            _sp = self._batch_size
        self._step += 1
        if not self.evaluation:
            indices = np.random.permutation(_sp)
        else:
            indices = np.arange(_sp)
        word, seq_len_list = self._pad_to_longest(self.sents[_start:_start + _sp][indices])
        label, _ = self._pad_to_longest(self.labels[_start:_start + _sp][indices])
        return word, label, seq_len_list
    def _pad_to_longest(self, insts):
        # 2. 仅修改张量设备迁移逻辑:cuda() → npu(),适配NPU环境
        seq_len_list = [len(inst) for inst in insts]
        max_len = max(len(inst) for inst in insts)
        inst_data = np.array(
            [inst + [PAD] * (max_len - len(inst)) for inst in insts])
        if self.evaluation:
            with torch.no_grad():
                inst_data_tensor = torch.from_numpy(inst_data)
        else:
            inst_data_tensor = torch.from_numpy(inst_data)
        # 替换原CUDA设备迁移为NPU设备迁移,与self.npu参数联动
        if self.npu:
            inst_data_tensor = inst_data_tensor.npu()  # 等价于inst_data_tensor.to('npu')
        return inst_data_tensor, seq_len_list

4.3 model.py

import torch
import torch.nn as nn
from torch.nn import init
import numpy as np
from scripts.const import *
# 补充:导入torch_npu(与其他文件保持一致,确保NPU环境完整性,避免导入异常)
try:
    import torch_npu
except ImportError:
    raise ImportError("未找到torch_npu依赖,请安装匹配版本的昇腾PyTorch适配包")
class CRF(nn.Module):
    def __init__(self, label_size, device):
        super().__init__()
        self.label_size = label_size
        self.device=device
        self.transitions = nn.Parameter(
            torch.randn(label_size, label_size))  # 定义转移矩阵
        init.xavier_uniform_(self.transitions)  # 初始化
        self.transitions.data[START, :].fill_(-10000.)  # 起始行全部设为-1000
        self.transitions.data[:, STOP].fill_(-10000.)  # 终止列全部设为-1000
    def _log_sum_exp(self,input, keepdim=False):
        assert input.dim() == 2
        max_scores, _ = input.max(dim=-1, keepdim=True)# max_score维度是1
        output = input - max_scores#减去最大值,目的是防止指数运算上溢
        return max_scores + torch.log(torch.sum(torch.exp(output), dim=-1, keepdim=keepdim))
    def _score_sentence(self, feats, tags):
        score = torch.zeros(tags.shape[0]).to(self.device)
        s_score = torch.LongTensor([[START]] * tags.shape[0]).to(self.device)#初始化分值
        tags = torch.cat([s_score, tags], dim=-1)
        for i in range(feats.shape[1]):#遍历每一个token
            feat=feats[:,i,:]#获取当前token的发射概率
            #加上转移概率和发射概率
            score = score + self.transitions[tags[:,i + 1], tags[:,i]] + feat[range(feat.shape[0]),tags[:,i + 1]]
        score = score + self.transitions[STOP, tags[:,-1]]
        return score
    def forward(self,input):
        # 初始化路径分值
        init_vvars = torch.full((1, self.label_size), -10000.).to(self.device)
        init_vvars[0][START] = 0
        forward_var_list = []
        forward_var_list.append(torch.stack([init_vvars] * input.shape[0]).squeeze(1))  # 第一个时刻
        for feat_index in range(input.shape[1]):
            gamar_r_l = forward_var_list[feat_index].unsqueeze(1)  # [128,1,33]
            next_tag_var = gamar_r_l + self.transitions  # 128,33,33  加上转移概率
            t_r1_k = torch.unsqueeze(input[:, feat_index, :], -1)  # 128,33,1 发射概率
            forward_var_new = next_tag_var + t_r1_k  # 128,33,33 加上发射概率
            forward_var_list.append(torch.logsumexp(forward_var_new, dim=2))#求和
        terminal_var = forward_var_list[-1] + self.transitions[STOP].repeat([input.shape[0], 1])
        return torch.logsumexp(terminal_var, dim=1)
    def _viterbi_decode(self, feats):
        backpointers = []
        #初始化路径分值
        init_vvars = torch.full((1, self.label_size), -10000.).to(self.device)
        init_vvars[0][START] = 0
        forward_var_list = []
        forward_var_list.append(torch.stack([init_vvars] * feats.shape[0]))#第一个时刻
        for feat_index in range(feats.shape[1]):
            gamar_r_l = forward_var_list[feat_index]#[128,1,33]
            next_tag_var = gamar_r_l + self.transitions#128,33,33  加上转移概率
            t_r1_k = torch.unsqueeze(feats[:,feat_index,:], -1)#128,33,1 发射概率
            forward_var_new = next_tag_var + t_r1_k#128,33,33 加上发射概率
            #取出当前时刻的最大分值状态集状态下标
            viterbivars_t, bptrs_t = torch.max(forward_var_new, dim=-1)  # 128,33  128,33
            forward_var_list.append(viterbivars_t.unsqueeze(1))
            backpointers.append(bptrs_t)
        #转移到最后的stop
        terminal_var = forward_var_list[-1] + self.transitions[STOP]
        # 找到最大路径终止时刻tag下标
        path_score,best_tag_id = torch.max(terminal_var.squeeze(1), dim=-1)
        #回溯解码
        best_tag_id=best_tag_id.unsqueeze(1)
        best_path = [best_tag_id]
        for bptrs_t in reversed(backpointers):
            best_tag_id = torch.gather(bptrs_t,1,best_tag_id)#获取上一时刻路径状态
            best_path.append(best_tag_id)
        best_path.pop()#START不要
        best_path.reverse()#逆转
        return path_score, torch.cat(best_path, dim=-1)
class BiLSTM(nn.Module):
    def __init__(self, word_ebd_dim, lstm_hsz, lstm_layers, dropout, batch_size,vec,word2id,device):
        super().__init__()
        self.lstm_layers = lstm_layers
        self.lstm_hsz = lstm_hsz
        self.batch_size = batch_size
        self.word2id=word2id
        self.word_size=len(word2id)
        self.word_ebd_dim=word_ebd_dim
        self.device=device
        self.scope=1
        self.vec=vec
        if vec:#如果预训练字向量存在
            vecs =self._get_vec()#获取字向量
            self.word_ebd = nn.Embedding.from_pretrained(vecs)#使用预训练的字向量实例化嵌入层
        else:
            self.word_ebd = nn.Embedding(self.word_size, self.word_ebd_dim)#随机初始化嵌入层
            self.word_ebd.weight.data.uniform_(self.scope, self.scope)
        self.lstm = nn.LSTM(word_ebd_dim,
                            hidden_size=lstm_hsz // 2,
                            num_layers=lstm_layers,
                            batch_first=True,
                            dropout=dropout,
                            bidirectional=True)#实现BiLstm层
    def _get_vec(self):
        weight = torch.Tensor(self.word_size, self.word_ebd_dim).uniform_(-self.scope,self.scope)
        for k,v in self.vec.items():
            if k in self.word2id.keys():
                index = self.word2id[k]#获取词对应的id
                weight[index, :] = torch.from_numpy(np.array(v))#将该词对应的嵌入向量修改为预训练向量
        return weight
    def forward(self, words):
        encode = self.word_ebd(words)#嵌入计算
        # 初始化状态
        hidden=self.init_hidden(words.shape[0])
        output, hidden = self.lstm(encode,hidden)#sltm计算
        return output, hidden
    def init_hidden(self,batch_size):
        return (torch.zeros(self.lstm_layers * 2, batch_size, self.lstm_hsz // 2).to(self.device),
                    torch.zeros(self.lstm_layers * 2, batch_size, self.lstm_hsz // 2).to(self.device))
class Model(nn.Module):
    def __init__(self, args):
        super().__init__()
        for k, v in args.__dict__.items():#参数赋值
            self.__setattr__(k, v)
        self.bilstm = BiLSTM(self.word_ebd_dim,
                             self.lstm_hsz, self.lstm_layers,
                             self.dropout, self.batch_size,
                             self.pre_trained_vec,self.word2id,self.device
                             )# bilstm层
        self.logistic = nn.Linear(self.lstm_hsz, self.label_size)#Linear层
        self.crf = CRF(self.label_size, self.device)#crf层
        torch.nn.init.xavier_normal_(self.logistic.weight)#初始化linear参数
        self.logistic.bias.data.fill_(0)
    def forward(self, words, labels, hidden=None):
        output, _ = self.bilstm(words)# bilstm计算
        output = self.logistic(output)# linear层计算
        pre_score = self.crf.forward(output)#所有路径分值和
        label_score = self.crf._score_sentence(output, labels)
        return (pre_score - label_score).mean(), None
    def predict(self, word):
        lstm_out, _ = self.bilstm(word)
        out = self.logistic(lstm_out)
        return self.crf._viterbi_decode(out)

4.4 main.py

import argparse  # 补充缺失的argparse导入
import torch
import numpy as np  # 可选:保留潜在需要的numpy支持
# 1. 导入并初始化NPU相关依赖(在torch之后导入,避免NameError)
try:
    import torch_npu
    torch.npu.init()  # 初始化NPU环境,启用PyTorch对昇腾NPU的支持
except ImportError:
    raise ImportError("未找到torch_npu依赖,请安装匹配版本的昇腾PyTorch适配包")
from model import Model
from data_loader import DataLoader
from data import read_corpus, tag2label, read_dictionary, get_vector
import os
from scripts.eval import conlleval
import time
parser = argparse.ArgumentParser(description='LSTM_CRF')
parser.add_argument('--epochs', type=int, default=100,
                    help='number of epochs for train')
parser.add_argument('--batch-size', type=int, default=64,
                    help='batch size for training')
parser.add_argument('--test-size', type=int, default=64,
                    help='batch size for testing')
parser.add_argument('--seed', type=int, default=1111,
                    help='random seed')
# 2. 适配NPU:将use-cuda改为use-npu,保持参数格式一致性(兼容data_loader)
parser.add_argument('--use-npu', action='store_true', default=True,
                    help='enables npu (replace use-cuda for ascend NPU)')
parser.add_argument('--lr', type=float, default=0.005,
                    help='learning rate')
parser.add_argument('--use-crf', action='store_true',
                    help='use crf')
parser.add_argument('--mode', type=str, default='train',
                    help='train mode or test mode')
parser.add_argument('--save', type=str, default='./checkpoints/lstm_crf.pth',
                    help='path to save the final parameters of model')
parser.add_argument('--save-model', type=str, default='./checkpoints/lstm_crf_model.pth',
                    help='path to save the final model')
parser.add_argument('--save-epoch', action='store_true',
                    help='save every epoch')
parser.add_argument('--data', type=str, default='dataset',
                    help='location of the data corpus')
parser.add_argument('--word-ebd-dim', type=int, default=100,
                    help='number of word embedding dimension')
parser.add_argument('--dropout', type=float, default=0.5,
                    help='the probability for dropout')
parser.add_argument('--lstm-hsz', type=int, default=256,
                    help='BiLSTM hidden size')
parser.add_argument('--lstm-layers', type=int, default=2,
                    help='biLSTM layer numbers')
parser.add_argument('--l2', type=float, default=0.002,
                    help='l2 regularization')
parser.add_argument('--clip', type=float, default=5,
                    help='gradient clipping')
parser.add_argument('--result_path', type=str, default='./result',
                    help='result-path')
parser.add_argument('--vocab-path', type=str, default='./dataset/vocab.pkl',
                    help='vocab-path')
parser.add_argument('--vec-path', type=str, default='./dataset/vec.txt',
                    help='vec-path')
args = parser.parse_args()
torch.manual_seed(args.seed)
# 3. 设备选择逻辑:从CUDA切换为NPU,保持参数逻辑一致性
args.device = torch.device("npu" if args.use_npu else "cpu")
# 补充:若需指定具体NPU卡号,可修改为 torch.device("npu:0" if args.use_npu else "cpu")
def train(epoch):
    model.train()
    total_loss = 0
    for word, label, _ in train_data:
        optimizer.zero_grad()
        loss, _ = model(word, label)
        loss.backward()
        # 4. 适配NPU:梯度裁剪(torch原生接口,已被torch_npu兼容,保留原逻辑)
        torch.nn.utils.clip_grad_norm_(model.parameters(), args.clip)
        optimizer.step()
        print("epoch:{},step:{}/{},loss:{}".format(epoch, train_data._step, train_data.num_batchs, loss.item()))
        total_loss += loss.detach()
    return total_loss / train_data.num_batchs
def evaluate(epoch, current_test_data):  # 新增参数:接收每个epoch全新的test_data,解决迭代器耗尽
    model.eval()
    eval_loss = 0
    model_predict = []
    label2tag = {}
    for tag, lb in tag2label.items():
        label2tag[lb] = tag if lb!=0 else lb
    label_list = []
    with torch.no_grad():
        for word, label, seq_len_list in current_test_data:
            loss, _ = model(word, label)
            _,pred = model.predict(word)  # 模型预测
            for i, seq_len in enumerate(seq_len_list):
                # 5. 适配NPU:NPU张量需先转CPU再转numpy(numpy不支持NPU张量,保留原逻辑)
                pred_ = list(pred[i][:seq_len].cpu().numpy())
                label_list.append(pred_)
            eval_loss += loss.detach().item()
    # 保持原逻辑:构建评估数据格式
    for label_, (sent, tag) in zip(label_list, data_origin):
        tag_ = [label2tag[label__] for label__ in label_]
        sent_res = []
        if len(label_) != len(sent):
            print(sent)
            print(len(label_))
            print(tag)
        for i in range(len(sent)):
            sent_res.append([sent[i], tag[i], tag_[i]])
        model_predict.append(sent_res)
    
    # 确保结果目录存在(原逻辑补充,避免报错)
    os.makedirs(args.result_path, exist_ok=True)
    label_path = os.path.join(args.result_path, 'label_' + str(epoch))
    metric_path = os.path.join(args.result_path, 'result_metric_' + str(epoch))
    
    # 保持原逻辑:输出完整评估指标
    for line in conlleval(model_predict, label_path, metric_path):
        print(line)
    
    return eval_loss / current_test_data.num_batchs
# 加载数据(保留原逻辑,不修改数据读取流程)
sents_train, labels_train,_ = read_corpus(os.path.join('.', args.data, 'train_data.txt'), os.path.join('.', args.data, 'train_label.txt'))
sents_test, labels_test, data_origin = read_corpus(os.path.join('.', args.data, 'test_data.txt'), os.path.join('.', args.data, 'test_label.txt'), is_train=False)
args.label_size = len(tag2label)
# 6. 适配NPU:将cuda参数改为npu(兼容data_loader.py的参数格式),初始化训练数据Loader
train_data = DataLoader(sents_train, labels_train, npu=args.use_npu, batch_size=args.batch_size)
# 加载预训练词向量和词典(保留原逻辑)
args.pre_trained_vec = get_vector(args.vec_path)
args.word2id = read_dictionary(args.vocab_path)
# 实例化模型并迁移至NPU设备(保留原逻辑,适配NPU设备)
model = Model(args)
model = model.to(args.device)
# 7. 适配NPU:加载模型参数,map_location指定为NPU设备,避免设备不匹配
if os.path.exists(args.save):
    # 优化:避免重复加载模型参数(原代码重复调用torch.load,此处修正)
    state_dict = torch.load(args.save, map_location=args.device)
    model.load_state_dict(state_dict)
    print(f"已成功加载预训练模型参数:{args.save}")
else:
    print(f"未找到预训练模型参数:{args.save},将从头开始训练")
# 定义优化器(保留原逻辑,torch.optim原生接口已被torch_npu兼容)
optimizer = torch.optim.Adam(model.parameters(), lr=args.lr)  # betas=(0.9, 0.98), eps=1e-9, weight_decay=args.l2)
train_loss = []
if args.mode == 'train':
    print('-' * 90)
    # 确保模型保存目录存在(原逻辑补充,避免报错)
    os.makedirs(os.path.dirname(args.save), exist_ok=True)
    for epoch in range(1, args.epochs + 1):
        epoch_start_time = time.time()
        loss = train(epoch)  # 训练(保留原逻辑)
        
        # 保留原格式输出:epoch开始信息、耗时、损失
        print('| start of epoch {:3d} | time: {:2.2f}s | loss {:5.6f}'.format(epoch, time.time() - epoch_start_time, loss))
        
        # 8. 关键修复:每个epoch重新初始化test_data(解决迭代器耗尽),兼容NPU参数
        current_test_data = DataLoader(
            sents_test, 
            labels_test, 
            npu=args.use_npu,  # 适配data_loader的npu参数
            evaluation=True, 
            batch_size=args.test_size
        )
        
        # 模型评估(传入全新的test_data,保留原逻辑)
        eval_loss = evaluate(epoch, current_test_data)
        
        # 保存模型参数和模型对象(保留原逻辑,适配NPU设备)
        torch.save(model.state_dict(), args.save)
        torch.save(model, args.save_model)

4.5 predict.py

from data import tag2label, read_dictionary
import torch
import numpy as np
import time  # 1. 导入time模块,用于时间统计
# 导入所有自定义类
from model import Model, BiLSTM, CRF
# 导入所有需要信任的PyTorch原生组件
from torch.nn import Embedding, LSTM, Linear
# 导入并初始化NPU相关依赖
try:
    import torch_npu
    torch.npu.init()  # 初始化NPU环境
except ImportError:
    raise ImportError("未找到torch_npu依赖,请安装匹配版本的昇腾PyTorch适配包")
# 模型与词典路径
model_path='checkpoints/lstm_crf_model.pth'#模型路径
vocab_path='dataset/vocab.pkl'#词典路径
# 加载词典并构建标签映射
word2id=read_dictionary(vocab_path)#加载词典
label2tag={v:k for k,v in tag2label.items()}#构建tag词典
# 待预测文本
text='随着生活水平的提高,2型糖尿病已经成为继心脑血管病、恶性肿瘤之后影响人类健康的第三大因素'
# 数据处理
raw_chars = [char for char in text.strip()]  # 保留原始字符列表
processed_chars = ['<NUM>' if char.isdigit() else char for char in text.strip()]
words=[word2id[char] if char in word2id.keys() else word2id['<UNK>'] for char in processed_chars]
# 设备切换:NPU
device = torch.device('npu')
# 加载模型
with torch.serialization.safe_globals([Model, BiLSTM, CRF, Embedding, LSTM, Linear]):
    model = torch.load(model_path, map_location='npu').to(device)
# 同步更新模型内部组件设备属性
model.crf.device = device
model.bilstm.device = device
# 准备模型输入张量
input_tensor = torch.from_numpy(np.array([words])).to(device)
# ========== 核心:预测耗时计算(关键步骤) ==========
# 2. 记录推理前的时间戳(可选:使用time.perf_counter(),精度更高)
start_time = time.time()  # 简易统计,单位:秒
# start_time = time.perf_counter()  # 高精度统计,适合短时间间隔(推荐)
# 模型预测(核心推理步骤)
_, pred = model.predict(input_tensor)
# 3. 记录推理后的时间戳
end_time = time.time()
# end_time = time.perf_counter()
# 4. 计算预测耗时(差值即为耗时)
predict_duration = end_time - start_time
# 结果处理:转换为[字符, 标签]的键值对形式
pred_labels = [label2tag[p] for p in pred[0].cpu().numpy()]
result_pairs = [[char, label] for char, label in zip(raw_chars, pred_labels)]
# ========== 打印耗时结果(格式化输出,更易读) ==========
print("="*50)
print(f"预测完成!核心推理耗时:{predict_duration:.6f} 秒")
print(f"(折合:{predict_duration * 1000:.3f} 毫秒)")
print("="*50)
# 按行打印键值对结果
for pair in result_pairs:
    print(pair)
# ========== 可选:统计完整流程耗时(从程序启动到结果输出) ==========
# 若需统计完整流程,可在程序开头添加 `total_start = time.time()`,此处计算:
# total_end = time.time()
# total_duration = total_end - total_start
# print(f"完整流程(含加载、处理、推理、结果格式化)耗时:{total_duration:.6f} 秒")


本页内容