昇腾社区首页
中文
注册

样例代码

本节以HcclAllReduce操作为例，给出使用HcclCommInitRootInfoConfig初始化方式，且每个AI Server对应一个业务进程的通信域创建及集合通信样例。

该样例仅支持单机N卡，N小于等于8。

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
#include <pthread.h>
#include <string.h>

#include <atomic>
#include <chrono>
#include <cstdio>
#include <fstream>
#include <iostream>
#include <memory>
#include <stdexcept>
#include <thread>
#include <vector>

#include "hccl/hccl.h"
#include "hccl/hccl_types.h"

using namespace std;
// Arguments handed to Sample(): the communicator to run the collective on and the
// device id it was created for. Members are default-initialized so a context that is
// only partially filled never carries garbage.
struct ThreadContext {
    HcclComm comm = nullptr;  // communicator handle used for the AllReduce
    int32_t device = 0;       // device id the communicator belongs to
};
// Evaluate an ACL call exactly once. If it does not return ACL_SUCCESS, print the
// file/line and the error code, then return that code from the enclosing function
// (which must return a type the code converts to, e.g. int).
// The result is captured in a local so the printf and the return do not re-execute
// the call, and the argument is parenthesized so any expression is safe to pass.
#define ACLCHECK(ret) do {\
    const auto aclCheckRet_ = (ret);\
    if (aclCheckRet_ != ACL_SUCCESS) {\
        printf("acl interface return err %s:%d, retcode: %d \n", __FILE__, __LINE__, static_cast<int>(aclCheckRet_));\
        return aclCheckRet_;\
    }\
} while (0)
// Evaluate an HCCL call exactly once. If it does not return HCCL_SUCCESS, print the
// file/line and the error code, then return that code from the enclosing function
// (which must return a type the code converts to, e.g. int).
// The result is captured in a local so the printf and the return do not re-execute
// the call, and the argument is parenthesized so any expression is safe to pass.
#define HCCLCHECK(ret) do {\
    const auto hcclCheckRet_ = (ret);\
    if (hcclCheckRet_ != HCCL_SUCCESS) {\
        printf("hccl interface return err %s:%d, retcode: %d \n", __FILE__, __LINE__, static_cast<int>(hcclCheckRet_));\
        return hcclCheckRet_;\
    }\
} while (0)
// Where the NIC used for the parameter plane is deployed: on the host or on the device.
// Stored in HcclRootHandle::nicDeploy, which is filled by memcpy from opaque bytes, so
// the underlying type (int) and the enumerator values must stay exactly as they are.
enum class NICDeployment : int {
    NIC_DEPLOYMENT_HOST = 0,
    NIC_DEPLOYMENT_DEVICE = 1,
    NIC_DEPLOYMENT_RESERVED = 2
};
 
// Assumed layout of the bytes HCCL places at the start of HcclRootInfo::internal;
// printHcclRootHandle memcpy's the opaque root info into this struct to display it.
// NOTE(review): this mirrors HCCL internals rather than a public API — confirm it
// against the CANN version in use.
struct HcclRootHandleDef {
    char ip[64];              // presumably the root's IP address as text
    uint32_t port;            // presumably the root's listening port
    NICDeployment nicDeploy;  // host- or device-side NIC
    char identifier[128];     // presumably the communicator identifier string
};
using HcclRootHandle = HcclRootHandleDef;
 
// rootInfo转HcclRootHandle,并打印
void printHcclRootHandle(const HcclRootInfo& rootInfo) {
    HcclRootHandle rootHandle;
    memcpy(&rootHandle, rootInfo.internal, sizeof(HcclRootHandle));
 
    // 打印HcclRootHandle的内容
    printf("IP: %s\t", rootHandle.ip);
    printf("Port: %u\t", rootHandle.port);
    printf("NIC Deployment: %d\t", static_cast<int>(rootHandle.nicDeploy));
    printf("Identifier: %s\n", rootHandle.identifier);
}
void allreduce_operation(void *sendBuf, void *recvBuf, uint64_t count, HcclDataType dataType,
    HcclReduceOp op, HcclComm comm, aclrtStream stream, int devId)
{
    pthread_setname_np(pthread_self(), "allreduce");
    // 指定集合通信操作使用的设备
    aclrtSetDevice(devId);
    // 尝试执行AllReduce操作
    HcclAllReduce(sendBuf, recvBuf, count, dataType, op, comm, stream);
    // 等待stream中集合通信任务执行完成
    aclrtSynchronizeStream(stream);
}
int Sample(void *arg)
{
    ThreadContext* ctx = (ThreadContext *)arg;
    void* host_buf = nullptr;
    void* send_buff = nullptr;
    void* recv_buff = nullptr;
    uint64_t count = 10;
    int malloc_kSize = count * sizeof(float);
    aclrtEvent start_event, end_event;
    aclrtStream stream;
    ACLCHECK(aclrtCreateStream(&stream));
    ACLCHECK(aclrtCreateEvent(&start_event));
    ACLCHECK(aclrtCreateEvent(&end_event));
    // 申请集合通信操作的内存
    ACLCHECK(aclrtMalloc((void**)&send_buff, malloc_kSize, ACL_MEM_MALLOC_HUGE_FIRST));
    ACLCHECK(aclrtMalloc((void**)&recv_buff, malloc_kSize, ACL_MEM_MALLOC_HUGE_FIRST));
    // 初始化输入内存
    ACLCHECK(aclrtMallocHost((void**)&host_buf, malloc_kSize));
    float* tmpHostBuff = static_cast<float*>(host_buf);
    for (uint32_t i = 0; i < count; ++i) {
        tmpHostBuff[i] = 20000;
    }
    ACLCHECK(aclrtMemcpy((void*)send_buff, malloc_kSize, (void*)host_buf, malloc_kSize, ACL_MEMCPY_HOST_TO_DEVICE));
    // 启动AllReduce操作的线程
    thread allreduce_thread(allreduce_operation, (void *)send_buff, (void*)recv_buff, count,
        HCCL_DATA_TYPE_FP32, HCCL_REDUCE_SUM, ctx->comm, stream, ctx->device);
    // 等待Allreduce线程结束
    allreduce_thread.join();
    cout << "Rank " << ctx->device << " AllReduce operation completed successfully." << endl;
    if (ctx->device < 8) {
        void* resultBuff;
        ACLCHECK(aclrtMallocHost((void**)&resultBuff, malloc_kSize));
        ACLCHECK(aclrtMemcpy((void*)resultBuff, malloc_kSize, (void*)recv_buff, malloc_kSize, ACL_MEMCPY_DEVICE_TO_HOST));
        ACLCHECK(aclrtFreeHost(resultBuff));
    }
    ACLCHECK(aclrtFree(send_buff));
    ACLCHECK(aclrtFree(recv_buff));
    ACLCHECK(aclrtFreeHost(host_buf));
    // 销毁任务流
    ACLCHECK(aclrtDestroyStream(stream));
    ACLCHECK(aclrtDestroyEvent(start_event));
    ACLCHECK(aclrtDestroyEvent(end_event));
    return 0;
}
int multiGetRootInfo(int commNum)
{
    std::ofstream out("rootinfo.bin", std::ios::binary);
    for (int rank = 0; rank < commNum; ++rank) {
        HcclRootInfo rootInfo;
        ACLCHECK(aclrtSetDevice(rank));
        HCCLCHECK(HcclGetRootInfo(&rootInfo));
        out.write(rootInfo.internal, HCCL_ROOT_INFO_BYTES);
        printHcclRootHandle(rootInfo);
        std::cout << "get rank done : " << rank << std::endl;
        
        // 重置设备
        ACLCHECK(aclrtResetDevice(rank));
    }
    out.close();
    return HCCL_SUCCESS;
}
// Read communicator commId's root info from rootinfo.bin. The file holds commNum
// consecutive fixed-size records of HCCL_ROOT_INFO_BYTES each, as written by
// multiGetRootInfo.
// On success, prints the handle and returns it. If commId is out of range, or the
// file is missing or too short, prints an error and returns a zero-initialized
// HcclRootInfo (the same value the lookup produced before on a miss).
HcclRootInfo loadRootInfo(int commNum, int commId)
{
    if (commId < 0 || commId >= commNum) {
        printf("invalid commId %d for commNum %d\n", commId, commNum);
        return HcclRootInfo{};
    }
    std::ifstream in("rootinfo.bin", std::ios::binary);
    if (!in) {
        printf("failed to open rootinfo.bin\n");
        return HcclRootInfo{};
    }
    // Records are fixed-size, so seek straight to the requested one.
    in.seekg(static_cast<std::streamoff>(commId) * HCCL_ROOT_INFO_BYTES);
    HcclRootInfo rootInfo{};
    if (!in.read(rootInfo.internal, HCCL_ROOT_INFO_BYTES)) {
        printf("failed to read root info %d from rootinfo.bin\n", commId);
        return HcclRootInfo{};
    }
    printHcclRootHandle(rootInfo);
    return rootInfo;
}
int run(int commNum, int commId, int rankNum, int devId, int logicalRankId)
{
    auto rootInfo = loadRootInfo(commNum, commId);
    printHcclRootHandle(rootInfo);
    ACLCHECK(aclrtSetDevice(devId));
    // 创建并初始化通信域配置项
    HcclCommConfig config;
    HcclCommConfigInit(&config);
    // 根据需要修改通信域配置
    config.hcclBufferSize = 50;
    strcpy(config.hcclCommName, "comm_1");
    // 初始化集合通信域
    HcclComm hcclComm;
    HCCLCHECK(HcclCommInitRootInfoConfig(rankNum, &rootInfo, logicalRankId, &config, &hcclComm));
    // 创建任务stream
    struct ThreadContext args;
    args.comm = hcclComm;
    args.device = devId;
    Sample((void *)&args);
    // 销毁集合通信域,只有在未超时操作的情况下销毁
    HCCLCHECK(HcclCommDestroy(hcclComm));
    return 0;
}
// Entry point. Usage: <devId> <rankId> <commId> <commNum> <rankSize>.
// The process on device 0 (rootRank):
//   - writes rootinfo.bin for all commNum communicators;
//   - joins each communicator i as its root (device i, logical rank 0), one thread each.
// Every other process joins communicator commId as logical rank rankId on device devId.
// Returns 0 on success, -1 for bad arguments, or the first ACL/HCCL/run error code.
// NOTE(review): non-root processes read rootinfo.bin without waiting for the root to
// finish writing it — presumably the root process is expected to be started first.
int main(int argc, char*argv[])
{
    if (argc < 6) {
        printf("usage: %s <devId> <rankId> <commId> <commNum> <rankSize>\n", argc > 0 ? argv[0] : "sample");
        return -1;
    }
    int devId = 0;
    int logicalRank = 0;
    int commId = 0;
    int commNum = 0;
    int rankNum = 0;
    try {
        devId = std::stoi(argv[1]);       // dev id
        logicalRank = std::stoi(argv[2]); // rank id
        commId = std::stoi(argv[3]);      // commId
        commNum = std::stoi(argv[4]);     // number of comm
        rankNum = std::stoi(argv[5]);     // rank size
    } catch (const std::exception &e) {
        // std::stoi throws on non-numeric or out-of-range input.
        printf("invalid numeric argument: %s\n", e.what());
        return -1;
    }
    if (commNum <= 0 || rankNum <= 0) {
        printf("commNum and rankSize must be positive\n");
        return -1;
    }

    // Initialize ACL and select this process's device.
    ACLCHECK(aclInit(nullptr));
    ACLCHECK(aclrtSetDevice(devId));

    const int32_t rootRank = 0;
    int exitCode = 0;
    if (devId == rootRank) {
        // Fetch every communicator's root info up front so the other processes can read it.
        HCCLCHECK(multiGetRootInfo(commNum));
        // One result slot per thread; each thread writes only its own slot.
        std::vector<int> results(static_cast<size_t>(commNum), 0);
        std::vector<std::thread> workers;
        workers.reserve(static_cast<size_t>(commNum));
        for (int i = 0; i < commNum; ++i) {
            // Communicator i is rooted on device i (root device == commId) at logical rank 0.
            workers.emplace_back([&results, i, commNum, rankNum]() {
                results[static_cast<size_t>(i)] = run(commNum, i, rankNum, i, 0);
            });
        }
        for (std::thread &worker : workers) {
            worker.join();
        }
        for (int ret : results) {
            if (ret != 0 && exitCode == 0) {
                exitCode = ret;
            }
        }
    } else {
        exitCode = run(commNum, commId, rankNum, devId, logicalRank);
    }

    // Release the device and tear down ACL.
    ACLCHECK(aclrtResetDevice(devId));
    ACLCHECK(aclFinalize());
    return exitCode;
}