异步推理+callback回调处理
关于异步推理+callback回调处理的接口调用流程,请依次参见主要接口调用流程、Callback场景。
基本原理
异步推理场景下,关键接口的调用流程如下:
- 调用acl.init接口初始化ACL。
- 按顺序调用acl.rt.set_device、acl.rt.create_context、acl.rt.create_stream接口依次申请运行管理资源:Device、Context、Stream,确保可以使用这些资源执行运算、管理任务。
如果不显式创建Context和Stream,您可以使用acl.rt.set_device接口隐式创建的默认Context和默认Stream,但默认Context和默认Stream存在如下限制:
- 一个Device对应一个默认Context,默认Context不能通过acl.rt.destroy_context接口来释放。
- 一个Device对应一个默认Stream,默认Stream不能通过acl.rt.destroy_stream接口来释放。默认Stream作为接口入参时,直接传0。
- 申请模型推理资源。
- 加载模型。
加载模型数据分为以下4种方式,当由用户管理内存时,为确保内存不浪费,在申请工作内存、权值内存前,需要调用acl.mdl.query_size接口查询模型运行时所需工作内存、权值内存的大小。
- acl.mdl.load_from_file:从文件加载离线模型数据,由系统内部管理内存。
- acl.mdl.load_from_mem:从内存加载离线模型数据,由系统内部管理内存。
- acl.mdl.load_from_file_with_mem:从文件加载离线模型数据,由用户自行管理模型运行的内存(包括工作内存和权值内存)。
- acl.mdl.load_from_mem_with_mem:从内存加载离线模型数据,由用户自行管理模型运行的内存(包括工作内存和权值内存)。
- 调用acl.mdl.get_desc接口获取成功加载的模型的描述信息。
- 初始化内存,用于存放模型推理的输入数据、输出数据。
- 调用acl.mdl.create_dataset接口创建aclmdlDataset类型的数据(描述模型的输入/输出)。
- 模型可能存在多个输入/输出,调用acl.create_data_buffer接口创建aclDataBuffer类型的数据(描述每个输入/输出数据的内存地址、内存大小)。
模型输入数据的内存大小,根据实际读入的图片数据的大小来确定。
模型输出数据的内存大小,可调用acl.mdl.get_output_size_by_index接口根据3.b中模型描述信息获取每个输出需占用的内存大小。
- 调用acl.mdl.add_dataset_buffer接口向aclmdlDataset中增加aclDataBuffer。
- 加载模型。
- 执行模型异步推理和callback(用于处理模型推理的结果)。
此处需要用户可自行定义函数实现以下关键点:
- 创建新线程(例如t1),在线程函数内调用acl.rt.process_report接口,等待指定时间后,触发回调函数(例如CallBackFunc,用于处理模型推理结果)。
- 调用acl.rt.subscribe_report接口,指定处理Stream上回调函数(CallBackFunc)的线程(t1),回调函数需要提前创建,用于处理模型推理的结果(例如,将推理结果写入文件、从推理结果中获取topn置信度的类别标识等)。
- 调用acl.mdl.execute_async接口执行异步模型推理。
- 调用acl.rt.launch_callback接口,在Stream的任务队列中增加一个需要在Host/Device上执行的回调函数(CallBackFunc)。
- 调用acl.rt.synchronize_stream接口,阻塞应用程序运行,直到指定Stream中的所有任务都完成。
- 调用acl.rt.unsubscribe_report接口,取消线程注册,Stream上的回调函数(CallBackFunc)不再由指定线程(t1)处理。
- 模型推理结束后,调用acl.mdl.unload接口卸载模型。
- 所有数据处理结束后,按顺序调用acl.rt.destroy_stream、acl.rt.destroy_context、acl.rt.reset_device接口依次释放运行管理资源,包括Stream、Context、Device。
- 调用acl.finalize接口实现ACL去初始化。
示例代码
您可以从样例介绍中查看完整样例代码。
在acl_resnet50_async样例中:
- 运行可执行文件,不带参数时:
- 执行模型异步推理的次数默认为10次;
- device id默认使用 编号为 0 的设备
- callback间隔默认为1,表示1次异步推理后,下发一次callback任务;
- 内存池中的内存块的个数默认为10个。
- 运行可执行文件,带参数时:
- 第一个参数表示使用设备ID;
- 第二个参数表示执行模型异步推理的次数;
- 第三个参数表示下发callback间隔,参数值为0时表示不下发callback任务,参数值为非0值(例如m)时表示m次异步推理后下发一次callback任务;
- 第四个参数表示内存池中内存块的个数,内存块个数需大于等于模型异步推理的次数。用户可根据输入图片数量,来调整内存块的个数,例如有2张输入图片、内存块个数为2时,则1张图片1个内存块;例如有3张输入图片、内存块个数为10时,则执行10次循环,每(10/3取整)个内存块对应同一张图片,剩下1个内存块随机对应1张图片。内存块中存放是模型推理的输入数据、输出数据,若多个内存块对应的是同一张图片,则多个内存块中存放的是相同的输入数据、输出数据,用于输入图片少但又想模拟大量图片数据的场景。
- 第五个参数表示要加载执行的模型的路径
- 第六个参数表示输入的图片路径。支持图片格式如下:
IMG_EXT = ['.jpg', '.JPG', '.png', '.PNG', '.bmp', '.BMP', '.jpeg', '.JPEG']
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 |
import acl import argparse # ...... images_list = [os.path.join(args.images_path, img) for img in os.listdir(args.images_path) \ if os.path.splitext(img)[1] in IMG_EXT] data_list = [] for image in images_list: # 自定义函数transfer_pic,完成以下功能: # 加载图片到内存,并resize成 224x224尺寸 transfer_pic(image) dst_im = np.fromfile(image.replace(".jpg", ".bin"), dtype=np.byte) data_list.append(dst_im) # 1.资源初始化: 样例中通过Net类实现资源初始化操作 # 1.1 pyACL初始化 ret = acl.init() # 1.2 运行管理资源申请 ret = acl.rt.set_device(self.device_id) self.context, ret = acl.rt.create_context(self.device_id) self.stream, ret = acl.rt.create_stream() # 获取当前昇腾AI软件栈的运行模式,根据不同的运行模式,后续的内存申请、内存复制等接口调用方式不同 self.run_mode, ret = acl.rt.get_run_mode() # 1.3 申请模型推理资源 # 1.4 加载模型 # 加载离线模型文件,模型加载成功,返回标识模型的ID。 self.model_id, ret = acl.mdl.load_from_file(self.model_path) # 1.5 根据模型的ID,获取该模型的描述信息 self.model_desc = acl.mdl.create_desc() ret = acl.mdl.get_desc(self.model_desc, self.model_id) # 2.模型推理 # 2.1 根据输入参数内存池中内存块的个数申请内存,拷贝图片数据到device侧 def _data_interaction(self, images_dataset_list): for idx in range(self.memory_pool): img_idx = idx % len(images_dataset_list) img_input = self._load_input_data(images_dataset_list[img_idx]) infer_ouput = self._load_output_data() self.dataset_list.append([img_input, infer_ouput]) # 2.2 创建线程tid,并将该tid线程指定为处理Stream上回调函数的线程 # 其中ProcessCallback为线程函数,在该函数内调用acl.rt.process_report接口,等待指定时间后,触发回调函数处理 tid, ret = acl.util.start_thread(self._process_callback, [self.context, 50]) # 2.3 指定处理Stream上回调函数的线程 ret = acl.rt.subscribe_report(tid, self.stream) # 2.4 创建回调函数,用户处理模型推理的结果,由用户自行定义 def callback_func(self, delete_list): for temp in delete_list: _, infer_output = temp # device to host num = acl.mdl.get_dataset_num_buffers(infer_output) for i in range(num): temp_output_buf = acl.mdl.get_dataset_buffer(infer_output, i) infer_output_ptr = acl.get_data_buffer_addr(temp_output_buf) infer_output_size = acl.get_data_buffer_size(temp_output_buf) output_host, ret = acl.rt.malloc_host(infer_output_size) ret = acl.rt.memcpy(output_host, infer_output_size, infer_output_ptr, infer_output_size, ACL_MEMCPY_DEVICE_TO_HOST) output_host_dict = ["buffer": output_host, "size": infer_output_size}] result = self.get_result(output_host_dict) st = struct.unpack("1000f", bytearray(result[0])) vals = np.array(st).flatten() top_k = vals.argsort()[-1:-6:-1] print("\n======== top5 inference results: =============") for n in top_k: print("[%d]: %f" % (n, vals[n])) ret = acl.rt.free_host(output_host) # 2.5 自定义函数forward,执行模型推理 def forward(self): self.excute_dataset = [] for idx in range(self.excute_times): img_data, infer_output = self.dataset_list.pop(0) ret = acl.mdl.execute_async(self.model_id, img_data, infer_output, self.stream) if self.is_callback: self.excute_dataset.append([img_data, infer_output]) self._get_callback(idx) # 2.6 对于异步推理,需阻塞应用程序运行,直到指定Stream中的所有任务都完成 ret = acl.rt.synchronize_stream(self.stream) # 2.7 取消线程注册,Stream上的回调函数不再由指定线程处理 ret = acl.rt.unsubscribe_report(tid, self.stream) self.is_exist = True ret = acl.util.stop_thread(tid) # 3.释放运行管理资源 ret = acl.rt.destroy_stream(self.stream) ret = acl.rt.destroy_context(self.context) ret = acl.rt.reset_device(self.model_id) # 4.pyACL去初始化 ret = acl.finalize() # ...... |
父主题: 模型推理扩展场景