消息内存由OpenHiva申请和释放,用户只需关注业务逻辑,代码逻辑相对简单。
本章将以消息的发布、订阅场景为例,介绍普通消息的使用方式。
消息内存由用户申请和释放,并自行管理内存中的数据,消息需要遵循固定的BufferMessage格式。框架会从发布的消息中获取用户申请好的内存地址,在发布过程中如果出错,框架会自动释放该内存。如果消息发布成功,内存将由订阅端自动释放,用户无需关心该内存的释放情况。
BufferMessage通常适用于图像消息通信场景,尤其是从Lidar、Camera等传感器获取的数据,数据量较大,为减少图像数据的拷贝,使用BufferMessage消息可提高传输性能。
本章将先以消息发布、订阅的基本场景为例,介绍BufferMessage消息(基本场景)的使用方式;然后以推理场景为例,结合NN Engine,从获取数据、推理、发布推理结果介绍BufferMessage消息(异步推理场景)的使用方式。
NN Engine实际上是节点内部的实现,首先在APP里面加载推理模型,输入原始Camera数据后,推理的结果支持以Topic形式和BufferMessage的数据格式发布出去。
无论是哪种消息类型,消息发布和订阅的原理基本一致,关键流程如图1所示。其中Talker作为发布者发布话题,Listener作为订阅者订阅话题,两者通过DataMaster(中心节点)建立管道进行通信,QueueScheduler作为消息队列调度器。
TopicOptions配置优先级:yaml文件 > 参数输入 > 默认值。
yaml文件配置示例如下,具体参数说明参见表1。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 |
testTopic: transport: [Shm, Dsf] groupName: test_group blockSize: 102400 blockNum: 10 Dsf: queueDepth: 5 overwrite: false queueFCFlag: false queueTTL: 0 Dds: history: DDS_QOS_HISTORY_POLICY_KEEP_LAST depth: 5 reliability: DDS_QOS_RELIABILITY_POLICY_BEST_EFFORT durability: DDS_QOS_DURABILITY_POLICY_VOLATILE deadline: second: 100 nanosecond: lifespan: second: 10 nanosecond: 10 liveliness: DDS_QOS_LIVELINESS_POLICY_AUTOMATIC lease_duration: second: nanosecond: |
参数分类 |
参数名 |
参数含义 |
---|---|---|
基础参数 |
testTopic |
指明了需要配置参数的话题名。 |
transport |
配置话题对应的通信方式,可以指定一个或者多个,需要包含在“[]”内部。
说明:
当前支持的通信方式有Dsf、Vfs、Shm、Dds。如果配置的通信方式不合法,在service情况下会默认使用为Shm,管理面会默认使用Vfs,数据面会默认使用Dsf。 |
|
groupName |
指明了Topic所属的group名 。 |
|
blockSize |
指内存块大小。Dsf通信、Shm通信和Dds通信(单设备共享内存通信)会根据blockSize申请内存资源。 |
|
blockNum |
指内存块数量,使用Dsf通信或Shm通信时会在初始化时申请多个内存块,每个都是blockSize大小,但要满足blockSize*blockNum<300M。Dds通信(单设备共享内存通信)中blockSize*blockNum接近或小于消息大小会导致丢包。 |
|
Dsf |
queueDepth |
指通信时缓存消息的队列长度。 |
overwrite |
是否写覆盖,其值只能为true或者false:
|
|
queueFCFlag |
是否队列流控,其值只能为true或者false:
|
|
queueTTL |
指队列流控时间(ms),仅当Dsf/queueFCFlag为true时生效。当queueTTL=0,接收队列最多缓存一帧数据;当queueTTL>0,接收队列消息出队时会计算消息从入队到出队的时间差,一旦超出queueTTL范围,该消息会被丢弃。 |
|
Dds
说明:
|
history |
设置DDS消息保存策略,支持2种:
|
depth |
设置队列长度,仅当history设置成DDS_QOS_HISTORY_POLICY_KEEP_LAST时,该参数才能生效。 |
|
reliability |
设置消息可靠性,支持2种:
|
|
durability |
设置消息的持久性,支持2种:
|
|
deadline |
设置消息发送到接收方的有效时间。消息被发出来后,接收方检查该有效时间,超时则认为该消息无效而被丢弃。
总时间为两者时间相加。 |
|
lifespan |
设置消息在系统里的有效时间。
总时间为两者时间相加。 |
|
liveliness |
设置datareader、datawriter的生命周期检测策略,支持2种:
|
|
lease_duration |
订阅端、发布端有效性检测的时间。超过该时候应答心跳信号则认为该订阅端或发布端已失效。
总时间为两者时间相加。需要与DDS_QOS_LIVELINESS_POLICY_MANUAL_BY_TOPIC配合使用。 |
策略项 |
发布端 |
订阅端 |
是否兼容 |
---|---|---|---|
reliability策略兼容性 |
Best effort |
Best effort |
兼容 |
Best effort |
Reliable |
不兼容 |
|
Reliable |
Best effort |
兼容 |
|
Reliable |
Reliable |
兼容 |
|
durability策略兼容性 |
Volatile |
Volatile |
兼容 |
Volatile |
Transient local |
不兼容 |
|
Transient local |
Volatile |
兼容 |
|
Transient local |
Transient local |
兼容 |
|
deadline策略兼容性 |
系统默认值(QOS_DURATION_MAX) |
系统默认值(QOS_DURATION_MAX) |
兼容 |
系统默认值(QOS_DURATION_MAX) |
x |
不兼容 |
|
x |
系统默认值(QOS_DURATION_MAX) |
兼容 |
|
x |
x |
兼容 |
|
x |
y (y > x) |
兼容 |
|
x |
y (y < x) |
不兼容 |
|
lifespan策略兼容性 |
NA |
NA |
NA |
liveliness策略兼容性 |
Automatic |
Automatic |
兼容 |
Automatic |
Manual by topic |
不兼容 |
|
Manual by topic |
Automatic |
兼容 |
|
Manual by topic |
Manual by topic |
兼容 |
|
lease duration策略兼容性 |
系统默认值(QOS_DURATION_MAX) |
系统默认值(QOS_DURATION_MAX) |
兼容 |
系统默认值(QOS_DURATION_MAX) |
x |
不兼容 |
|
x |
系统默认值(QOS_DURATION_MAX) |
兼容 |
|
x |
x |
兼容 |
|
x |
y (y > x) |
兼容 |
|
x |
y (y < x) |
不兼容 |
|
说明:假设x、y为有效数值,取值范围为[0, QOS_DURATION_MAX]。 |