技术干货

HCCL集合通信算法开发Hello World示例(超详细)

昇腾CANN

发表于 2024/09/03

1 前言

本文给读者介绍了HCCL算法开发所涉及的概念和流程,并通过一个样例将前文介绍的内容串联起来。本文定位为HCCL算法开发的入门介绍,读者读完后,可结合HCCL开放代码仓中的算法样例,做深入研究。

2 什么是集合通信

集合通信定义了一系列标准信息交换接口,解决并行计算时不同进程之间的通信问题。集合通信的概念和作用可参考文章《HCCL——昇腾高性能集合通信库》。

举例来说,AllGather将group内所有节点的输入按照Rank Id重新排序,然后拼接起来,再将结果发送到所有节点的输出。

HCCL为集合通信算子提供了对应的API供开发者调用,例如AllGather算子的API如下:

HcclResult HcclAllGather(void *sendBuf, void *recvBuf, uint64_t sendCount, HcclDataType dataType, HcclComm comm, aclrtStream stream);

·   sendBuf表示输入内存缓冲区指针

·   recvBuf表示输出内存缓冲区指针

·   sendCount表示输入的数据个数

·   dataType表示处理的数据类型

·   comm表示算子执行所在的通信域

·   stream表示算子执行所在的主流

开发者可以在PyTorch等AI框架中调用HcclAllGather接口,使能昇腾AI处理器(下文简称NPU)执行AllGather通信。

3 集合通信算法开发相关概念引入

针对同一个通信算子,随着网络拓扑、通信数据量、硬件资源等变化,往往会采用不同的通信算法,从而最大化集群通信性能。以AllGather算子举例,HCCL实现了Mesh、Ring、Recursive Halving-Doubling(RHD)、NHR(Nonuniform Hierarchical Ring)、NB(Nonuniform Bruck)等多种算法用于Server内和Server间的集合通信,通信操作执行时,HCCL会根据输入条件从算法仓中自动选择性能最佳的算法。

在进行集合通信算法开发前,先引入一些概念介绍:

3.1 通信域

集合通信发生在一组通信对象上(比如一个NPU就是一个通信对象)。通信域是集合通信算子执行的上下文,管理对应的通信对象和通信所需的资源。

3.2 Rank

通信域中的每个通信对象称为一个Rank。

3.3 Memory

集合通信执行所需要的各种Buffer资源。

·   Input Buffer:集合通信算子输入数据缓冲区。

·   Output Buffer:集合通信算子输出数据缓冲区,存放算法计算结果。

·   CCL Buffer:一组地址固定的Buffer,可被远端访问。单算子模式下,通信对象通过CCL Buffer来实现跨Rank的数据交换。

CCL buffer和通信域绑定,通信域初始化的时候创建两块CCL buffer,分别称为CCL_In和CCL_Out。CCL_In和CCL_Out默认大小是200M Byte,可以通过环境变量HCCL_BUFFSIZE进行修改。同一个通信域内执行的集合通信算子都复用相同的CCL Buffer。

·   Scratch Buffer:除了CCL Buffer,有些算法计算过程中需要额外的存储空间,这部分额外的存储空间,称为Scratch Buffer。

3.4 流

流(Stream)是NPU上的一种硬件资源,承载了待执行的Task序列。Task可以是一个DMA操作、一个同步操作或者一个NPU算子等。同一个流上的Task序列按顺序执行,不同流上的Task序列可并发执行。

·   主流:由Pytorch等训练框架调用集合通信API传入的stream对象称为主流。

·   从流:为了实现集合通信算法所需要的并行性而额外申请的stream对象称为从流。

上图展示了一条主流和一条从流,主从流之间通过Post/Wait这一组Task进行同步。主从流之间没有依赖关系时,Task可并行执行。如上图主流的TaskA、TaskB和从流的TaskX、TaskY可以是并行执行的。Post/Wait的具体含义会在后文给出。

3.5 Notify

Notify是NPU上的硬件资源,用来做同步。在集合通信中主要有两种作用:

1)Rank内主从流之间的同步;

2)跨Rank数据收发的同步。

和Notify有关的Task有两种:1)Post;2)Wait。

Post操作给对应的notify寄存器置1,并返回。如果对应的notify值已经是1,则不产生变化,直接返回。

Wait操作会等待对应的notify值变为1。条件满足后,将对应的notify值复位为0,并继续执行后续的Task。

3.5.1 Rank内主从流同步示例


主流通知从流,实质是将notify1置位为1。从流通知主流,实质是将notify2置位为1。

3.5.2 跨Rank数据收发同步示例

与Rank内主从流同步类似,跨Rank的数据收发也需要同步。比如向远端Rank写数据前,得知道远端是否准备好接受数据的Buffer。关于跨Rank的同步,可参考下面关于Transport链路的章节。

3.6 Transport链路

要完成Rank间的数据通信需要先建立Transport链路,Transport链路分两种:1)SDMA链路,对应到HCCS/PCIE硬件连接;2)RDMA链路,对应到RoCE硬件连接。

·  SDMA Transport链路的两端各有两种类型的notify,分别称为Ack、DataSignal

·  RDMA Transport链路的两端各有三种类型的notify,分别称为Ack、DataSiganl、DataAck

每条Transport链路会申请各自的notify资源,不同的Transport之间不会复用notify。SDMA链路会申请四个notify,每端各两个;RDMA链路会申请六个notfy,每端各三个。

3.6.1 SDMA数据收发同步

一次SDMA数据收发需要两组同步,如下图所示,分别使用了Ack和DataSignal两个notify。为了避免同一条链路上多次收发数据相互影响,同步需以Ack开始,以DataSignal结束。

3.6.2 RDMA数据收发同步

一次RDMA数据收发需要三组同步信号,如下图所示。这是因为RDMA操作在流上是异步执行的,所以Rank 0执行完Write和Post DataSignal之后,并不知道数据什么时候写完,因此需要Rank1 Wait DataSignal满足条件后,再给Rank 0发送一个DataAck同步信号,通知Rank0数据已经写完了。

为了避免同一条链路上多次收发数据相互影响,同步需以Ack开始,以DataAck结束。

4 算法实现样例解析

4.1 算法原理

在了解完集合通信开发涉及的基本概念后,我们以单台Atlas 800T A2 训练服务器执行AllGather算子来举例,实现一个server内的mesh算法。

假定通信域中有4个NPU,每个NPU与另外三个NPU都有独立的HCCS链路,也就是说server内是mesh链接。硬件拓扑如下:

Buffer初始状态如下图,每个Rank只有UserIn(输入Buffer)中存在有效数据。Rank之间使用CCL内存交换数据,因为本算法只使用CCL_Out,所以没有把CCL_In画出来。

第一步,将数据从UserIn搬移到CCL_Out。

第二步,每个Rank同时从本Rank的CCL_Out和其他Rank的CCL_Out读取数据,并写到自己的UserOut(输出Buffer)的对应位置。这边只画了Rank0的数据搬运方向,其他Rank的搬运方式是类似的。

4.2 算法执行流程

在开发集合通信算法之前,需要先了解通信算法的执行流程,如下图所示:

通信算法的主体执行流程可划分为4步:

1. HCCL框架根据算子类型构造出对应的算子实例。

2. 框架调用算子实例执行算法选择,算法选择会返回将要执行的算法名字和一个标志资源的新tag字符串。

3. 框架根据新tag查询对应的资源(Scratch Buffer、从流、主从流同步的notify、Transport链路)是否存在,若不存在,则调用算子实例的资源计算接口获取算法执行需要的资源诉求,然后框架根据资源诉求将对应的资源创建出来。

4. 框架传入算法执行需要的资源,并执行算法编排。算法编排执行的过程中,会通过平台层的接口提交要执行的Task。

上述主体流程的步骤2至步骤4,是执行一个算法的必经步骤。那么开发一个新的算法,也需要完成相应的步骤,即算法选择、资源计算和算法编排。

4.3 算法选择

每个算法有自己的适用范围,在适用范围内,该算法往往是性能最佳的。本样例介绍的算法在以下三个条件满足时,为优选算法。具体条件为:1)server内是mesh拓扑;2)执行模式为单算子模式;3)通信域中只有一台server。

具体代码实现如下:

// isMeshTopo为true表示server内mesh拓扑
    if (isMeshTopo) {
        // 表示当前为单算子模式
        if (workflowMode_ == HcclWorkflowMode::HCCL_WORKFLOW_MODE_OP_BASE) {
            // 表示拓扑中只有一个server
            if (isSingleMeshAggregation_) {
                // 选择条件:1)server内mesh拓扑;2)单server场景;3)单算子模式
                algName = "AllGatherMeshOpbaseExecutor";
            }
        }
    }

4.4 资源计算

接下来,我们从算法原理来分析该算法执行需要的资源,代码中以rankSize表示参与通信的实体个数,本样例中rankSize即为4。

1. 算法执行不需要额外的Scratch Buffer,因此Scratch Buffer大小为0。

2. 算法原理第二步中,Rank内有rankSize块数据并发搬运,一共需要rankSize条流,除去主流,还需要rankSize - 1个从流。

3. 需要的主从流同步的notify数量为(rankSize - 1) * 2。Transport同步的notify随Transport链路自动申请,这边不用计算Transport需要的notify数量。

4. 算法原理第二步中,每个Rank都需要和其他的Rank交互,因此需要建立mesh链路。资源计算的参考代码实现:

// 计算从流的数量,deviceNumPerAggregation即为mesh内的Rank个数,即为RankSize
HcclResult CollAllGatherMeshOpbaseExecutor::CalcStreamNum(u32& streamNum)
{
u32 totalStreamNum = topoAttr_.deviceNumPerAggregation;
// 返回的从流数量为rankSize-1
    streamNum = totalStreamNum - 1U;
    return HCCL_SUCCESS;
}

// 计算建链诉求
HcclResult CollAllGatherMeshOpbaseExecutor::CalcCommInfo(std::vector<LevelNSubCommTransport>& opTransport)
{
    TransportMemType inputType = TransportMemType::RESERVED;
    TransportMemType outputType = TransportMemType::RESERVED;
    CHK_RET(CalcTransportMemType(inputType, outputType));
    CHK_RET(CalcLevel0CommInfo(inputType, outputType, opTransport));
    return HCCL_SUCCESS;
}

// 获取建链的内存类型
HcclResult CollAllGatherMeshOpbaseExecutor::CalcTransportMemType(TransportMemType &inputType, TransportMemType &outputType)
{
    if (GetWorkflowMode() == HcclWorkflowMode::HCCL_WORKFLOW_MODE_OP_BASE) {
        inputType = TransportMemType::CCL_INPUT;    // 单算子模式下使用CCL Buffer建链
        outputType = TransportMemType::CCL_OUTPUT;  // 单算子模式下使用CCL Buffer建链
    }
    return HCCL_SUCCESS;
}

// 调用公共函数完成server内mesh建链
HcclResult CollAllGatherMeshOpbaseExecutor::CalcLevel0CommInfo(TransportMemType inputType, TransportMemType outputType, std::vector<LevelNSubCommTransport>& opTransport)
{
    CommParaInfo commParaLevel0(COMM_LEVEL0, CommType::COMM_TAG_MESH);
    CHK_RET(CalcCommPlaneInfo(tag_, commParaLevel0, opTransport[COMM_LEVEL0], inputType, outputType));
    return HCCL_SUCCESS;
}

4.5 算法编排

以Rank0视角来举例说明算法编排。Rank0上一共有四条流,主流做Rank内的数据搬运,三条从流分别与另外三个Rank做Rank间的数据搬运。

算法原理中第一步对应了主流上的第一个蓝色Task,即将数据从输入Buffer搬到CCL_Out Buffer。在执行第二步之前,需要由主流唤醒从流开始工作,即第一组橙色块所示。然后执行算法原来的第二步,主流上做Rank内搬运,从流上做Rank间搬运,即中间蓝色和绿色块。最后,从流完成任务并通知主流,即图中最后一组橙色块。

图中主从流同步一共使用了6个Wait,对应到资源中的6个notify。主从流同步的写法如下:

// 主流通知从流,其中stream_表示主流,meshSignalAux_表示安排在从流上的notify
HcclResult AllgatherMeshDirect::MainRecordSub()
{
    for (u32 signalIndex = 0; signalIndex < meshSignalAux_.size(); signalIndex++) {
        CHK_RET(LocalNotify::Post(stream_, dispatcher_, meshSignalAux_[signalIndex],
            profilerInput_.stage));
    }
    return HCCL_SUCCESS;
}

// 从流等待主流,meshStream_表示从流,meshSignalAux_表示安排在从流上的notify
HcclResult AllgatherMeshDirect::SubWaitMain()
{
    for (u32 streamIndex = 0; streamIndex < meshSignalAux_.size(); streamIndex++) {
        CHK_RET(LocalNotify::Wait(meshStreams_[streamIndex], dispatcher_, meshSignalAux_[streamIndex],
            profilerInput_.stage));
    }
    return HCCL_SUCCESS;
}

// 从流通知主流,meshStream_表示从流,meshSignal_表示安排在主流上的notify
HcclResult AllgatherMeshDirect::SubRecordMain()
{
    for (u32 streamIndex = 0; streamIndex < meshSignal_.size(); streamIndex++) {
        CHK_RET(LocalNotify::Post(meshStreams_[streamIndex], dispatcher_, meshSignal_[streamIndex],
            profilerInput_.stage));
    }
    return HCCL_SUCCESS;
}

// 主流等待从流,其中stream_表示主流,meshSignal_表示安排在主流上的notify
HcclResult AllgatherMeshDirect::MainWaitSub()
{
    for (u32 signalIndex = 0; signalIndex < meshSignal_.size(); signalIndex++) {
        CHK_RET(LocalNotify::Wait(stream_, dispatcher_, meshSignal_[signalIndex], profilerInput_.stage));
    }
    return HCCL_SUCCESS;
}

算法编排实现中使用TxAck表示Post远端的Ack notify,使用RxAck表示Wait本端的Ack notify;使用TxDataSignal表示Post远端的DataSignal notify,使用RxDataSignal表示Wait本端的DataSignal notify。编排逻辑代码如下

HcclResult AllgatherMeshDirect::RunAsync(const u32 rank, const u32 rankSize, const std::vector<LINK> &links)
{
    // 获取数据类型对应的字节数
    u32 unitSize = DataUnitSize(dataType_);
    u64 sdmaSize = count_ * unitSize; // 当前一次SDMA通信的字节数
    u64 sliceSize = opInfo_->count * unitSize; // AllGather输入Buffer对应的字节数
    // 获取输入Buffer指针
char* curUerMemInPtr = static_cast<char *>(opInfo_->inputAddr);
// 获取输出Buffer指针
char* curUerMemOutPtr = static_cast<char *>(opInfo_->outputAddr);
// 获取CCL_Out Buffer指针
    char* curCommMemOutPtr = static_cast<char *>(outputMem_.ptr());

// 第一步,本地数据从输入Buffer搬移到CCL_Out
DeviceMem src;
    DeviceMem dst;
    src = DeviceMem::create(curUerMemInPtr, sdmaSize);
    u64 localOffsetByte = (sliceSize * rank) % HCCL_MIN_SLICE_ALIGN_910B;
    dst = DeviceMem::create(curCommMemOutPtr + localOffsetByte, sdmaSize);
    CHK_RET(HcclD2DMemcpyAsync(dispatcher_, dst, src, stream_));

    // 主流通知从流开始干活
    CHK_RET(MainRecordSub());
    CHK_RET(SubWaitMain());

    // 本Rank与远端Rank互发Ack信号
    for (u32 round = 1; round < rankSize; round++) {
        u32 dstRank = BackwardRank(rank, rankSize, round);
        Stream& subStream = meshStreams_[round - 1];
         // TxAck表示Post远端的Ack notify,RxAck表示Wait本端的Ack nofity
        CHK_RET(links[dstRank]->TxAck(subStream));
        CHK_RET(links[dstRank]->RxAck(subStream));
    }

    // 主流将数据从CCL_Out搬移到输出Buffer
    src = dst;
    dst = DeviceMem::create(curUerMemOutPtr + rank * sliceSize, sdmaSize);
    CHK_RET(HcclD2DMemcpyAsync(dispatcher_, dst, src, stream_));

    // 从流从其他Rank搬运数据;本Rank与远端Rank互发DataSignal信号
    for (u32 round = 1; round < rankSize; round++) {
        u32 dstRank = BackwardRank(rank, rankSize, round);
	   // 获取要下发Task的从流
        Stream& subStream = meshStreams_[round - 1];
        // 获取server内远端Rank在本Rank映射的CCL_Out Buffer的地址
        void *remMemPtr = nullptr;
        CHK_RET(links[dstRank]->GetRemoteMem(UserMemType::OUTPUT_MEM, &remMemPtr));
        u64 remoteOffsetByte = (sliceSize * dstRank) % HCCL_MIN_SLICE_ALIGN_910B;
        src = DeviceMem::create(static_cast<char *>(remMemPtr) + remoteOffsetByte, sdmaSize);
        dst = DeviceMem::create(curUerMemOutPtr + dstRank * sliceSize, sdmaSize);
	   // 将数据从远端Rank读取到本端输出Buffer
        CHK_RET(HcclD2DMemcpyAsync(dispatcher_, dst, src, subStream,
            links[dstRank]->GetRemoteRank(), links[dstRank]->GetLinkType()));
         // TxDataSignal表示Post远端的DataSignal notify
// RxDataSignal表示Wait本端的DataSignal nofity
        CHK_RET(links[dstRank]->TxDataSignal(subStream));
        CHK_RET(links[dstRank]->RxDataSignal(subStream));
    }
    // 从流通知主流任务完成
    CHK_RET(SubRecordMain());
    CHK_RET(MainWaitSub());

    return HCCL_SUCCESS;
}

5 参考代码

本样例即为HCCL开放代码仓中的AllGatherMeshOpbaseExecutor算法实现,对应代码链接可参考以下链接。为了减少干扰,文档中贴出的代码删除了部分逻辑,不影响整体流程。
算法选择部分代码实现:
https://gitee.com/ascend/cann-hccl/blob/master/src/domain/collective_communication/algorithm/impl/operator/all_gather_operator.cc
资源计算部分代码实现:
https://gitee.com/ascend/cann-hccl/blob/master/src/domain/collective_communication/algorithm/impl/coll_executor/coll_all_gather/coll_all_gather_mesh_opbase_executor.cc
算法编排部分代码实现:
https://gitee.com/ascend/cann-hccl/blob/master/src/domain/collective_communication/algorithm/base/executor/all_gather_mesh_direct.cc

6 获取更多学习资源

HCCL集合通信算法开发的相关介绍就到这里,欢迎大家关注手续技术分享。了解更多集合通信能力介绍,请访问昇腾社区HCCL信息专区:https://www.hiascend.com/hccl

往期介绍:HCCL——昇腾高性能集合通信库


本页内容