昇腾社区首页
中文
注册

消息订阅

基本原理

BufferMessage消息订阅的关键接口调用流程如图1所示。

图1 BufferMessage消息订阅接口调用流程
  1. 资源初始化。

    与“普通消息 > 消息订阅 > 资源初始化”过程大致一样,必须先调用OpenHiva::Init接口进行初始化,此处不再赘述。

  2. 订阅消息。
    1. 用户在订阅Topic之前,必须要先创建节点句柄OpenHiva::Node n。
    2. 再使用创建的节点句柄n调用CreateSubscriber接口订阅指定的Topic。CreateSubscriber接口调用完成后,会返回Subscriber对象。

      设置CreateSubscriber接口入参时,需注意:

      • 订阅者CreateSubscriber接口的入参topicName必须和发布者CreatePublisher接口的topicName相同。
      • 订阅者CreateSubscriber接口的入参groupName必须和OpenHiva::Init接口中设置的groupName相同。

      CreateSubscriber接口内的消息订阅流程如下:

      1. 向工作线程注册指定Topic的回调函数,用于处理订阅队列中接收到的消息。
      2. 创建订阅队列,并向DataMaster注册队列ID、Topic等Subscriber信息。
      3. OpenHiva(DataMaster进程)接收到注册消息后,会查找订阅该Topic的所有订阅队列ID信息,并将其与发布队列ID绑定。
      4. 当发布消息到发布队列时,队列调度器会将消息从发布队列搬移到订阅队列,并向订阅端工作线程发送事件,激活工作线程,调用回调函数处理消息。
    3. 最后通过调用Subscriber对象的Ready接口,判断订阅者是否创建成功。
  3. 资源释放。

    与“普通消息 > 消息订阅 > 资源释放”过程大致一样,此处不再赘述。

示例代码

代码中公共的、相对稳定的参数(如队列大小queueSize),可以通过配置管理模块进行统一管理,方便后续维护,具体请参见配置管理

基本场景下,BufferMessage消息订阅的关键步骤代码示例如下,仅供参考。用户创建回调线程时,线程组中ThreadGroup.scheduleType取值不同,OpenHiva内部的处理流程也不同。

  • 非USER_DEFINED模式下,Subscriber可以自动起线程获取订阅消息。
  • USER_DEFINED模式下,Subscriber不会自动起线程获取订阅消息,用户需调用OpenHiva::SpinOnce来获取订阅消息。
  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
#include <string>
#include <iostream>
#include <memory>
#include <unistd.h>
#include "open/subscriber.h"
#include "open/init.h"
#include "open/node.h"
#include "open/buffer_message.h"
#include "securec.h"

uint32_t cnt = 0;

void Usage()
{
    std::cout << "Usage: pmupload test_create_buffer_sub nodeName topicName groupName bindType maxMsgSize queueSize blockNum overwrite queueFCFlag queueTTL transport" << std::endl;
    std::cout << "if argc==2, pub will use default value: pmupload test_create_buffer_sub testNodeBufferSub testTopic testGroupBufferSub 0 102400 10 10 1 0 0 1 dsf" << std::endl;
    std::cout << "bindType can be 0 1 2" << std::endl;
}

// 用户根据自身业务逻辑,定义回调函数用于处理订阅消息
void ChatterCallback(const OpenHiva::BufferMessage &msg)
{
    void *currBlockPtr = nullptr;
    size_t dataSize = 0U;
    OpenHiva::HivaBuffer &HivaBuffer = const_cast<OpenHiva::HivaBuffer &>(msg.data);
    uint32_t ret = HivaBuffer.GetBuff(currBlockPtr, dataSize);        // 获取内存地址dataPtr和dataLen
    if (ret != Hiva::HIVA_SUCCESS) {
        HIVA_ERROR("GetBuff failed and return %u", ret);
        return;
    }
    uint32_t dataLen = *reinterpret_cast<uint32_t *>(currBlockPtr);
    std::string str(reinterpret_cast<char *>(currBlockPtr) + 4, dataLen);
    HIVA_WARN("I heard    :%s", str.c_str());
    cnt++;
    return;
}

int main(int argc, char **argv)
{
    // 1. 资源初始化
    std::string nodeName = "testNodeSub";         
    std::string topicName = "testTopic";           // 订阅的话题名,要和发布者发布的话题名保持一致
    std::string groupName = "testGroupSub";        // groupName要和Init接口里的ThreadGroup.groupName保持一致
    OpenHiva::ScheduleType scheType = OpenHiva::ScheduleType(0);
    uint32_t maxMsgSize = 100 * 1024U;
    uint32_t queueSize = 10U;
    uint32_t blockNum = 10U;
    bool overwrite = false;
    bool queueFCFlag = false;
    uint16_t queueTTL = 100U;
    std::string transport;
    int argNum = 12;
    if (argc < argNum) {
        Usage();
        if (argc != 2) {
            return 0;
        }
    } else {
        nodeName = argv[1];
        topicName = argv[2];
        groupName = argv[3];
        scheType = (OpenHiva::ScheduleType)strtol(argv[4], NULL, 10);
        maxMsgSize = (uint32_t)strtol(argv[5], NULL, 10);
        queueSize = (uint32_t)strtol(argv[6], NULL, 10);
        blockNum = (uint32_t)strtol(argv[7], NULL, 10);
        overwrite = (bool)(strtol(argv[8], NULL, 10) & 0x01);
        queueFCFlag = (bool)(strtol(argv[9], NULL, 10) & 0x01);
        queueTTL = (uint16_t)strtol(argv[10], NULL, 10);
        transport = argv[11];
    }
    // 定义线程组
    std::vector<OpenHiva::ScheduleGroup> scheGrpVec;
    OpenHiva::ScheduleGroup scheGrp;
    scheGrp.groupName = groupName;                  // 线程组名称,需要保证在每个进程内唯一
    scheGrp.scheduleType = scheType;                // 工作线程是否是确定性
    scheGrpVec.push_back(scheGrp);
    // 调用资源初始化接口
    OpenHiva::Init(argc, argv, scheGrpVec);
    HIVA_EVENT("subscriber init ok!");

    // 2. 订阅消息
    // 构造Node对象
    OpenHiva::Node node(nodeName);
    // 构造TopicOptions  
    OpenHiva::TopicOptions topicOps;
    topicOps.BuildGroupName(groupName)
            .BuildMessageTraits<OpenHiva::BufferMessage>()
            .BuildShmOptions(maxMsgSize, blockNum)
            .BuildQueueOptions(queueSize, overwrite, queueFCFlag, queueTTL)
            .BuildTopicName(topicName);
    // 通过NodeHandle对象调用Subscriber接口订阅Topic,返回Subscriber对象
    std::function<void(OpenHiva::BufferMessage)> callBack = ChatterCallback;
    std::shared_ptr<OpenHiva::Subscriber> sub = node.CreateSubscriber(topicName, callBack, topicOps);
    // 判断Subscriber对象是否构造成功。若失败,调用Shutdown函数释放资源并退出
    if ((sub == nullptr) || (!sub->Ready())) {
        HIVA_ERROR("create subscriber failed");
        OpenHiva::Shutdown();
        return 0;
    }

    // 3. 资源释放
    // 判断Hiva节点状态。当节点是使能状态,返回true;当节点是shutdown或初始化失败状态,返回false,收发包均不能正确进行
    while(OpenHiva::Ready()) {
       OpenHiva::SpinOnce(groupName)   // USER_DEFINED模式下调用SpinOnce,非USER_DEFINED模式下无需调用SpinOnce
        sleep(1);
    }
    // 阻塞式调用,防止进程主动退出
    OpenHiva::WaitForShutdown();
    return 0;
}