AllGatherVOperation

功能

将多个通信卡上的数据按通信编号的顺序在第一维进行聚合,然后发送到每张卡上。支持每张卡发送的数据不等长。

推理场景中会出现batch size不能被TP数整除的情况,reducescatter后续的计算算子需要按照batch维度处理数据,再将处理数据进行allgather,如图图1所示。

图1 算子上文示意图

示例:

图2 计算过程示意图

计算过程示意(python):

# 计算goldtensor
gold_outtensor = []
for i in range(len(sendcount)):
    gold_outtensor= gold_outtensor+(tensorafters[i][0:sendcount[i]])
GoldenTensors = (torch.tensor(np.array(gold_outtensor+[0]*(sum*dim[1]-len(gold_outtensor))).reshape(sum,dim[1]), dtype=inTensorDtype))

使用场景

模型并行;模型并行里前向计算里的参数全同步,需要用AllgatherV把模型并行里将切分到不同的XPU上的参数全同步到一张XPU上才能进行前向计算。

>>> rank0 input
tensor([[0,1,2,3],
        [4,5,6,7]], device='npu:0')  shape[2,4]
>>> rank0 sendcount
tensor([4], device='npu:0')  shape[1]
>>> rank1 input
tensor([[3,2,1,0],
        [7,6,5,4],
        [7,6,5,4]], device='npu:1')  shape[3,4]
>>> rank1 sendcount
tensor([2], device='npu:1')  shape[1]
>>> recvout=tensor([4,2])
>>> recvdis=tensor([0,4])
>>> y=tensor([0,1,2,3,4], device='npu:0')
>>> rank0 output
tensor([[0,1,2,3],
        [3,2,0,0],
        [0,0,0,0],
        [0,0,0,0],
        [0,0,0,0]], device='npu:0')  shape[5,4]
>>> rank1 output
tensor([[0,1,2,3],
        [3,2,0,0],
        [0,0,0,0],
        [0,0,0,0],
        [0,0,0,0]], device='npu:1')  shape[5,4]

定义

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
struct AllGatherVParam {
    int rank = -1;
    int rankSize = 0;
    int rankRoot = 0;
    std::string backend = "hccl";
    HcclComm hcclComm = nullptr;
    CommMode commMode = COMM_MULTI_PROCESS;
    std::string rankTableFile;
    std::string commDomain;
    uint8_t rsv[64] = {0};
};

参数列表

成员名称

类型

默认值

描述

rank

int

-1

当前卡所属通信编号。-1表示未传。

rankSize

int

0

通信的卡的数量,不能为0。

rankRoot

int

0

主通信编号。

backend

std::string

"hccl"

通信计算类型,仅支持"hccl"。

hcclComm

HcclComm

nullptr

hccl通信域接口获取的地址指针。默认为空,加速库为用户创建;若用户想要自己管理通信域,则需要传入该通信域指针,加速库使用传入的通信域指针来执行通信算子。

commMode

CommMode

COMM_MULTI_PROCESS

通信模式,CommMode类型枚举值。hccl多线程只支持外部传入通信域方式。

  • COMM_UNDEFINED:未定义。
  • COMM_MULTI_PROCESS:指定多进程通信。
  • COMM_MULTI_THREAD: 指定多线程通信。

rankTableFile

std::string

-

集群信息的配置文件路径,用于多机通信场景,当前仅支持hccl后端场景。集群信息的配置文件路径,适用单机以及多机通信场景,当前仅支持hccl后端场景,若单机配置了rankTable,则以ranktable来初始化通信域。

ranktable配置请参考:《TensorFlow 1.15模型迁移指南》的“模型训练>执行分布式训练>准备ranktable资源配置文件”章节

commDomain

std::string

-

通信device组用通信域名标识,多通信域时使用,当前仅支持hccl。

rsv[64]

uint8_t

{0}

预留参数。

输入

参数

维度

数据类型

格式

是否必选

描述

x

[dim_0, dim_1, ..., dim_n]

"hccl": float16/int8/bfloat16

ND

输入tensor。

sendCount

1[1]

int64

ND

输入tensor,为本卡发送的数据量。支持每张卡的该tensor不同,即支持数据不等长。

recvCounts

1[ranksize]

int64

ND

输入tensor,为从对应索引卡号接收到的数据量,每张卡都一样。

rdispls

1[ranksize]

int64

ND

为从对应索引卡号接收到的数据量的偏移,每张卡都一样,rdispls[i] = n表示本rank从相对于输入起始位置的偏移量为n的位置开始接收rank_i的数据。

y

1[每个卡的第1维的shape的和]

float16

ND

shape为所有卡的合并tensor的首shape之和,用于infer shape。

输出

参数

维度

数据类型

格式

是否必选

描述

output

[n, dim_1, ..., dim_n]

"hccl": float16/int8/bfloat16

ND

输出tensor,与输入tensor地址不同,非原地写,n为所有卡的要合并tensor的第一维shape之和,即y的shape。数据类型和输入相同。

规格约束

接口调用示例(Python)

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
def main_worker(rank, world_size, inTensorDtypes, random_seed):
    torch_npu.npu.set_device(rank)
    print(f'Process {rank} started, using device npu:{rank}.')
    acl_allgatherv_operation = torch.classes.OperationTorch.OperationTorch(
        "AllGatherVOperation")
    torch.manual_seed(random_seed)
    for inTensorDtype in inTensorDtypes:
        acl_param = json.dumps({"rank": rank, "rankSize": world_size, "rankRoot": 0, "backend": "hccl"})
        acl_allgatherv_operation.set_param(acl_param)

        op_name = "AllGatherVOperation"
        run_param = json.dumps({"sendCount": sendcount[rank], "recvCounts": recvout[rank], "rdispls": recvdis[rank]})
        host_list = [[sendcount[rank]], recvout[rank], recvdis[rank]]
        host_tensors = [np.array(tensor) for tensor in host_list]
        host_tensors = [torch.from_numpy(tensor).to(torch.int64) for tensor in host_tensors]
        host_tensors = [tensor.npu() for tensor in host_tensors]
        acl_allgatherv_operation.set_varaintpack_param(run_param)

        inTensors = [torch.tensor([[0, 1, 2, 3], [4, 5, 6, 7]], dtype=inTensorDtype),
                     torch.tensor([[3, 2, 1, 0], [7, 6, 5, 4], [7, 6, 5, 4]], dtype=inTensorDtype)]
        # y用来推导outputshape,长度应为所有inputtensor的dim0之和
        y = torch.tensor([0, 1, 2, 3, 4], dtype=torch.float16)
        GoldenTensors = [
            torch.tensor([[0, 1, 2, 3], [3, 2, 0, 0], [0, 0, 0, 0], [0, 0, 0, 0], [0, 0, 0, 0]], dtype=inTensorDtype),
            torch.tensor([[0, 1, 2, 3], [3, 2, 0, 0], [0, 0, 0, 0], [0, 0, 0, 0], [0, 0, 0, 0]],
                         dtype=inTensorDtype)]
        acl_out_tensor = acl_allgatherv_operation.execute([inTensors[rank].npu(), host_tensors[0],
                                                           host_tensors[1], host_tensors[2], y.npu()])[0]
        print(f"********************acl_out_tensor:{acl_out_tensor}")
        torch.npu.synchronize()
        # assert result
        assert golden_compare(GoldenTensors[rank], acl_out_tensor.cpu())


def test_all_gather(self):
    world_size = 2
    random_seed = 123
    inTensorDtypes = [torch.int8]
    set_start_method('spawn', force=True)
    process_list = []
    for i in range(world_size):
        p = Process(target=main_worker, args=(i, world_size, inTensorDtypes, random_seed))
        p.start()
        process_list.append(p)
    for i in process_list:
        p.join()