BufferMessage消息发布的关键接口调用流程如图1所示。
与“普通消息 > 消息发布 > 资源初始化”过程大致一样,必须先调用OpenHiva::Init接口进行初始化,此处不再赘述。
由于从Camera等传感器获取的数据量非常大且非结构化,其传输格式需遵循固定的消息格式BufferMessage:
调用CreatePublisher接口时,OpenHiva内部无需创建内存池,直接使用对应传感器驱动申请的共享内存进行传递,因此maxMsgSize参数可以设为0。
用户使用步骤2中返回的Publisher对象调用其Publish接口发布消息之前,还需要申请HivaBuffer,并按照要求向HivaBuffer中填充数据,关键流程如下:
完成上述步骤之后,用户就可以通过调用Publish接口发布BufferMessage消息。
与“普通消息 > 消息发布 > 资源释放”过程大致一样,此处不再赘述。
基本场景下,BufferMessage消息发布的关键步骤代码示例如下,仅供参考:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 |
#include <string> #include <iostream> #include <memory> #include <sstream> #include "open/publisher.h" #include "open/init.h" #include "open/node.h" #include "open/hiva_buffer_pool.h" #include "open/buffer_message.h" #include "hiva_time/hiva_rate.h" void Usage() { std::cout << "Usage: pmupload test_create_buffer_pub nodeName topicName groupName bindType maxMsgSize queueSize blockNum overwrite queueFCFlag queueTTL rate msgCnt transport" << std::endl; std::cout << "if argc==2, pub will use default value: pmupload test_create_buffer_pub testNodeBufferPub testTopic testGroupBufferPub 0 102400 10 10 1 0 0 1 10 dsf" << std::endl; std::cout << "bindType can be 0 1 2" << std::endl; } int main(int argc, char **argv) { // 1. 资源初始化 std::string nodeName = "testNodePub"; std::string topicName = "testTopic"; std::string groupName = "testGroupPub"; OpenHiva::ScheduleType scheType = OpenHiva::ScheduleType(0); uint32_t maxMsgSize = 100 * 1024U; uint32_t queueSize = 10U; uint32_t blockNum = 10U; bool overwrite = true; bool queueFCFlag = false; uint16_t queueTTL = 0U; uint32_t rate = 1U; uint32_t msgCnt = 10U; std::string transport; int argNum = 14; if (argc < argNum) { Usage(); if (argc != 2) { return 0; } } else { nodeName = argv[1]; topicName = argv[2]; groupName = argv[3]; scheType = (OpenHiva::ScheduleType)strtol(argv[4], NULL, 10); maxMsgSize = (uint32_t)strtol(argv[5], NULL, 10); queueSize = (uint32_t)strtol(argv[6], NULL, 10); blockNum = (uint32_t)strtol(argv[7], NULL, 10); overwrite = (bool)(strtol(argv[8], NULL, 10) & 0x01); queueFCFlag = (bool)(strtol(argv[9], NULL, 10) & 0x01); queueTTL = (uint16_t)strtol(argv[10], NULL, 10); rate = (uint32_t)strtol(argv[11], NULL, 10); msgCnt = (uint32_t)strtol(argv[12], NULL, 10); transport = argv[13]; } // 定义线程组 std::vector<OpenHiva::ScheduleGroup> scheGrpVec; // 调用资源初始化接口 OpenHiva::Init(argc, argv, scheGrpVec); HIVA_EVENT("publisher init ok!"); // 2. 注册发布消息 // 构造Node对象 OpenHiva::Node node(nodeName); // 构造TopicOptions OpenHiva::TopicOptions topicOps; topicOps.BuildGroupName(groupName) .BuildMessageTraits<OpenHiva::BufferMessage>() .BuildShmOptions(maxMsgSize, blockNum) .BuildQueueOptions(queueSize, overwrite, queueFCFlag, queueTTL) .BuildTopicName(topicName); // 通过NodeHandle对象调用CreatePublisher接口发布Topic,返回Publisher对象 std::shared_ptr<OpenHiva::Publisher> pub = node.CreatePublisher<OpenHiva::BufferMessage>(topicName, topicOps); // 判断Publisher对象是否构造成功。若失败,调用Shutdown函数释放资源并退出 if ((pub == nullptr) || (!pub->Ready())) { HIVA_ERROR("create publisher failed"); OpenHiva::Shutdown(); return 0; } HIVA_EVENT("create publisher success"); // 3. 发布消息 uint32_t count = 0U; Hiva::HivaRate interval(rate); // 初始化内存池 OpenHiva::HivaBufferPool HivaBufferPool; uint32_t ret = HivaBufferPool.InitMemoryPool(topicName, maxMsgSize, queueSize*2); if (ret != Hiva::HIVA_SUCCESS) { HIVA_ERROR("create buff pool failed"); } else { HIVA_INFO("create pool success"); } // 判断Hiva节点状态。当节点是使能状态,返回true;当节点是shutdown或初始化失败状态,返回false,收发包均不能正确进行 while(OpenHiva::Ready()) { std::stringstream ss; ss << "talker: ==>Hello World " << count; OpenHiva::HivaBuffer Hivabuffer; // 申请HivaBuffer ret = HivaBufferPool.Allocate(Hivabuffer); if (ret != Hiva::HIVA_SUCCESS){ HIVA_ERROR("alloc new buff failed, return code is %u", ret); continue; } else { HIVA_INFO("create mbuf success"); } void *currBlockPtr = nullptr; size_t dataSize = 0U; // 获取写入数据的内存地址和长度 ret = Hivabuffer.GetBuff(currBlockPtr, dataSize); if (ret != Hiva::HIVA_SUCCESS) { HIVA_ERROR("GetBuff failed and return %u", ret); continue; } *reinterpret_cast<uint32_t *>(currBlockPtr) = ss.str().size(); ret = memcpy_s(reinterpret_cast<uint8_t *>(currBlockPtr) + 4, dataSize - 4, ss.str().c_str(), ss.str().size()); if (ret != Hiva::HIVA_SUCCESS) { HIVA_ERROR("memcpy failed and return %u", ret); continue; } HIVA_WARN("%s", ss.str().c_str()); // 构造BufferMessage对象,同时发布BufferMessage消息 pub->Publish(OpenHiva::BufferMessage(Hivabuffer)); interval.Sleep(); ++count; if (count > msgCnt) { break; } } // 4. 资源释放 OpenHiva::WaitForShutdown(); return 0; } |