将多个通信卡上的数据按通信编号的顺序在第一维进行聚合,然后发送到每张卡上。支持每张卡发送的数据不等长。
推理场景中会出现batch size不能被TP数整除的情况,reducescatter后续的计算算子需要按照batch维度处理数据,再将处理数据进行allgather,如图图1所示。
示例:
计算过程示意(python):
# 计算goldtensor gold_outtensor = [] for i in range(len(sendcount)): gold_outtensor= gold_outtensor+(tensorafters[i][0:sendcount[i]]) GoldenTensors = (torch.tensor(np.array(gold_outtensor+[0]*(sum*dim[1]-len(gold_outtensor))).reshape(sum,dim[1]), dtype=inTensorDtype))
模型并行;模型并行里前向计算里的参数全同步,需要用AllgatherV把模型并行里将切分到不同的XPU上的参数全同步到一张XPU上才能进行前向计算。
>>> rank0 input tensor([[0,1,2,3], [4,5,6,7]], device='npu:0') shape[2,4] >>> rank0 sendcount tensor([4], device='npu:0') shape[1] >>> rank1 input tensor([[3,2,1,0], [7,6,5,4], [7,6,5,4]], device='npu:1') shape[3,4] >>> rank1 sendcount tensor([2], device='npu:1') shape[1] >>> recvout=tensor([4,2]) >>> recvdis=tensor([0,4]) >>> y=tensor([0,1,2,3,4], device='npu:0') >>> rank0 output tensor([[0,1,2,3], [3,2,0,0], [0,0,0,0], [0,0,0,0], [0,0,0,0]], device='npu:0') shape[5,4] >>> rank1 output tensor([[0,1,2,3], [3,2,0,0], [0,0,0,0], [0,0,0,0], [0,0,0,0]], device='npu:1') shape[5,4]
1 2 3 4 5 6 7 8 9 10 11 | struct AllGatherVParam { int rank = -1; int rankSize = 0; int rankRoot = 0; std::string backend = "hccl"; HcclComm hcclComm = nullptr; CommMode commMode = COMM_MULTI_PROCESS; std::string rankTableFile; std::string commDomain; uint8_t rsv[64] = {0}; }; |
成员名称 |
类型 |
默认值 |
描述 |
---|---|---|---|
rank |
int |
-1 |
当前卡所属通信编号。-1表示未传。 |
rankSize |
int |
0 |
通信的卡的数量,不能为0。 |
rankRoot |
int |
0 |
主通信编号。 |
backend |
std::string |
"hccl" |
通信计算类型,仅支持"hccl"。 |
hcclComm |
HcclComm |
nullptr |
hccl通信域接口获取的地址指针。默认为空,加速库为用户创建;若用户想要自己管理通信域,则需要传入该通信域指针,加速库使用传入的通信域指针来执行通信算子。 |
commMode |
CommMode |
COMM_MULTI_PROCESS |
通信模式,CommMode类型枚举值。hccl多线程只支持外部传入通信域方式。
|
rankTableFile |
std::string |
- |
集群信息的配置文件路径,用于多机通信场景,当前仅支持hccl后端场景。集群信息的配置文件路径,适用单机以及多机通信场景,当前仅支持hccl后端场景,若单机配置了rankTable,则以ranktable来初始化通信域。 ranktable配置请参考:《TensorFlow 1.15模型迁移指南》的“模型训练>执行分布式训练>准备ranktable资源配置文件”章节。 |
commDomain |
std::string |
- |
通信device组用通信域名标识,多通信域时使用,当前仅支持hccl。 |
rsv[64] |
uint8_t |
{0} |
预留参数。 |
参数 |
维度 |
数据类型 |
格式 |
是否必选 |
描述 |
---|---|---|---|---|---|
x |
[dim_0, dim_1, ..., dim_n] |
"hccl": float16/int8/bfloat16 |
ND |
是 |
输入tensor。 |
sendCount |
1[1] |
int64 |
ND |
是 |
输入tensor,为本卡发送的数据量。支持每张卡的该tensor不同,即支持数据不等长。 |
recvCounts |
1[ranksize] |
int64 |
ND |
是 |
输入tensor,为从对应索引卡号接收到的数据量,每张卡都一样。 |
rdispls |
1[ranksize] |
int64 |
ND |
是 |
为从对应索引卡号接收到的数据量的偏移,每张卡都一样,rdispls[i] = n表示本rank从相对于输入起始位置的偏移量为n的位置开始接收rank_i的数据。 |
y |
1[每个卡的第1维的shape的和] |
float16 |
ND |
是 |
shape为所有卡的合并tensor的首shape之和,用于infer shape。 |
参数 |
维度 |
数据类型 |
格式 |
是否必选 |
描述 |
---|---|---|---|---|---|
output |
[n, dim_1, ..., dim_n] |
"hccl": float16/int8/bfloat16 |
ND |
否 |
输出tensor,与输入tensor地址不同,非原地写,n为所有卡的要合并tensor的第一维shape之和,即y的shape。数据类型和输入相同。 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 | def main_worker(rank, world_size, inTensorDtypes, random_seed): torch_npu.npu.set_device(rank) print(f'Process {rank} started, using device npu:{rank}.') acl_allgatherv_operation = torch.classes.OperationTorch.OperationTorch( "AllGatherVOperation") torch.manual_seed(random_seed) for inTensorDtype in inTensorDtypes: acl_param = json.dumps({"rank": rank, "rankSize": world_size, "rankRoot": 0, "backend": "hccl"}) acl_allgatherv_operation.set_param(acl_param) op_name = "AllGatherVOperation" run_param = json.dumps({"sendCount": sendcount[rank], "recvCounts": recvout[rank], "rdispls": recvdis[rank]}) host_list = [[sendcount[rank]], recvout[rank], recvdis[rank]] host_tensors = [np.array(tensor) for tensor in host_list] host_tensors = [torch.from_numpy(tensor).to(torch.int64) for tensor in host_tensors] host_tensors = [tensor.npu() for tensor in host_tensors] acl_allgatherv_operation.set_varaintpack_param(run_param) inTensors = [torch.tensor([[0, 1, 2, 3], [4, 5, 6, 7]], dtype=inTensorDtype), torch.tensor([[3, 2, 1, 0], [7, 6, 5, 4], [7, 6, 5, 4]], dtype=inTensorDtype)] # y用来推导outputshape,长度应为所有inputtensor的dim0之和 y = torch.tensor([0, 1, 2, 3, 4], dtype=torch.float16) GoldenTensors = [ torch.tensor([[0, 1, 2, 3], [3, 2, 0, 0], [0, 0, 0, 0], [0, 0, 0, 0], [0, 0, 0, 0]], dtype=inTensorDtype), torch.tensor([[0, 1, 2, 3], [3, 2, 0, 0], [0, 0, 0, 0], [0, 0, 0, 0], [0, 0, 0, 0]], dtype=inTensorDtype)] acl_out_tensor = acl_allgatherv_operation.execute([inTensors[rank].npu(), host_tensors[0], host_tensors[1], host_tensors[2], y.npu()])[0] print(f"********************acl_out_tensor:{acl_out_tensor}") torch.npu.synchronize() # assert result assert golden_compare(GoldenTensors[rank], acl_out_tensor.cpu()) def test_all_gather(self): world_size = 2 random_seed = 123 inTensorDtypes = [torch.int8] set_start_method('spawn', force=True) process_list = [] for i in range(world_size): p = Process(target=main_worker, args=(i, world_size, inTensorDtypes, random_seed)) p.start() process_list.append(p) for i in process_list: p.join() |