样例代码
本节以HcclAllReduce操作为例,给出使用HcclCommInitRootInfoConfig初始化方式,且每个AI Server对应一个业务进程的通信域创建及集合通信样例。
该样例仅支持单机N卡场景,且N小于等于8。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 | #include <iostream>
#include <fstream>
#include <vector>
#include <memory>
#include <thread>
#include <chrono>
#include <atomic>
#include <exception>
#include <string>
#include <cstdio>
#include <cstdint>
#include <string.h>
#include <pthread.h>
#include "hccl/hccl.h"
#include "hccl/hccl_types.h"
using namespace std;
// Arguments handed to Sample(): the communicator to run on and the device id
// of the calling rank. Members are value-initialized so a default-constructed
// context never carries indeterminate values.
struct ThreadContext {
    HcclComm comm{};     // communicator handle; assigned by run() before calling Sample()
    int32_t device{0};   // device id; Sample() also prints it as the rank label
};
// Checks the result of an ACL call. On failure, logs file/line and the return
// code, then returns that code from the enclosing function.
// The argument is evaluated exactly once into a local: it is normally a
// function call, and re-expanding it inside the log and return statements
// would invoke that call again on the error path.
#define ACLCHECK(ret) do {\
    const auto aclCheckRet_ = (ret);\
    if (aclCheckRet_ != ACL_SUCCESS)\
    {\
        printf("acl interface return err %s:%d, retcode: %d \n", __FILE__, __LINE__, static_cast<int>(aclCheckRet_));\
        return aclCheckRet_;\
    }\
} while(0)
// Checks the result of an HCCL call. On failure, logs file/line and the return
// code, then returns that code from the enclosing function.
// The argument is evaluated exactly once into a local: it is normally a
// function call, and re-expanding it inside the log and return statements
// would invoke that call again on the error path.
#define HCCLCHECK(ret) do {\
    const auto hcclCheckRet_ = (ret);\
    if (hcclCheckRet_ != HCCL_SUCCESS)\
    {\
        printf("hccl interface return err %s:%d, retcode: %d \n", __FILE__, __LINE__, static_cast<int>(hcclCheckRet_));\
        return hcclCheckRet_;\
    }\
} while(0)
// Location of the parameter plane (where the NIC is deployed).
enum class NICDeployment {
    NIC_DEPLOYMENT_HOST = 0,
    NIC_DEPLOYMENT_DEVICE = 1,
    NIC_DEPLOYMENT_RESERVED = 2
};
// Layout this sample uses to interpret the leading bytes of a HcclRootInfo
// blob; printHcclRootHandle() copies the blob into this struct before printing.
// NOTE(review): presumably mirrors HCCL's internal root-handle layout - confirm
// against the HCCL version in use.
struct HcclRootHandleDef {
    char ip[64];              // printed as a C string
    uint32_t port;
    NICDeployment nicDeploy;
    char identifier[128];     // printed as a C string
};
using HcclRootHandle = HcclRootHandleDef;
// rootInfo转HcclRootHandle,并打印
void printHcclRootHandle(const HcclRootInfo& rootInfo) {
HcclRootHandle rootHandle;
memcpy(&rootHandle, rootInfo.internal, sizeof(HcclRootHandle));
// 打印HcclRootHandle的内容
printf("IP: %s\t", rootHandle.ip);
printf("Port: %u\t", rootHandle.port);
printf("NIC Deployment: %d\t", static_cast<int>(rootHandle.nicDeploy));
printf("Identifier: %s\n", rootHandle.identifier);
}
void allreduce_operation(void *sendBuf, void *recvBuf, uint64_t count, HcclDataType dataType,
HcclReduceOp op, HcclComm comm, aclrtStream stream, int devId)
{
pthread_setname_np(pthread_self(), "allreduce");
// 指定集合通信操作使用的设备
aclrtSetDevice(devId);
// 尝试执行AllReduce操作
HcclAllReduce(sendBuf, recvBuf, count, dataType, op, comm, stream);
// 等待stream中集合通信任务执行完成
aclrtSynchronizeStream(stream);
}
int Sample(void *arg)
{
ThreadContext* ctx = (ThreadContext *)arg;
void* host_buf = nullptr;
void* send_buff = nullptr;
void* recv_buff = nullptr;
uint64_t count = 10;
int malloc_kSize = count * sizeof(float);
aclrtEvent start_event, end_event;
aclrtStream stream;
ACLCHECK(aclrtCreateStream(&stream));
ACLCHECK(aclrtCreateEvent(&start_event));
ACLCHECK(aclrtCreateEvent(&end_event));
// 申请集合通信操作的内存
ACLCHECK(aclrtMalloc((void**)&send_buff, malloc_kSize, ACL_MEM_MALLOC_HUGE_FIRST));
ACLCHECK(aclrtMalloc((void**)&recv_buff, malloc_kSize, ACL_MEM_MALLOC_HUGE_FIRST));
// 初始化输入内存
ACLCHECK(aclrtMallocHost((void**)&host_buf, malloc_kSize));
float* tmpHostBuff = static_cast<float*>(host_buf);
for (uint32_t i = 0; i < count; ++i) {
tmpHostBuff[i] = 20000;
}
ACLCHECK(aclrtMemcpy((void*)send_buff, malloc_kSize, (void*)host_buf, malloc_kSize, ACL_MEMCPY_HOST_TO_DEVICE));
// 启动AllReduce操作的线程
thread allreduce_thread(allreduce_operation, (void *)send_buff, (void*)recv_buff, count,
HCCL_DATA_TYPE_FP32, HCCL_REDUCE_SUM, ctx->comm, stream, ctx->device);
// 等待Allreduce线程结束
allreduce_thread.join();
cout << "Rank " << ctx->device << " AllReduce operation completed successfully." << endl;
if (ctx->device < 8) {
void* resultBuff;
ACLCHECK(aclrtMallocHost((void**)&resultBuff, malloc_kSize));
ACLCHECK(aclrtMemcpy((void*)resultBuff, malloc_kSize, (void*)recv_buff, malloc_kSize, ACL_MEMCPY_DEVICE_TO_HOST));
ACLCHECK(aclrtFreeHost(resultBuff));
}
ACLCHECK(aclrtFree(send_buff));
ACLCHECK(aclrtFree(recv_buff));
ACLCHECK(aclrtFreeHost(host_buf));
// 销毁任务流
ACLCHECK(aclrtDestroyStream(stream));
ACLCHECK(aclrtDestroyEvent(start_event));
ACLCHECK(aclrtDestroyEvent(end_event));
return 0;
}
// For each index in [0, commNum): selects that device, obtains a root info
// blob via HcclGetRootInfo, appends it to "rootinfo.bin", prints it, and
// resets the device.
//
// Returns HCCL_SUCCESS on success; a non-zero ACL/HCCL return code if a
// runtime call fails; -1 if the file cannot be opened or written.
int multiGetRootInfo(int commNum)
{
    std::ofstream out("rootinfo.bin", std::ios::binary);
    if (!out.is_open()) {
        printf("multiGetRootInfo: failed to open rootinfo.bin for writing\n");
        return -1;
    }
    for (int rank = 0; rank < commNum; ++rank) {
        ACLCHECK(aclrtSetDevice(rank));
        HcclRootInfo rootInfo{};
        const auto getRet = HcclGetRootInfo(&rootInfo);
        bool written = false;
        if (getRet == HCCL_SUCCESS) {
            out.write(rootInfo.internal, HCCL_ROOT_INFO_BYTES);
            written = out.good();
        }
        // Reset the device before acting on any failure so an error return
        // does not leave the device selected by this loop.
        const auto resetRet = aclrtResetDevice(rank);
        HCCLCHECK(getRet);
        if (!written) {
            printf("multiGetRootInfo: failed to write root info %d to rootinfo.bin\n", rank);
            return -1;
        }
        printHcclRootHandle(rootInfo);
        std::cout << "get rank done : " << rank << std::endl;
        ACLCHECK(resetRet);
    }
    out.close();
    if (out.fail()) {
        // close() flushes buffered data; a failure here means the file is incomplete.
        printf("multiGetRootInfo: failed to flush rootinfo.bin\n");
        return -1;
    }
    return HCCL_SUCCESS;
}
// Loads the commId-th root info record from "rootinfo.bin" (records are
// HCCL_ROOT_INFO_BYTES each, in the order multiGetRootInfo wrote them),
// prints it, and returns it.
//
// Returns a zero-initialized HcclRootInfo if commId is outside [0, commNum),
// the file cannot be opened, or the record cannot be read in full; each of
// those cases is logged.
// NOTE(review): nothing here waits for the writer to finish - a reader that
// starts too early will hit the short-read path. Confirm how callers order
// the root process and the other processes.
HcclRootInfo loadRootInfo(int commNum, int commId)
{
    HcclRootInfo rootInfo{};
    if (commId < 0 || commId >= commNum) {
        printf("loadRootInfo: commId %d out of range [0, %d)\n", commId, commNum);
        return rootInfo;
    }
    std::ifstream in("rootinfo.bin", std::ios::binary);
    if (!in.is_open()) {
        printf("loadRootInfo: failed to open rootinfo.bin\n");
        return rootInfo;
    }
    // Jump straight to the requested record instead of reading every record before it.
    in.seekg(static_cast<std::streamoff>(commId) * HCCL_ROOT_INFO_BYTES, std::ios::beg);
    in.read(rootInfo.internal, HCCL_ROOT_INFO_BYTES);
    if (in.gcount() != static_cast<std::streamsize>(HCCL_ROOT_INFO_BYTES)) {
        printf("loadRootInfo: short read for commId %d in rootinfo.bin\n", commId);
        return HcclRootInfo{};
    }
    printHcclRootHandle(rootInfo);
    return rootInfo;
}
int run(int commNum, int commId, int rankNum, int devId, int logicalRankId)
{
auto rootInfo = loadRootInfo(commNum, commId);
printHcclRootHandle(rootInfo);
ACLCHECK(aclrtSetDevice(devId));
// 创建并初始化通信域配置项
HcclCommConfig config;
HcclCommConfigInit(&config);
// 根据需要修改通信域配置
config.hcclBufferSize = 50;
strcpy(config.hcclCommName, "comm_1");
// 初始化集合通信域
HcclComm hcclComm;
HCCLCHECK(HcclCommInitRootInfoConfig(rankNum, &rootInfo, logicalRankId, &config, &hcclComm));
// 创建任务stream
struct ThreadContext args;
args.comm = hcclComm;
args.device = devId;
Sample((void *)&args);
// 销毁集合通信域,只有在未超时操作的情况下销毁
HCCLCHECK(HcclCommDestroy(hcclComm));
return 0;
}
int main(int argc, char*argv[])
{
int devId = std::stoi(argv[1]); // dev id
int logicalRank = std::stoi(argv[2]); // rank id
int commId = std::stoi(argv[3]); // commId
int commNum = std::stoi(argv[4]); // number of comm
int rankNum = std::stoi(argv[5]); // rank size
int devCount = rankNum;
int logicalRankId = logicalRank;
// 设备资源初始化
ACLCHECK(aclInit(NULL));
// 指定集合通信操作使用的设备
ACLCHECK(aclrtSetDevice(devId));
// 在rank0循环获取多个rank的rootInfo
int32_t rootRank = 0;
std::vector<std::thread> tmp;
if (devId == rootRank) {
std::vector<int> rootDevIds;
std::vector<int> rootLogicalIds;
for (int i=0; i<commNum; i++) {
rootDevIds.emplace_back(i); // 每个通信域默认root device = commID
rootLogicalIds.emplace_back(0);
}
HCCLCHECK(multiGetRootInfo(commNum));
// rank0进程起多个线程,为对应的root调用HcclCommInitRootInfo
for (int i=0; i<commNum; i++) {
tmp.emplace_back(std::thread([=]() { run(commNum, i, rankNum, rootDevIds[i], rootLogicalIds[i]); }));
}
} else {
run(commNum, commId, rankNum, devId, logicalRankId);
}
for (int i=0; i<tmp.size(); i++) {
tmp[i].join();
}
// 重置设备
ACLCHECK(aclrtResetDevice(devId));
// 设备去初始化
ACLCHECK(aclFinalize());
return 0;
}
|
父主题: AI Server与业务进程一对一场景