消息发布
基本原理
BufferMessage消息发布的关键接口调用流程如图1所示。
- 资源初始化。
与“普通消息 > 消息发布 > 资源初始化”过程大致一样,必须先调用OpenHiva::Init接口进行初始化,此处不再赘述。
- 注册发布消息。
由于从Camera等传感器获取的数据量非常大且非结构化,其传输格式需遵循固定的消息格式BufferMessage:
- 用户在注册发布信息之前,必须要先创建节点句柄OpenHiva::Node n。
- 再使用创建的节点句柄n调用CreatePublisher接口向DataMaster注册要发布的Topic。CreatePublisher接口调用完成后,会返回Publisher对象。
- 最后通过调用Publisher对象的Ready接口,判断发布者是否创建成功。
调用CreatePublisher接口时,OpenHiva内部无需创建内存池,直接使用对应传感器驱动申请的共享内存进行传递,因此maxMsgSize参数可以设为0。
- 发布消息。
用户使用步骤2中返回的Publisher对象调用其Publish接口发布消息之前,还需要申请HivaBuffer,并按照要求向HivaBuffer中填充数据,关键流程如下:
- 调用HivaBufferPool::InitMemoryPool接口初始化内存池,并输入内存池名字、每个内存块大小、内存块个数。
- 调用HivaBufferPool::Allocate接口从内存池申请一块HivaBuffer,HivaBuffer内存大小与内存池内存块大小一致。
- 调用HivaBuffer::GetBuff接口获取内存地址dataPtr和dataLen。
- 向地址dataPtr写入数据,注意长度不可超过dataLen。
- 写完数据后,调用HivaBuffer::SetBuffDataLen接口设置已写入的数据长度。
- (可选)如果用户需要将一些私有数据随消息一起传输,可以通过HivaBuffer::SetUserData接口设置。
- 调用OpenHiva::BufferMessage接口构造BufferMessage对象。
完成上述步骤之后,用户就可以通过调用Publish接口发布BufferMessage消息。
- 资源释放。
与“普通消息 > 消息发布 > 资源释放”过程大致一样,此处不再赘述。
示例代码
基本场景下,BufferMessage消息发布的关键步骤代码示例如下,仅供参考:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 |
#include <string> #include <iostream> #include <memory> #include <sstream> #include "open/publisher.h" #include "open/init.h" #include "open/node.h" #include "open/hiva_buffer_pool.h" #include "open/buffer_message.h" #include "hiva_time/hiva_rate.h" void Usage() { std::cout << "Usage: pmupload test_create_buffer_pub nodeName topicName groupName bindType maxMsgSize queueSize blockNum overwrite queueFCFlag queueTTL rate msgCnt transport" << std::endl; std::cout << "if argc==2, pub will use default value: pmupload test_create_buffer_pub testNodeBufferPub testTopic testGroupBufferPub 0 102400 10 10 1 0 0 1 10 dsf" << std::endl; std::cout << "bindType can be 0 1 2" << std::endl; } int main(int argc, char **argv) { // 1. 资源初始化 std::string nodeName = "testNodePub"; std::string topicName = "testTopic"; std::string groupName = "testGroupPub"; OpenHiva::ScheduleType scheType = OpenHiva::ScheduleType(0); uint32_t maxMsgSize = 100 * 1024U; uint32_t queueSize = 10U; uint32_t blockNum = 10U; bool overwrite = true; bool queueFCFlag = false; uint16_t queueTTL = 0U; uint32_t rate = 1U; uint32_t msgCnt = 10U; std::string transport; int argNum = 14; if (argc < argNum) { Usage(); if (argc != 2) { return 0; } } else { nodeName = argv[1]; topicName = argv[2]; groupName = argv[3]; scheType = (OpenHiva::ScheduleType)strtol(argv[4], NULL, 10); maxMsgSize = (uint32_t)strtol(argv[5], NULL, 10); queueSize = (uint32_t)strtol(argv[6], NULL, 10); blockNum = (uint32_t)strtol(argv[7], NULL, 10); overwrite = (bool)(strtol(argv[8], NULL, 10) & 0x01); queueFCFlag = (bool)(strtol(argv[9], NULL, 10) & 0x01); queueTTL = (uint16_t)strtol(argv[10], NULL, 10); rate = (uint32_t)strtol(argv[11], NULL, 10); msgCnt = (uint32_t)strtol(argv[12], NULL, 10); transport = argv[13]; } // 定义线程组 std::vector<OpenHiva::ScheduleGroup> scheGrpVec; // 调用资源初始化接口 OpenHiva::Init(argc, argv, scheGrpVec); HIVA_EVENT("publisher init ok!"); // 2. 注册发布消息 // 构造Node对象 OpenHiva::Node node(nodeName); // 构造TopicOptions OpenHiva::TopicOptions topicOps; topicOps.BuildGroupName(groupName) .BuildMessageTraits<OpenHiva::BufferMessage>() .BuildShmOptions(maxMsgSize, blockNum) .BuildQueueOptions(queueSize, overwrite, queueFCFlag, queueTTL) .BuildTopicName(topicName); // 通过NodeHandle对象调用CreatePublisher接口发布Topic,返回Publisher对象 std::shared_ptr<OpenHiva::Publisher> pub = node.CreatePublisher<OpenHiva::BufferMessage>(topicName, topicOps); // 判断Publisher对象是否构造成功。若失败,调用Shutdown函数释放资源并退出 if ((pub == nullptr) || (!pub->Ready())) { HIVA_ERROR("create publisher failed"); OpenHiva::Shutdown(); return 0; } HIVA_EVENT("create publisher success"); // 3. 发布消息 uint32_t count = 0U; Hiva::HivaRate interval(rate); // 初始化内存池 OpenHiva::HivaBufferPool HivaBufferPool; uint32_t ret = HivaBufferPool.InitMemoryPool(topicName, maxMsgSize, queueSize*2); if (ret != Hiva::HIVA_SUCCESS) { HIVA_ERROR("create buff pool failed"); } else { HIVA_INFO("create pool success"); } // 判断Hiva节点状态。当节点是使能状态,返回true;当节点是shutdown或初始化失败状态,返回false,收发包均不能正确进行 while(OpenHiva::Ready()) { std::stringstream ss; ss << "talker: ==>Hello World " << count; OpenHiva::HivaBuffer Hivabuffer; // 申请HivaBuffer ret = HivaBufferPool.Allocate(Hivabuffer); if (ret != Hiva::HIVA_SUCCESS){ HIVA_ERROR("alloc new buff failed, return code is %u", ret); continue; } else { HIVA_INFO("create mbuf success"); } void *currBlockPtr = nullptr; size_t dataSize = 0U; // 获取写入数据的内存地址和长度 ret = Hivabuffer.GetBuff(currBlockPtr, dataSize); if (ret != Hiva::HIVA_SUCCESS) { HIVA_ERROR("GetBuff failed and return %u", ret); continue; } *reinterpret_cast<uint32_t *>(currBlockPtr) = ss.str().size(); ret = memcpy_s(reinterpret_cast<uint8_t *>(currBlockPtr) + 4, dataSize - 4, ss.str().c_str(), ss.str().size()); if (ret != Hiva::HIVA_SUCCESS) { HIVA_ERROR("memcpy failed and return %u", ret); continue; } HIVA_WARN("%s", ss.str().c_str()); // 构造BufferMessage对象,同时发布BufferMessage消息 pub->Publish(OpenHiva::BufferMessage(Hivabuffer)); interval.Sleep(); ++count; if (count > msgCnt) { break; } } // 4. 资源释放 OpenHiva::WaitForShutdown(); return 0; } |