消息订阅
基本原理
BufferMessage消息订阅的关键接口调用流程如图1所示。
- 资源初始化。
与“普通消息 > 消息订阅 > 资源初始化”过程大致一样,必须先调用OpenHiva::Init接口进行初始化,此处不再赘述。
- 订阅消息。
- 用户在订阅Topic之前,必须要先创建节点句柄OpenHiva::Node n。
- 再使用创建的节点句柄n调用CreateSubscriber接口订阅指定的Topic。CreateSubscriber接口调用完成后,会返回Subscriber对象。
设置CreateSubscriber接口入参时,需注意:
- 订阅者CreateSubscriber接口的入参topicName必须和发布者CreatePublisher接口的topicName相同。
- 订阅者CreateSubscriber接口的入参groupName必须和OpenHiva::Init接口中设置的groupName相同。
CreateSubscriber接口内的消息订阅流程如下:
- 向工作线程注册指定Topic的回调函数,用于处理订阅队列中接收到的消息。
- 创建订阅队列,并向DataMaster注册队列ID、Topic等Subscriber信息。
- OpenHiva(DataMaster进程)接收到注册消息后,会查找订阅该Topic的所有订阅队列ID信息,并将其与发布队列ID绑定。
- 当发布消息到发布队列时,队列调度器会将消息从发布队列搬移到订阅队列,并向订阅端工作线程发送事件,激活工作线程,调用回调函数处理消息。
- 最后通过调用Subscriber对象的Ready接口,判断订阅者是否创建成功。
- 资源释放。
与“普通消息 > 消息订阅 > 资源释放”过程大致一样,此处不再赘述。
示例代码
基本场景下,BufferMessage消息订阅的关键步骤代码示例如下,仅供参考。用户创建回调线程时,线程组中ThreadGroup.scheduleType取值不同,OpenHiva内部的处理流程也不同。
- 非USER_DEFINED模式下,Subscriber可以自动起线程获取订阅消息。
- USER_DEFINED模式下,Subscriber不会自动起线程获取订阅消息,用户需调用OpenHiva::SpinOnce来获取订阅消息。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 |
#include <string> #include <iostream> #include <memory> #include <unistd.h> #include "open/subscriber.h" #include "open/init.h" #include "open/node.h" #include "open/buffer_message.h" #include "securec.h" uint32_t cnt = 0; void Usage() { std::cout << "Usage: pmupload test_create_buffer_sub nodeName topicName groupName bindType maxMsgSize queueSize blockNum overwrite queueFCFlag queueTTL transport" << std::endl; std::cout << "if argc==2, pub will use default value: pmupload test_create_buffer_sub testNodeBufferSub testTopic testGroupBufferSub 0 102400 10 10 1 0 0 1 dsf" << std::endl; std::cout << "bindType can be 0 1 2" << std::endl; } // 用户根据自身业务逻辑,定义回调函数用于处理订阅消息 void ChatterCallback(const OpenHiva::BufferMessage &msg) { void *currBlockPtr = nullptr; size_t dataSize = 0U; OpenHiva::HivaBuffer &HivaBuffer = const_cast<OpenHiva::HivaBuffer &>(msg.data); uint32_t ret = HivaBuffer.GetBuff(currBlockPtr, dataSize); // 获取内存地址dataPtr和dataLen if (ret != Hiva::HIVA_SUCCESS) { HIVA_ERROR("GetBuff failed and return %u", ret); return; } uint32_t dataLen = *reinterpret_cast<uint32_t *>(currBlockPtr); std::string str(reinterpret_cast<char *>(currBlockPtr) + 4, dataLen); HIVA_WARN("I heard :%s", str.c_str()); cnt++; return; } int main(int argc, char **argv) { // 1. 资源初始化 std::string nodeName = "testNodeSub"; std::string topicName = "testTopic"; // 订阅的话题名,要和发布者发布的话题名保持一致 std::string groupName = "testGroupSub"; // groupName要和Init接口里的ThreadGroup.groupName保持一致 OpenHiva::ScheduleType scheType = OpenHiva::ScheduleType(0); uint32_t maxMsgSize = 100 * 1024U; uint32_t queueSize = 10U; uint32_t blockNum = 10U; bool overwrite = false; bool queueFCFlag = false; uint16_t queueTTL = 100U; std::string transport; int argNum = 12; if (argc < argNum) { Usage(); if (argc != 2) { return 0; } } else { nodeName = argv[1]; topicName = argv[2]; groupName = argv[3]; scheType = (OpenHiva::ScheduleType)strtol(argv[4], NULL, 10); maxMsgSize = (uint32_t)strtol(argv[5], NULL, 10); queueSize = (uint32_t)strtol(argv[6], NULL, 10); blockNum = (uint32_t)strtol(argv[7], NULL, 10); overwrite = (bool)(strtol(argv[8], NULL, 10) & 0x01); queueFCFlag = (bool)(strtol(argv[9], NULL, 10) & 0x01); queueTTL = (uint16_t)strtol(argv[10], NULL, 10); transport = argv[11]; } // 定义线程组 std::vector<OpenHiva::ScheduleGroup> scheGrpVec; OpenHiva::ScheduleGroup scheGrp; scheGrp.groupName = groupName; // 线程组名称,需要保证在每个进程内唯一 scheGrp.scheduleType = scheType; // 工作线程是否是确定性 scheGrpVec.push_back(scheGrp); // 调用资源初始化接口 OpenHiva::Init(argc, argv, scheGrpVec); HIVA_EVENT("subscriber init ok!"); // 2. 订阅消息 // 构造Node对象 OpenHiva::Node node(nodeName); // 构造TopicOptions OpenHiva::TopicOptions topicOps; topicOps.BuildGroupName(groupName) .BuildMessageTraits<OpenHiva::BufferMessage>() .BuildShmOptions(maxMsgSize, blockNum) .BuildQueueOptions(queueSize, overwrite, queueFCFlag, queueTTL) .BuildTopicName(topicName); // 通过NodeHandle对象调用Subscriber接口订阅Topic,返回Subscriber对象 std::function<void(OpenHiva::BufferMessage)> callBack = ChatterCallback; std::shared_ptr<OpenHiva::Subscriber> sub = node.CreateSubscriber(topicName, callBack, topicOps); // 判断Subscriber对象是否构造成功。若失败,调用Shutdown函数释放资源并退出 if ((sub == nullptr) || (!sub->Ready())) { HIVA_ERROR("create subscriber failed"); OpenHiva::Shutdown(); return 0; } // 3. 资源释放 // 判断Hiva节点状态。当节点是使能状态,返回true;当节点是shutdown或初始化失败状态,返回false,收发包均不能正确进行 while(OpenHiva::Ready()) { OpenHiva::SpinOnce(groupName) // USER_DEFINED模式下调用SpinOnce,非USER_DEFINED模式下无需调用SpinOnce sleep(1); } // 阻塞式调用,防止进程主动退出 OpenHiva::WaitForShutdown(); return 0; } |