昇腾社区首页
中文
注册
开发者
下载

HcommWriteOnThread

产品支持情况

产品

是否支持

Atlas A3 训练系列产品/Atlas A3 推理系列产品

Atlas A2 训练系列产品/Atlas A2 推理系列产品

Atlas 200I/500 A2 推理产品

Atlas 推理系列产品

Atlas 训练系列产品

针对Atlas A2 训练系列产品/Atlas A2 推理系列产品,仅支持Atlas 800T A2 训练服务器、Atlas 900 A2 PoD 集群基础单元、Atlas 200T A2 Box16 异构子框。

功能说明

向channel上的指定内存写数据,将src中长度为len的内存数据写入dst所指向的相同长度的内存区域。接口调用方为src所在节点,该接口为异步接口。

函数原型

1
int32_t HcommWriteOnThread(ThreadHandle thread, ChannelHandle channel, void *dst, const void *src, uint64_t len)

参数说明

参数名

输入/输出

描述

thread

输入

通信线程句柄,为通过HcclThreadAcquire接口获取到的threads。

ThreadHandle类型的定义可参见ThreadHandle

channel

输入

通信通道句柄,为通过HcclChannelAcquire接口获取到的channels。

ChannelHandle类型的定义可参见ChannelHandle

dst

输出

目的内存地址。

src

输入

源内存地址。

len

输入

数据长度(字节)。

返回值

int32_t:接口成功返回0,其他失败。

约束说明

调用示例

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
typedef struct {
    HcclMemType type; ///< 缓存物理位置类型,参见HcclMemType
    void *addr;       ///< 缓存地址
    uint64_t size;    ///< 缓存区域字节数
} CommBuffer;
uint32_t channelNum = 2;
std::vector<HcclChannelDesc> channelDesc(channelNum);
HcclChannelDescInit(channelDesc.data(), channelNum);

channelDesc[0].remoteRank = 1;
channelDesc[0].channelProtocol = CommProtocol::COMM_PROTOCOL_HCCS;
channelDesc[0].notifyNum = 3;

channelDesc[1].remoteRank = 2;
channelDesc[1].channelProtocol = CommProtocol::COMM_PROTOCOL_HCCS;
channelDesc[1].notifyNum = 3;

HcclComm comm;
CommEngine engine = CommEngine::COMM_ENGINE_CPU_TS;
std::vector<ChannelHandle> channels(channelNum);
HcclChannelAcquire(comm, engine, channelDesc.data(), channelNum, channels.data());

ThreadHandle threads[2];
// 申请2条流,每条流2个notify
aclrtStream streams[2];
ThreadHandle threads[2];
// 申请2条流,每条流2个Notify
aclrtCreateStream(&streams[0]);
aclrtCreateStream(&streams[1]);
HcclResult result = HcclThreadAcquireWithStream(comm, engine, streams[0], 2, &threads[0]);
result = HcclThreadAcquireWithStream(comm, engine, streams[1], 2, &threads[1]);

u32 NOTIFY_TIMEOUT = 1;
void Sample(HcclComm comm, ChannelHandle channel, HcclThread threadHandle, uint32_t rank)
{
    if (rank == 0) {
        CommBuffer localBuf;
        // 获取本端通信内存信息
        HcclGetHcclBuffer(comm, &localBuf.addr, &localBuf.size);
        CommBuffer remoteBuf;
        // 获取对端通信内存信息
        HcclChannelGetHcclBuffer(comm, channel, &remoteBuf.addr, &remoteBuf.size);
        uint64_t len = std::min(localBuf.size, remoteBuf.size);

        // 前同步,通知对端本端已准备完成,并接收对端准备完成信号
        HcommChannelNotifyRecordOnThread(threadHandle, channel, 0);
        HcommChannelNotifyWaitOnThread(threadHandle, channel, 0, NOTIFY_TIMEOUT);
        // 将本端内存的内容写到对端内存上
        HcommWriteOnThread(threadHandle, channel, remoteBuf.addr, localBuf.addr, len);
        // 数据写完成同步,通知对端本端数据写完成,并接收对端数据写完成信号
        HcommChannelNotifyRecordOnThread(threadHandle, channel, 1);
        HcommChannelNotifyWaitOnThread(threadHandle, channel, 1, NOTIFY_TIMEOUT);
        // 后同步,通知对端本端已完成数据接收,并接收对端数据接收完成信号
        HcommChannelNotifyRecordOnThread(threadHandle, channel, 2);
        HcommChannelNotifyWaitOnThread(threadHandle, channel, 2, NOTIFY_TIMEOUT);
    } else {
        // 前同步,通知对端本端已准备完成,并接收对端准备完成信号
        HcommChannelNotifyRecordOnThread(threadHandle, channel, 0);
        HcommChannelNotifyWaitOnThread(threadHandle, channel, 0, NOTIFY_TIMEOUT);
        // 数据写完成同步,通知对端本端数据写完成,并接收对端数据写完成信号
        HcommChannelNotifyRecordOnThread(threadHandle, channel, 1);
        HcommChannelNotifyWaitOnThread(threadHandle, channel, 1, NOTIFY_TIMEOUT);
        // 后同步,通知对端本端已完成数据接收,并接收对端数据接收完成信号
        HcommChannelNotifyRecordOnThread(threadHandle, channel, 2);
        HcommChannelNotifyWaitOnThread(threadHandle, channel, 2, NOTIFY_TIMEOUT);
    }
    return;
}