昇腾社区首页
中文
注册

异步推理+callback回调处理

关于异步推理+callback回调处理的接口调用流程,请依次参见主要接口调用流程Callback场景

基本原理

异步推理场景下,关键接口的调用流程如下:

  1. 调用acl.init接口初始化ACL。
  2. 按顺序调用acl.rt.set_deviceacl.rt.create_contextacl.rt.create_stream接口依次申请运行管理资源:DeviceContextStream,确保可以使用这些资源执行运算、管理任务。
    如果不显式创建Context和Stream,您可以使用acl.rt.set_device接口隐式创建的默认Context和默认Stream,但默认Context和默认Stream存在如下限制:
    • 一个Device对应一个默认Context,默认Context不能通过acl.rt.destroy_context接口来释放。
    • 一个Device对应一个默认Stream,默认Stream不能通过acl.rt.destroy_stream接口来释放。默认Stream作为接口入参时,直接传0。
  3. 申请模型推理资源。
    1. 加载模型。
      加载模型数据分为以下4种方式,当由用户管理内存时,为确保内存不浪费,在申请工作内存、权值内存前,需要调用acl.mdl.query_size接口查询模型运行时所需工作内存、权值内存的大小。
    2. 调用acl.mdl.get_desc接口获取成功加载的模型的描述信息。
    3. 初始化内存,用于存放模型推理的输入数据、输出数据。

      此处需要用户可自行定义函数实现以下关键点:

      1. 调用acl.mdl.create_dataset接口创建aclmdlDataset类型的数据(描述模型的输入/输出)。
      2. 模型可能存在多个输入/输出,调用acl.create_data_buffer接口创建aclDataBuffer类型的数据(描述每个输入/输出数据的内存地址、内存大小)。

        模型输入数据的内存大小,根据实际读入的图片数据的大小来确定。

        模型输出数据的内存大小,可调用acl.mdl.get_output_size_by_index接口根据3.b中模型描述信息获取每个输出需占用的内存大小。

      3. 调用acl.mdl.add_dataset_buffer接口向aclmdlDataset中增加aclDataBuffer。
  4. 执行模型异步推理和callback(用于处理模型推理的结果)。
    此处需要用户可自行定义函数实现以下关键点:
    1. 创建新线程(例如t1),在线程函数内调用acl.rt.process_report接口,等待指定时间后,触发回调函数(例如CallBackFunc,用于处理模型推理结果)。
    2. 调用acl.rt.subscribe_report接口,指定处理Stream上回调函数(CallBackFunc)的线程(t1),回调函数需要提前创建,用于处理模型推理的结果(例如,将推理结果写入文件、从推理结果中获取topn置信度的类别标识等)。
    3. 调用acl.mdl.execute_async接口执行异步模型推理。
    4. 调用acl.rt.launch_callback接口,在Stream的任务队列中增加一个需要在Host/Device上执行的回调函数(CallBackFunc)。
    5. 调用acl.rt.synchronize_stream接口,阻塞应用程序运行,直到指定Stream中的所有任务都完成。
    6. 调用acl.rt.unsubscribe_report接口,取消线程注册,Stream上的回调函数(CallBackFunc)不再由指定线程(t1)处理。
    7. 模型推理结束后,调用acl.mdl.unload接口卸载模型。
  5. 所有数据处理结束后,按顺序调用acl.rt.destroy_streamacl.rt.destroy_contextacl.rt.reset_device接口依次释放运行管理资源,包括StreamContextDevice
  6. 调用acl.finalize接口实现ACL去初始化。

示例代码

您可以从样例介绍中查看完整样例代码。

在acl_resnet50_async样例中:

  • 运行可执行文件,不带参数时:
    • 执行模型异步推理的次数默认为10次;
    • device id默认使用 编号为 0 的设备
    • callback间隔默认为1,表示1次异步推理后,下发一次callback任务;
    • 内存池中的内存块的个数默认为10个。
  • 运行可执行文件,带参数时:
    • 第一个参数表示使用设备ID;
    • 第二个参数表示执行模型异步推理的次数;
    • 第三个参数表示下发callback间隔,参数值为0时表示不下发callback任务,参数值为非0值(例如m)时表示m次异步推理后下发一次callback任务;
    • 第四个参数表示内存池中内存块的个数,内存块个数需大于等于模型异步推理的次数。用户可根据输入图片数量,来调整内存块的个数,例如有2张输入图片、内存块个数为2时,则1张图片1个内存块;例如有3张输入图片、内存块个数为10时,则执行10次循环,每(10/3取整)个内存块对应同一张图片,剩下1个内存块随机对应1张图片。内存块中存放是模型推理的输入数据、输出数据,若多个内存块对应的是同一张图片,则多个内存块中存放的是相同的输入数据、输出数据,用于输入图片少但又想模拟大量图片数据的场景。
    • 第五个参数表示要加载执行的模型的路径
    • 第六个参数表示输入的图片路径。支持图片格式如下:

      IMG_EXT = ['.jpg', '.JPG', '.png', '.PNG', '.bmp', '.BMP', '.jpeg', '.JPEG']

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
import acl
import argparse
# ......

images_list = [os.path.join(args.images_path, img) for img in os.listdir(args.images_path) \
               if os.path.splitext(img)[1] in IMG_EXT]
data_list = []
for image in images_list:
    # 自定义函数transfer_pic,完成以下功能:
    # 加载图片到内存,并resize成 224x224尺寸
    transfer_pic(image)
    dst_im = np.fromfile(image.replace(".jpg", ".bin"), dtype=np.byte)
    data_list.append(dst_im)

# 1.资源初始化: 样例中通过Net类实现资源初始化操作
# 1.1 pyACL初始化
ret = acl.init()
# 1.2 运行管理资源申请
ret = acl.rt.set_device(self.device_id)
self.context, ret = acl.rt.create_context(self.device_id)
self.stream, ret = acl.rt.create_stream()
# 获取当前昇腾AI软件栈的运行模式,根据不同的运行模式,后续的内存申请、内存复制等接口调用方式不同
self.run_mode, ret = acl.rt.get_run_mode()
# 1.3 申请模型推理资源
# 1.4 加载模型
# 加载离线模型文件,模型加载成功,返回标识模型的ID。
self.model_id, ret = acl.mdl.load_from_file(self.model_path)
# 1.5 根据模型的ID,获取该模型的描述信息
self.model_desc = acl.mdl.create_desc()
ret = acl.mdl.get_desc(self.model_desc, self.model_id)

# 2.模型推理
# 2.1 根据输入参数内存池中内存块的个数申请内存,拷贝图片数据到device侧
def _data_interaction(self, images_dataset_list):
    for idx in range(self.memory_pool):
        img_idx = idx % len(images_dataset_list)
        img_input = self._load_input_data(images_dataset_list[img_idx])
        infer_ouput = self._load_output_data()
        self.dataset_list.append([img_input, infer_ouput])
# 2.2 创建线程tid,并将该tid线程指定为处理Stream上回调函数的线程
# 其中ProcessCallback为线程函数,在该函数内调用acl.rt.process_report接口,等待指定时间后,触发回调函数处理
tid, ret = acl.util.start_thread(self._process_callback, [self.context, 50])
# 2.3 指定处理Stream上回调函数的线程
ret = acl.rt.subscribe_report(tid, self.stream)
# 2.4 创建回调函数,用户处理模型推理的结果,由用户自行定义
def callback_func(self, delete_list):
    for temp in delete_list:
        _, infer_output = temp
        # device to host
        num = acl.mdl.get_dataset_num_buffers(infer_output)
        for i in range(num):
            temp_output_buf = acl.mdl.get_dataset_buffer(infer_output, i)
            infer_output_ptr = acl.get_data_buffer_addr(temp_output_buf)
            infer_output_size = acl.get_data_buffer_size(temp_output_buf)
            output_host, ret = acl.rt.malloc_host(infer_output_size)
            ret = acl.rt.memcpy(output_host,
                                 infer_output_size,
                                 infer_output_ptr,
                                 infer_output_size,
                                 ACL_MEMCPY_DEVICE_TO_HOST)
            output_host_dict = ["buffer": output_host, "size": infer_output_size}]
            result = self.get_result(output_host_dict)
            st = struct.unpack("1000f", bytearray(result[0]))
            vals = np.array(st).flatten()
            top_k = vals.argsort()[-1:-6:-1]
            print("\n======== top5 inference results: =============")
            for n in top_k:
                print("[%d]: %f" % (n, vals[n]))
            ret = acl.rt.free_host(output_host)
# 2.5 自定义函数forward,执行模型推理
def forward(self):
    self.excute_dataset = []
    for idx in range(self.excute_times):
        img_data, infer_output = self.dataset_list.pop(0)
        ret = acl.mdl.execute_async(self.model_id,
                                      img_data,
                                      infer_output,
                                      self.stream)
        if self.is_callback:
            self.excute_dataset.append([img_data, infer_output])
            self._get_callback(idx)
# 2.6 对于异步推理,需阻塞应用程序运行,直到指定Stream中的所有任务都完成
ret = acl.rt.synchronize_stream(self.stream)
# 2.7 取消线程注册,Stream上的回调函数不再由指定线程处理 
ret = acl.rt.unsubscribe_report(tid, self.stream)
self.is_exist = True
ret = acl.util.stop_thread(tid)

# 3.释放运行管理资源
ret = acl.rt.destroy_stream(self.stream)
ret = acl.rt.destroy_context(self.context)
ret = acl.rt.reset_device(self.model_id)

# 4.pyACL去初始化
ret = acl.finalize()
# ......