昇腾社区首页
中文
注册

消息订阅

基本原理

普通消息订阅的关键接口调用流程如图1所示。

图1 普通消息订阅接口调用流程
  1. 资源初始化。

    用户在调用OpenHiva接口之前,需先调用OpenHiva::Init接口进行初始化,返回值为0代表初始化成功,否则失败。

    初始化动作主要包含节点名注册、线程组创建、资源申请等动作。

    • 节点名注册:
      • 不同节点的节点名称不能重复,否则会导致后面启动的节点初始化失败。
      • 如果初始化失败,节点不能正常执行发布或者订阅动作。
    • 线程组创建:

      调用OpenHiva::Init接口时需传入线程组参数,请提前按需创建线程组(ScheduleGroup)。每个线程组中存放回调线程(针对接收数据)的若干信息,包括线程组名字(groupName)、线程组调度类型(scheduleType)等。

      当线程组中ThreadGroup.scheduleType取值不同,OpenHiva内部的处理流程也不同:

      • UNBIND_AICPU(推荐):表示非确定域线程,线程组不绑核。每个组会启动一个工作线程,等待事件激活。工作线程具体在哪个CPU上运行由操作系统进行调度。
      • BIND_AICPU: 表示确定域线程,线程组绑核。每个组启动的工作线程数最大为4(具体由CPU核数决定),并绑在各个核上,每个核绑一个。其调度状态由事件调度机制来确定。这种机制实时性高,在很大程度上避免了由操作系统内核调度引起的时间抖动,保证了线程从休眠到运行状态切换的“确定性”。
      • USER_DEFINED:表示不启动工作线程,这种类型的group不会被框架调度,需要与OpenHiva::SpinOnce配合使用。当用户调用OpenHiva::SpinOnce时,groupName要和ScheduleGroup.groupName一致,此接口在用户的调用线程中处理消息。
  2. 订阅消息。
    1. 用户在订阅Topic之前,必须要先创建节点句柄OpenHiva::Node n。
    2. 再使用创建的节点句柄n调用CreateSubscriber接口订阅指定的Topic。CreateSubscriber接口调用完成后,会返回Subscriber对象。

      设置CreateSubscriber接口入参时,需注意:

      • 订阅者CreateSubscriber接口的入参topicName必须和发布者CreatePublisher接口的topicName相同。
      • 订阅者CreateSubscriber接口的入参groupName必须和OpenHiva::Init接口中设置的groupName相同。

      CreateSubscriber接口内的消息订阅流程如下:

      1. 向工作线程注册指定Topic的回调函数,用于处理订阅队列中接收到的消息。
      2. 创建订阅队列,并向DataMaster注册队列ID、Topic等Subscriber信息。
      3. OpenHiva(DataMaster进程)接收到注册消息后,会查找订阅该Topic的所有订阅队列ID信息,并将其与发布队列ID绑定。
      4. 当发布消息到发布队列时,队列调度器会将消息从发布队列搬移到订阅队列,并向订阅端工作线程发送事件,激活工作线程,调用回调函数处理消息。
    3. 最后通过调用Subscriber对象的Ready接口,判断订阅者是否创建成功。
  3. 资源释放。
    进程结束前需释放相应资源,定义的OpenHiva接口将无法使用。
    • 异常分支中,需主动调用OpenHiva::Shutdown接口释放资源,包括注销队列ID/Topic等信息、释放对应的内存块等。
    • 主线程中,需主动调用OpenHiva::WaitForShutdown接口释放资源,防止主线程提前退出。当进程异常终止时,WaitForShutdown接口内部会自动调用Shutdown接口进行资源清理。

示例代码

代码中公共的、相对稳定的参数(如队列大小queueSize),可以通过配置管理模块进行统一管理,方便后续维护,具体请参见配置管理

普通消息订阅的关键步骤代码示例如下,仅供参考。用户创建回调线程时,线程组中ThreadGroup.scheduleType取值不同,OpenHiva内部的处理流程也不同。

  • 非USER_DEFINED模式下,Subscriber可以自动起线程获取订阅消息。
  • USER_DEFINED模式下,Subscriber不会自动起线程获取订阅消息,用户需调用OpenHiva::SpinOnce来获取订阅消息。
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
#include <thread>
#include <unistd.h>
#include <string>
#include <iostream>
#include <memory>
#include "open/subscriber.h"
#include "open/init.h"
#include "open/node.h"
#include "std_msgs/include/StringMessage.h"

// 用户根据自身业务逻辑,定义回调函数用于处理订阅消息
void ChatterCallback(const Hiva::StdMgs::StringMessage msg)
{
    HIVA_WARN("I heard    :%s", msg.stringData.c_str());
}

int main(int argc, char **argv)
{
    // 1. 资源初始化   
    uint32_t queueSize = 10;            // 数据发布队列长度,最大128
    uint32_t maxMsgSize = 20000;       // 每帧消息的最大大小,单位字节
    uint32_t blockNum = 20;            // 共享内存池中block的个数,默认为queueSize * 2,最大256
    uint32_t queueSize = 10U;          // 队列长度
    bool overwrite = true;            // 是否写覆盖
    bool queueFCFlag = false;        // 是否开启队列流控 
    uint16_t queueTTL = 0U;          // 流控的时间差
    const std::string groupName = testGroup.groupName;   // groupName要和Init接口里的ThreadGroup.groupName保持一致
    const std::string topicName = "/chatter";           // 订阅的话题名,要和发布者发布的话题名保持一致
    // 定义线程组
    std::vector<OpenHiva::ScheduleGroup> threadGroup;
    OpenHiva::ScheduleGroup testGroup;
    testGroup.groupName =  "charter_listener";                       // 线程组名称,需要保证在每个进程内唯一
    testGroup.scheduleType = OpenHiva::ScheduleType::UNBIND_AICPU; // 工作线程是否是确定性
    threadGroup.push_back(testGroup);
    // 调用资源初始化接口
    OpenHiva::Init(argc, argv, threadGroup); 
    HIVA_EVENT("Listener init ok!");

    // 2. 订阅消息
    // 构造Node对象
    OpenHiva::Node n("/listener");
    // 构造TopicOptions  
    OpenHiva::TopicOptions topicOps;
    topicOps.BuildGroupName(groupName)
            .BuildMessageTraits<Hiva::StdMsgs::StringMessage>()
            .BuildShmOptions(maxMsgSize, blockNum)
            .BuildQueueOptions(queueSize, overwrite, queueFCFlag, queueTTL)
            .BuildTopicName(topicName);
    // 通过NodeHandle对象调用Subscriber接口订阅Topic,返回Subscriber对象
    std::function<void(Hiva::StdMgs::StringMessage)> callBack = ChatterCallback;
    std::shared_ptr<OpenHiva::Subscriber> sub = n.CreateSubscriber(topicName, callBack, topicOps);
    // 判断Subscriber对象是否构造成功。若失败,调用Shutdown函数释放资源并退出
    if ((sub == nullptr) || !sub->Ready()) {
        HIVA_ERROR("sample_listener: sub.Ok() is false!");
        OpenHiva::Shutdown();    
        return 0;
}  
    // 3. 资源释放
    // 判断Hiva节点状态。当节点是使能状态,返回true;当节点是shutdown或初始化失败状态,返回false,收发包均不能正确进行
    while (OpenHiva::Ready()) {  
       OpenHiva::SpinOnce(testGroup.groupName);   // USER_DEFINED模式下调用SpinOnce,非USER_DEFINED模式下无需调用SpinOnce
        sleep(1);
        std::cout<<"I'm listerner"<<std::endl;
    }
    // 阻塞式调用,防止进程主动退出,在对进程执行Ctrl+C或kill-2时函数会立刻返回
    OpenHiva::WaitForShutdown(); 
    return 0;
}