SyncAll

函数功能

当不同核之间操作同一块全局内存且可能存在读后写、写后读以及写后写等数据依赖问题时,通过调用该函数来插入同步语句来避免上述数据依赖时可能出现的数据读写错误问题。

函数原型

表1 接口原型定义

原型定义

__aicore__ inline void SyncAll(const GlobalTensor<int32_t>& gmWorkspace, const LocalTensor<int32_t>& ubWorkspace)

参数说明

表2 接口参数说明

参数名称

输入/输出

含义

gmWorkspace

输入

gmWorkspace为用户定义的全局Global空间,所有核共用,用于保存每个核的状态标记,类型为GlobalTensor,支持的数据类型为int32_t。GlobalTensor数据结构的定义请参考GlobalTensor

所需空间大小参见约束说明

ubWorkspace

输入

ubWorkspace为用户定义的局部Local空间,每个核单独自用,用于标记当前核的状态,类型为LocalTensor,支持的数据类型为int32_t。LocalTensor数据结构的定义请参考LocalTensor

所需空间大小参见约束说明

返回值

支持的型号

Atlas 训练系列产品

Atlas推理系列产品AI Core

Atlas A2训练系列产品

约束说明

调用示例

本演示示例实现功能为使用8个核进行数据处理,每个核均是处理32个float类型数据,对该数据乘2后再与其他核上进行同样乘2的数据进行相加,中间结果保存到workGm,因此多个核之间需要进行数据同步。
#include "kernel_operator.h"

const int32_t DEFAULT_SYNCALL_NEED_SIZE = 8;

namespace AscendC {
class KernelSyncAll {
public:
    __aicore__ inline KernelSyncAll() {}
    __aicore__ inline void Init(__gm__ uint8_t* srcGm, __gm__ uint8_t* dstGm, __gm__ uint8_t* workGm,
        __gm__ uint8_t* syncGm)
    {
        blockNum = GetBlockNum(); // 获取核总数
        perBlockSize = srcDataSize / blockNum; // 每个核平分处理相同个数
        blockIdx = GetBlockIdx(); // 获取当前工作的核ID
        srcGlobal.SetGlobalBuffer(reinterpret_cast<__gm__ float*>(srcGm + blockIdx * perBlockSize * sizeof(float)),
            perBlockSize);
        dstGlobal.SetGlobalBuffer(reinterpret_cast<__gm__ float*>(dstGm + blockIdx * perBlockSize * sizeof(float)),
            perBlockSize);
        workGlobal.SetGlobalBuffer(reinterpret_cast<__gm__ float*>(workGm), srcDataSize);
        syncGlobal.SetGlobalBuffer(reinterpret_cast<__gm__ int32_t*>(syncGm), blockNum * DEFAULT_SYNCALL_NEED_SIZE);

        pipe.InitBuffer(inQueueSrc1, 1, perBlockSize * sizeof(float));
        pipe.InitBuffer(inQueueSrc2, 1, perBlockSize * sizeof(float));
        pipe.InitBuffer(workQueue, 1, blockNum * DEFAULT_SYNCALL_NEED_SIZE * sizeof(int32_t));
        pipe.InitBuffer(outQueueDst, 1, perBlockSize * sizeof(float));
    }
    __aicore__ inline void Process()
    {
        CopyIn();
        FirstCompute();
        CopyToWorkGlobal(); // 当前工作核计算后的数据先保存到外部工作空间

        // 等待所有核都完成计算
        LocalTensor<int32_t> workLocal = workQueue.AllocTensor<int32_t>();
        SyncAll(syncGlobal, workLocal);
        workQueue.FreeTensor(workLocal);  

        // 最终累加结果需要等所有核都计算完成
        LocalTensor<float> srcLocal2 = inQueueSrc2.DeQue<float>();
        LocalTensor<float> dstLocal = outQueueDst.AllocTensor<float>();
        DataCopy(dstLocal,srcLocal2,perBlockSize); // 当前核计算结果先保存到目的空间
        inQueueSrc2.FreeTensor(srcLocal2);

        for (int i = 0; i < blockNum; i++) {
            if (i != blockIdx) {
                CopyFromOtherCore(i); // 从外部工作空间读取数据
                Accumulate(dstLocal); // 所有数据都累加到目的空间
            }
        }

        outQueueDst.EnQue(dstLocal);
        CopyOut();
    }

private:
    __aicore__ inline void CopyToWorkGlobal()
    {
        LocalTensor<float> dstLocal = outQueueDst.DeQue<float>();
        DataCopy(workGlobal[blockIdx * perBlockSize], dstLocal, perBlockSize);
        outQueueDst.FreeTensor(dstLocal);
    }
    __aicore__ inline void CopyFromOtherCore(int index)
    {
        LocalTensor<float> srcLocal = inQueueSrc1.AllocTensor<float>();
        DataCopy(srcLocal, workGlobal[index * perBlockSize], perBlockSize);
        inQueueSrc1.EnQue(srcLocal);
    }

    __aicore__ inline void Accumulate(const LocalTensor<float> &dstLocal)
    {
        LocalTensor<float> srcLocal1 = inQueueSrc1.DeQue<float>();
        Add(dstLocal, dstLocal, srcLocal1, perBlockSize);
        inQueueSrc1.FreeTensor(srcLocal1);
    }

    __aicore__ inline void CopyIn()
    {
        LocalTensor<float> srcLocal = inQueueSrc1.AllocTensor<float>();
        DataCopy(srcLocal, srcGlobal, perBlockSize);
        inQueueSrc1.EnQue(srcLocal);
    }

    __aicore__ inline void FirstCompute()
    {
        LocalTensor<float> srcLocal1 = inQueueSrc1.DeQue<float>();
        LocalTensor<float> srcLocal2 = inQueueSrc2.AllocTensor<float>();
        LocalTensor<float> dstLocal = outQueueDst.AllocTensor<float>();

        float scalarValue(2.0);
        Muls(dstLocal, srcLocal1, scalarValue, perBlockSize);

        DataCopy(srcLocal2,dstLocal,perBlockSize);

        inQueueSrc1.FreeTensor(srcLocal1);    
        inQueueSrc2.EnQue(srcLocal2);
        outQueueDst.EnQue(dstLocal);
    }
    __aicore__ inline void CopyOut()
    {
        LocalTensor<float> dstLocal = outQueueDst.DeQue<float>();
        DataCopy(dstGlobal, dstLocal, perBlockSize);
        outQueueDst.FreeTensor(dstLocal);
    }

private:
    TPipe pipe;
    TQue<QuePosition::VECIN, 1> inQueueSrc1;
    TQue<QuePosition::VECIN, 1> inQueueSrc2;
    TQue<QuePosition::VECIN, 1> workQueue;
    TQue<QuePosition::VECOUT, 1> outQueueDst;
    GlobalTensor<float> srcGlobal;
    GlobalTensor<float> dstGlobal;
    GlobalTensor<float> workGlobal;
    GlobalTensor<int32_t> syncGlobal;
    int srcDataSize = 256;
    int32_t blockNum = 0;
    int32_t blockIdx = 0;
    uint32_t perBlockSize = 0;
};
} // namespace AscendC

extern "C" __global__ __aicore__ void kernel_syncAll_float(__gm__ uint8_t* srcGm, __gm__ uint8_t* dstGm,
    __gm__ uint8_t* workGm, __gm__ uint8_t* syncGm)
{
    AscendC::KernelSyncAll op;
    op.Init(srcGm, dstGm, workGm, syncGm);
    op.Process();
}

输入数据(srcGm):
[1,1,1,1,1,...,1]
输出数据(dstGm):
[16,16,16,16,16,...,16]