昇腾社区首页
中文
注册

BufferMessage消息(异步推理场景)

场景说明

机器人业务使用AI模型对数据进行推理时,可采用异步推理方式。推理引擎(NN Engine)从输入队列中获取数据、执行推理、并将推理结果写入输出队列。

为了提供一致的使用形式,OpenHiva支持通过Topic的形式将数据提供给推理引擎,以订阅的形式获取推理结果,并调用处理函数进行后处理,最终以发布的形式发布推理结果。

本章节针对推理场景,结合NN Engine,以推理消息的收发过程为例,介绍BufferMessage消息调用流程。

基本原理

推理场景下,推理消息的收发内部实现原理与BufferMessage消息(基本场景)中“消息订阅-消息发布”的Topic传递方式略有不同,主要区别在于新增了Hiva::CreateNnEngine接口。用户需要在初始化阶段向该接口传入模型的输入消息名和模型的输出消息名。接口内部会将根据输入/输出消息,自动申请和创建接收队列,用于接收需要推理的消息(NN的输入);自动创建发布队列,用于存放推理后的结果(NN的输出)。当用户注册了处理该结果的回调函数,一旦NN推理完成,将调用该回调函数,用户可以在回调函数中添加逻辑用于后处理(postprocess)。

关于NN Engine推理的关键接口调用流程如图1所示。

图1 NN Engine推理接口调用流程
  1. 资源初始化。

    用户在调用OpenHiva接口之前,需先调用OpenHiva::Init接口进行初始化,返回值为0代表初始化成功,否则失败。

    初始化动作主要包含节点名注册、线程组创建、资源申请等动作。

    • 节点名注册:
      • 不同节点的节点名称不能重复,否则会导致后面启动的节点初始化失败。
      • 如果初始化失败,节点不能正常执行发布或者订阅动作。
    • 线程组创建:

      调用OpenHiva::Init接口时需传入线程组参数,请提前按需创建线程组(ScheduleGroup)。每个线程组中存放回调线程(针对接收数据)的若干信息,包括线程组名字(groupName)、线程组调度类型(scheduleType)等。

      当线程组中ThreadGroup.scheduleType取值不同,OpenHiva内部的处理流程也不同:

      • UNBIND_AICPU(推荐):表示非确定域线程,线程组不绑核。每个组会启动一个工作线程,等待事件激活。工作线程具体在哪个CPU上运行由操作系统进行调度。
      • BIND_AICPU: 表示确定域线程,线程组绑核。每个组启动的工作线程数最大为4(具体由CPU核数决定),并绑在各个核上,每个核绑一个。其调度状态由事件调度机制来确定。这种机制实时性高,在很大程度上避免了由操作系统内核调度引起的时间抖动,保证了线程从休眠到运行状态切换的“确定性”。
      • USER_DEFINED:表示不启动工作线程,这种类型的group不会被框架调度,需要与OpenHiva::SpinOnce配合使用。当用户调用OpenHiva::SpinOnce时,groupName要和ScheduleGroup.groupName一致,此接口在用户的调用线程中处理消息。
  2. NN推理。
    • 在推理前,请准备好能直接在昇腾AI处理器上做推理的om模型。如果是开源框架的网络模型(如Caffe、TensorFlow等),请先使用ATC(Ascend Tensor Compiler)工具将网络模型转换为适配昇腾AI处理器的*.om文件。关于ATC工具的使用说明,请参见《ATC离线模型编译工具用户指南》
    • 用户必须为outTopicInfo中的每一个Topic注册回调函数处理消息,如果不处理该Topic,则也需要为其注册一个空的回调函数。
    1. 调用CreateNnEngine接口执行推理。推理过程中,框架内部会加载modelPath指定的模型文件、订阅inTopicInfo指定的Topic。推理完成后,框架内部会发布outTopicInfo指定的Topic。
    2. 调用CreateSubscriber接口订阅推理结果,即outTopicInfo指定的Topic,用户可以自定义函数来处理推理结果。
  3. 资源释放。

    进程结束前需释放相应资源,定义的OpenHiva接口将无法使用。

    1. 释放NN Engine资源。用户需主动调用DestroyNnEngine接口释放资源,该接口的engineName参数值需要和CreateNnEngine接口的engineName参数值保持一致。
    2. 释放框架资源。
      • 异常分支中,需主动调用OpenHiva::Shutdown接口释放资源,包括注销队列ID/Topic等信息、释放对应的内存块等。
      • 主线程中,需主动调用OpenHiva::WaitForShutdown接口释放资源,防止主线程提前退出。当进程异常终止时,WaitForShutdown接口内部会自动调用Shutdown接口进行资源清理。

示例代码

异步推理场景下,推理消息的收发关键步骤代码示例如下,仅供参考:

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
#include "hiva.h"
#include "inner_topic/hiva_nn_topic.h"
#include "open/buffer_message.h"
#include "open/node.h"
#include "open/init.h"
#include "open/publisher.h"
#include "open/subscriber.h"
#include "open/hiva_buffer_pool.h"

const std::string nnInputTopic1 = "NN1BufferMsg";            // 接收camera1的ISP数据
const std::string nnInputTopic2 = "NN2BufferMsg";            // 接收camera2的ISP数据
const std::string nnOutputTopic1 = "NN1OutputBufferMsg";     // 输出camera1 ISP数据的推理结果
const std::string nnOutputTopic2 = "NN2OutputBufferMsg";     // 输出camera2 ISP数据的推理结果
const uint32_t inputTopicQueSize = 10;
const uint32_t outputTopicQueSize = 10;
const uint32_t groupId = 0;
const uint32_t priority = 0;
const std::string nnEngineName = "aclEngine1";              // engineName
const std::string modelPath = "/home/aclMode";              // om模型文件所在的路径
std::vector<std::shared_ptr<OpenHiva::Subscriber>> sub;

// 注册回调函数,用于处理模型推理的结果
void PerceptionCallback(const OpenHiva::BufferMessage &buffMsg)
{
    void *currBlockPtr = nullptr;
    size_t dataSize = 0U;
    OpenHiva::HivaBuffer &HivaBuffer = const_cast<OpenHiva::HivaBuffer &>(buffMsg.data);
    uint32_t ret = HivaBuffer.GetBuff(currBlockPtr, dataSize);        // 获取数据的内存地址dataPtr和dataLen
    if (ret != Hiva::HIVA_SUCCESS) {
        HIVA_ERROR("GetBuff failed and return %u", ret);
        return;
    }
    uint32_t dataLen = *reinterpret_cast<uint32_t *>(currBlockPtr);
    DoSthForNNaction();                                               // 用户自定义的处理函数,用于处理推理结果
    return;
}

// 自定义函数,用于推理节点初始化
uint32_t PerceptionNodeInit(OpenHiva::Node &node, const OpenHiva::TopicOptions &topicOps)
{
    uint32_t ret = HIVA_SUCCESS;
    std::vector<std::string> inputTopicVec;
    std::vector<std::string> outputTopicVec;
    inputTopicVec.push_back(nnInputTopic1);
    inputTopicVec.push_back(nnInputTopic2);
    outputTopicVec.push_back(nnOutputTopic1);
    outputTopicVec.push_back(nnOutputTopic2);
    Hiva::TopicInfo inTopicInfo = {
        .topicVec = inputTopicVec,
        .queueSize = inputTopicQueSize
    };
    Hiva::TopicInfo outTopicInfo = {
        .topicVec = outputTopicVec,
        .queueSize = outputTopicQueSize
    };
    // 创建NN引擎,加载指定模型文件,对inTopicInfo发布的数据执行推理,并将推理结果以outTopicInfo形式发布。
    ret = Hiva::CreateNnEngine(nnEngineName, modelPath, inTopicInfo, outTopicInfo);
    if (ret != HIVA_SUCCESS) {
        HIVA_ERROR("CreateNnEngine Fail. engineName:%s(%zu,%u,%zu,%u,%s).", nnEngineName.c_str(),
            inTopicInfo.topicVec.size(), inTopicInfo.queueSize, outTopicInfo.topicVec.size(), outTopicInfo.queueSize,
            modelPath.c_str());
        return ret;
    }
    // Subscribe接口中的groupName要和Init接口里的ThreadGroup.groupName保持一致
    const std::string groupName = "hivatest_perception";
    for (auto itr = outputTopicVec.begin(); itr != outputTopicVec.end(); itr++) {
        // 通过NodeHandle对象调用Subscriber接口订阅推理结果Topic,并根据outputTopic依次调用回调函数处理推理结果(本例使用相同的回调函数)
        sub.push_back(node.CreateSubscriber(*itr, &PerceptionCallback,topicOps));
    }
    return HIVA_SUCCESS;
}


int main(int argc, char **argv)
{
    // 1. 资源初始化
    std::string nodeName = "test_percetion";         
    std::string topicName = "testTopic";                   // 订阅的话题名,要和发布者发布的话题名保持一致
    std::string groupName = "hivatest_perception";        // groupName要和Init接口里的ThreadGroup.groupName保持一致
    OpenHiva::ScheduleType scheType = OpenHiva::ScheduleType(0);
    uint32_t maxMsgSize = 100 * 1024U;
    uint32_t queueSize = 10U;
    uint32_t blockNum = 10U;
    bool overwrite = false;
    bool queueFCFlag = false;
    uint16_t queueTTL = 100U;
    std::string transport;
    // 定义线程组
    std::vector<OpenHiva::ScheduleGroup> scheGrpVec;
    OpenHiva::ScheduleGroup scheGrp = scheType;
    scheGrp.groupName = groupName;
    scheGrp.scheduleType = scheType;
    scheGrpVec.push_back(scheGrp);
    // 调用资源初始化接口
    OpenHiva::Init(argc, argv, scheGrpVec);

    // 2. 推理
    // 构造Node对象
    OpenHiva::Node node(nodeName);
    // 构造TopicOptions  
    OpenHiva::TopicOptions topicOps;
    topicOps.BuildGroupName(groupName)
            .BuildMessageTraits<OpenHiva::BufferMessage>()
            .BuildShmOptions(maxMsgSize, blockNum)
            .BuildQueueOptions(queueSize, overwrite, queueFCFlag, queueTTL)
            .BuildTopicName(topicName);
    // 执行推理,订阅推理结果并后处理
    if (PerceptionNodeInit(node, topicOps) != HIVA_SUCCESS) {
        HIVA_ERROR("PerceptionNodeInit Error.");
        OpenHiva::Shutdown();
        return HIVA_FAILED;
    }
    while (OpenHiva::Ready()) {
        HIVA_INFO("PerceptionTest is Running.");
        sleep(1);
    }

    // 3. 释放资源
    if (Hiva::DestroyNnEngine(nnEngineName) != HIVA_SUCCESS) {
        HIVA_ERROR("DestroyNnEngine failed");
    }
    OpenHiva::WaitForShutdown();
    return 0;
}