总体说明

订阅-发布原理

OpenHiva支持基于话题(Topic)的通信方式,主要包括以下消息类型:
  • 普通消息:提供标准Topic进行消息的订阅、发布。

    消息内存由OpenHiva申请和释放,用户只需关注业务逻辑,代码逻辑相对简单。

    本章将以消息的发布、订阅场景为例,介绍普通消息的使用方式。

  • BufferMessage:提供HivaBuffer进行消息的订阅、发布。

    消息内存由用户申请和释放,并自行管理内存中的数据,消息需要遵循固定的BufferMessage格式。框架会从发布的消息中获取用户申请好的内存地址,在发布过程中如果出错,框架会自动释放该内存。如果消息发布成功,内存将由订阅端自动释放,用户无需关心该内存的释放情况。

    BufferMessage通常适用于图像消息通信场景,尤其是从Lidar、Camera等传感器获取的数据,数据量较大,为减少图像数据的拷贝,使用BufferMessage消息可提高传输性能。

    本章将先以消息发布、订阅的基本场景为例,介绍BufferMessage消息(基本场景)的使用方式;然后以推理场景为例,结合NN Engine,从获取数据、推理、发布推理结果介绍BufferMessage消息(异步推理场景)的使用方式。

    NN Engine实际上是节点内部的实现,首先在APP里面加载推理模型,输入原始Camera数据后,推理的结果支持以Topic形式和BufferMessage的数据格式发布出去。

无论是哪种消息类型,消息发布和订阅的原理基本一致,关键流程如图1所示。其中Talker作为发布者发布话题,Listener作为订阅者订阅话题,两者通过DataMaster(中心节点)建立管道进行通信,QueueScheduler作为消息队列调度器。

图1 消息订阅-发布流程图
  1. 发布端,通过CreatePublisher接口向DataMaster注册待发布Topic,以及Topic对应的TopicOptions属性,如队列深度、最大消息尺寸等。
  2. 订阅端,通过CreateSubscriber接口向DataMaster注册待订阅Topic、回调函数,以及Topic对应的TopicOptions属性,如队列深度、最大消息尺寸,Subscriber分组等。
  3. OpenHiva根据DataMatser接收到的注册队列信息,将发布队列ID/订阅队列ID绑定,并存储在QueueScheduler中。
  4. 发布端,通过Publish接口发布指定Topic的消息。
  5. 订阅端,通过QueueScheduler将发布队列的消息搬运到与之绑定的订阅队列中,并唤醒订阅节点的工作线程。
  6. 订阅端线程调用回调函数处理消息,获取发布的Topic消息。

TopicOptions参数说明

TopicOptions参数是Topic消息一系列属性值的集合,包括话题名称、话题通信方式、队列深度、消息尺寸等。目前支持3种TopicOptions参数配置方法:

TopicOptions配置优先级:yaml文件 > 参数输入 > 默认值。