用户在调用OpenHiva接口之前,需先调用OpenHiva::Init接口进行初始化,返回值为0代表初始化成功,否则失败。
初始化动作主要包含节点名注册、线程组创建、资源申请等动作。
调用OpenHiva::Init接口时需传入线程组参数,请提前按需创建线程组(ScheduleGroup)。每个线程组中存放回调线程(针对接收数据)的若干信息,包括线程组名字(groupName)、线程组调度类型(scheduleType)等。
当线程组中ThreadGroup.scheduleType取值不同,OpenHiva内部的处理流程也不同:
设置CreateSubscriber接口入参时,需注意:
CreateSubscriber接口内的消息订阅流程如下:
普通消息订阅的关键步骤代码示例如下,仅供参考。用户创建回调线程时,线程组中ThreadGroup.scheduleType取值不同,OpenHiva内部的处理流程也不同。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 |
#include <thread> #include <unistd.h> #include <string> #include <iostream> #include <memory> #include "open/subscriber.h" #include "open/init.h" #include "open/node.h" #include "std_msgs/include/StringMessage.h" // 用户根据自身业务逻辑,定义回调函数用于处理订阅消息 void ChatterCallback(const Hiva::StdMgs::StringMessage msg) { HIVA_WARN("I heard :%s", msg.stringData.c_str()); } int main(int argc, char **argv) { // 1. 资源初始化 uint32_t queueSize = 10; // 数据发布队列长度,最大128 uint32_t maxMsgSize = 20000; // 每帧消息的最大大小,单位字节 uint32_t blockNum = 20; // 共享内存池中block的个数,默认为queueSize * 2,最大256 uint32_t queueSize = 10U; // 队列长度 bool overwrite = true; // 是否写覆盖 bool queueFCFlag = false; // 是否开启队列流控 uint16_t queueTTL = 0U; // 流控的时间差 const std::string groupName = testGroup.groupName; // groupName要和Init接口里的ThreadGroup.groupName保持一致 const std::string topicName = "/chatter"; // 订阅的话题名,要和发布者发布的话题名保持一致 // 定义线程组 std::vector<OpenHiva::ScheduleGroup> threadGroup; OpenHiva::ScheduleGroup testGroup; testGroup.groupName = "charter_listener"; // 线程组名称,需要保证在每个进程内唯一 testGroup.scheduleType = OpenHiva::ScheduleType::UNBIND_AICPU; // 工作线程是否是确定性 threadGroup.push_back(testGroup); // 调用资源初始化接口 OpenHiva::Init(argc, argv, threadGroup); HIVA_EVENT("Listener init ok!"); // 2. 订阅消息 // 构造Node对象 OpenHiva::Node n("/listener"); // 构造TopicOptions OpenHiva::TopicOptions topicOps; topicOps.BuildGroupName(groupName) .BuildMessageTraits<Hiva::StdMsgs::StringMessage>() .BuildShmOptions(maxMsgSize, blockNum) .BuildQueueOptions(queueSize, overwrite, queueFCFlag, queueTTL) .BuildTopicName(topicName); // 通过NodeHandle对象调用Subscriber接口订阅Topic,返回Subscriber对象 std::function<void(Hiva::StdMgs::StringMessage)> callBack = ChatterCallback; std::shared_ptr<OpenHiva::Subscriber> sub = n.CreateSubscriber(topicName, callBack, topicOps); // 判断Subscriber对象是否构造成功。若失败,调用Shutdown函数释放资源并退出 if ((sub == nullptr) || !sub->Ready()) { HIVA_ERROR("sample_listener: sub.Ok() is false!"); OpenHiva::Shutdown(); return 0; } // 3. 资源释放 // 判断Hiva节点状态。当节点是使能状态,返回true;当节点是shutdown或初始化失败状态,返回false,收发包均不能正确进行 while (OpenHiva::Ready()) { OpenHiva::SpinOnce(testGroup.groupName); // USER_DEFINED模式下调用SpinOnce,非USER_DEFINED模式下无需调用SpinOnce sleep(1); std::cout<<"I'm listerner"<<std::endl; } // 阻塞式调用,防止进程主动退出,在对进程执行Ctrl+C或kill-2时函数会立刻返回 OpenHiva::WaitForShutdown(); return 0; } |