BatchWrite
Applicability
| Product | Supported |
|---|---|
|  | √ |
|  | √ |
|  | x |
|  | x |
|  | x |
|  | x |
Function
Delivers a BatchWrite communication task and returns its handleId. BatchWrite implements point-to-point communication, a mode in which data is transmitted directly to its destination. A single call can send multiple pieces of data to different Global Memory addresses.
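Conceptually, one BatchWrite task carries several independent copy descriptors that are executed as a single batch. The host-side C++ model below only illustrates that idea: `CopyItem` and `simulateBatchWrite` are hypothetical names, the copies are plain `memcpy` calls rather than device communication, and the descriptor layout is not the task-information format the API expects.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <cstring>
#include <vector>

// Hypothetical host-side descriptor: copy `bytes` bytes from `src` to `dst`.
// Illustration only; this is NOT the task-information format BatchWrite expects.
struct CopyItem {
    const std::uint8_t* src;
    std::uint8_t* dst;
    std::size_t bytes;
};

// Hypothetical model of one batch task: every descriptor is executed as part of
// the same call. Returns the number of descriptors processed, or -1 if `items`
// is null while `itemNum` is non-zero. This return value is unrelated to handleId.
int simulateBatchWrite(const CopyItem* items, std::uint32_t itemNum) {
    if (items == nullptr && itemNum != 0U) {
        return -1;
    }
    for (std::uint32_t i = 0; i < itemNum; ++i) {
        std::memcpy(items[i].dst, items[i].src, items[i].bytes);
    }
    return static_cast<int>(itemNum);
}
```

Passing the size of the container that holds the descriptors as the count, rather than a separately maintained constant, is one simple way to keep `itemNum` equal to the number of task groups.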
Prototype
```cpp
template <bool commit = false>
__aicore__ inline HcclHandle BatchWrite(GM_ADDR batchWriteInfo, uint32_t itemNum, uint16_t queueID = 0U)
```
Parameters
| Parameter | Input/Output | Description |
|---|---|---|
| commit | Input | Bool. Values: |

| Parameter | Input/Output | Description |
|---|---|---|
| batchWriteInfo | Input | Global Memory address of the communication task information. Each group of task information must be stored in the format the API expects, and multiple groups can be specified in one call. When the communication task is executed, the data is sent in a batch. The format differs by product; the examples show the layouts used on Atlas A2 and Atlas A3 products. |
| itemNum | Input | Number of tasks in the batch. Must equal the number of groups of communication task information in batchWriteInfo. |
| queueID | Input | ID of the queue in which the communication task is placed. Defaults to 0. |
Returns
Returns the task ID, handleId. On success, handleId is greater than or equal to 0; if the call fails, -1 is returned.
Restrictions
- Before calling this API, ensure that the InitV2 and SetCcTilingV2 APIs have been called.
- If the config template parameter of the HCCL object does not specify the core that delivers communication tasks, this API can be called on either AIC cores or AIV cores, but not both. If the core is specified, this API can be called on both AIC and AIV cores, and the communication task is delivered only from the specified core type.
- The total number of times that all Prepare and InterHcclGroupSync APIs are called in a communicator cannot exceed 63.
- On Atlas A2 training products/Atlas A2 inference products, this API currently supports communication between different AI Servers. The destination device number specified in the communication task information cannot be the local device number.
- After writing the communication task information to batchWriteInfo and before calling this API, call the DataCacheCleanAndInvalid API to ensure that the data has been written back to global memory.
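The cache-maintenance restriction matters because the task information can span more than one cache line. The helper below is a host-side sketch that assumes a 64-byte cache line (confirm the value for your target) and assumes that a `SINGLE_CACHE_LINE` flush covers only the line containing the given address; under those assumptions it counts how many single-line flushes a descriptor array needs. `cacheLinesSpanned` is a hypothetical name.

```cpp
#include <cassert>
#include <cstdint>

// Assumption: 64-byte data cache lines. Confirm the real value for your target.
constexpr std::uint64_t kCacheLineBytes = 64U;

// Counts the cache lines touched by the byte range [addr, addr + bytes).
// If a SINGLE_CACHE_LINE flush covers exactly one line (an assumption), this is
// the number of single-line flushes needed for the task information.
constexpr std::uint64_t cacheLinesSpanned(std::uint64_t addr, std::uint64_t bytes) {
    return bytes == 0U ? 0U
                       : (addr + bytes - 1U) / kCacheLineBytes - addr / kCacheLineBytes + 1U;
}
```

For example, two 32-byte items starting on a line boundary touch one line, while two 64-byte items touch two, and an unaligned start can add an extra line.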
Example
- Point-to-point communication between different AI Servers

On Atlas A2 training products/Atlas A2 inference products, assume that data on the current AI Server needs to be sent to specified locations on AI Servers 2 and 3 (remoteRankId 2 and 3 in the sample). You can call the BatchWrite API to implement batch point-to-point communication.
```cpp
#include "kernel_operator.h"

using namespace AscendC;

// Task-information format, defined according to the API conventions.
struct BatchWriteItem {
    uint64_t localBuf;      // Window address for sending data on the local end.
    uint64_t remoteBuf;     // Window address for receiving data on the peer end.
    uint64_t count;         // Number of data elements to send.
    uint32_t dataType;      // Data type of the elements to send.
    uint32_t remoteRankId;  // ID of the destination device.
};

extern "C" __global__ __aicore__ void BatchWrite_custom(GM_ADDR inputGM, GM_ADDR workspace, GM_ADDR tilingGM)
{
    GM_ADDR userWS = GetUserWorkspace(workspace);
    if (userWS == nullptr) {
        return;
    }
    REGISTER_TILING_DEFAULT(BatchWriteCustomTilingData);  // BatchWriteCustomTilingData is the structure defined in the operator header file.
    GET_TILING_DATA_WITH_STRUCT(BatchWriteCustomTilingData, tilingData, tilingGM);

    GM_ADDR contextGM = AscendC::GetHcclContext<0>();
    if constexpr (g_coreType == AscendC::AIV) {
        Hccl hccl;
        hccl.InitV2(contextGM, &tilingData);
        hccl.SetCcTilingV2(offsetof(BatchWriteCustomTilingData, mc2CcTiling));

        // Store the task information in the user workspace.
        __gm__ BatchWriteItem *sendInfo = reinterpret_cast<__gm__ BatchWriteItem *>(userWS);

        // Task 1: send 16 FP16 elements to rank 2.
        // The data to be sent must already have been moved from inputGM to the local window (not shown).
        sendInfo->localBuf = reinterpret_cast<uint64_t>(hccl.GetWindowsOutAddr(hccl.GetRankId()));
        // The receiving address on the peer must be a window address. The receiver decides whether
        // to move the received data to its output or workspace.
        sendInfo->remoteBuf = reinterpret_cast<uint64_t>(hccl.GetWindowsInAddr(2U));
        sendInfo->count = 16U;
        sendInfo->dataType = static_cast<uint32_t>(HcclDataType::HCCL_DATA_TYPE_FP16);
        sendInfo->remoteRankId = 2U;

        // Task 2: multiple tasks can be assembled to send data in a batch.
        (sendInfo + 1)->localBuf = reinterpret_cast<uint64_t>(hccl.GetWindowsOutAddr(hccl.GetRankId()));
        (sendInfo + 1)->remoteBuf = reinterpret_cast<uint64_t>(hccl.GetWindowsInAddr(3U));
        (sendInfo + 1)->count = 32U;
        (sendInfo + 1)->dataType = static_cast<uint32_t>(HcclDataType::HCCL_DATA_TYPE_BFP16);
        (sendInfo + 1)->remoteRankId = 3U;

        // Ensure that the task information in the data cache has been written back to GM.
        // The two 32-byte items fit in one cache line, assuming 64-byte lines and a line-aligned workspace.
        GlobalTensor<int64_t> tempTensor;
        tempTensor.SetGlobalBuffer((__gm__ int64_t *)sendInfo);
        DataCacheCleanAndInvalid<int64_t, CacheLine::SINGLE_CACHE_LINE, DcciDst::CACHELINE_OUT>(tempTensor);

        auto handleId = hccl.BatchWrite<true>(reinterpret_cast<GM_ADDR>(sendInfo), 2U);
        // Wait only indicates that the local end has finished sending. Whether the peer has
        // received the data is determined by the peer.
        hccl.Wait(handleId);
        AscendC::SyncAll();
        hccl.Finalize();
    }
}
```
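Because the API reads the task information as raw memory, the structure layout must match what the API expects. As a host-side sanity check, the sketch below mirrors the Atlas A2 `BatchWriteItem` from the sample and pins its size and field offsets with `static_assert`. It assumes the device compiler lays the structure out the same way as a typical 64-bit host compiler, which you should confirm; `descriptorBytes` is a hypothetical helper.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>

// Host-side mirror of the Atlas A2 BatchWriteItem from the sample.
struct BatchWriteItem {
    std::uint64_t localBuf;
    std::uint64_t remoteBuf;
    std::uint64_t count;
    std::uint32_t dataType;
    std::uint32_t remoteRankId;
};

// Layout checks, assuming the device compiler matches a typical 64-bit host ABI.
static_assert(sizeof(BatchWriteItem) == 32, "BatchWriteItem should be 32 bytes");
static_assert(offsetof(BatchWriteItem, remoteBuf) == 8, "unexpected remoteBuf offset");
static_assert(offsetof(BatchWriteItem, count) == 16, "unexpected count offset");
static_assert(offsetof(BatchWriteItem, dataType) == 24, "unexpected dataType offset");
static_assert(offsetof(BatchWriteItem, remoteRankId) == 28, "unexpected remoteRankId offset");

// Hypothetical helper: bytes occupied by `itemNum` consecutive task descriptors.
constexpr std::size_t descriptorBytes(std::size_t itemNum) {
    return itemNum * sizeof(BatchWriteItem);
}
```

Two items occupy 64 bytes, which is why a single cache-line flush can be enough in this sample, provided the cache line is 64 bytes and the workspace is line-aligned.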
When the communication data volume is large, you can call the SetAicpuBlockDim API in the tiling function to set the number of AI CPU cores. The operator then automatically selects the optimal core among the AI CPU cores for communication, which improves performance. Setting the number of schedulable AI CPU cores to 5 is recommended.
```cpp
static ge::graphStatus BatchWriteTilingFunc(gert::TilingContext* context)
{
    // Omit irrelevant code (use_aiv and groupName are assumed to be defined there).
    auto ascendcPlatform = platform_ascendc::PlatformAscendC(context->GetPlatformInfo());
    const auto aicCoreNum = ascendcPlatform.GetCoreNumAic();
    auto coreNum = use_aiv ? aicCoreNum * 2 : aicCoreNum;
    context->SetAicpuBlockDim(5U);  // Number of schedulable AI CPU cores (5 is recommended).
    context->SetBlockDim(coreNum);
    context->SetTilingKey(1000);
    // Omit irrelevant code.
    BatchWriteCustomTilingData *tiling = context->GetTilingData<BatchWriteCustomTilingData>();
    AscendC::Mc2CcTilingConfig mc2CcTilingConfig(groupName, 18, "BatchWrite=level0:fullmesh", 0);
    mc2CcTilingConfig.GetTiling(tiling->mc2InitTiling);
    mc2CcTilingConfig.GetTiling(tiling->mc2CcTiling);
    return ge::GRAPH_SUCCESS;
}
```
- Point-to-point communication among multiple queues
On Atlas A3 training products/Atlas A3 inference products, if you need to copy a segment of data to two different Global Memory locations, you can describe each copy as a BatchWrite task. The following sample places the two copies in queue 0 and queue 1, respectively, and then synchronizes all queues.
```cpp
#include "kernel_operator.h"

using namespace AscendC;

// Task-information format, defined according to the API conventions.
struct BatchWriteItem {
    uint64_t type;
    uint32_t res1[5];
    uint32_t length;
    uint32_t srcAddrLow;
    uint32_t srcAddrHigh;
    uint32_t dstAddrLow;
    uint32_t dstAddrHigh;
    uint32_t res2[4];
};

extern "C" __global__ __aicore__ void BatchWrite_custom(GM_ADDR inputGM, GM_ADDR outputGM1, GM_ADDR outputGM2,
                                                        GM_ADDR workspace, GM_ADDR tilingGM)
{
    GM_ADDR userWS = GetUserWorkspace(workspace);
    if (userWS == nullptr) {
        return;
    }
    REGISTER_TILING_DEFAULT(BatchWriteCustomTilingData);  // BatchWriteCustomTilingData is the structure defined in the operator header file.
    GET_TILING_DATA_WITH_STRUCT(BatchWriteCustomTilingData, tilingData, tilingGM);

    GM_ADDR contextGM = AscendC::GetHcclContext<0>();
    if constexpr (g_coreType == AscendC::AIV) {
        Hccl hccl;
        hccl.InitV2(contextGM, &tilingData);
        hccl.SetCcTilingV2(offsetof(BatchWriteCustomTilingData, mc2CcTiling));

        // Store the task information in the user workspace so that the source data in inputGM is not overwritten.
        __gm__ BatchWriteItem *sendInfo = reinterpret_cast<__gm__ BatchWriteItem *>(userWS);
        const uint64_t srcAddr = reinterpret_cast<uint64_t>(inputGM);
        const uint64_t dstAddr1 = reinterpret_cast<uint64_t>(outputGM1);
        const uint64_t dstAddr2 = reinterpret_cast<uint64_t>(outputGM2);

        // Task 1: copy from inputGM to outputGM1.
        sendInfo->type = 0UL;
        sendInfo->length = 64U;
        sendInfo->srcAddrLow = static_cast<uint32_t>(srcAddr & 0xFFFFFFFF);
        sendInfo->srcAddrHigh = static_cast<uint32_t>((srcAddr >> 32) & 0xFFFFFFFF);
        sendInfo->dstAddrLow = static_cast<uint32_t>(dstAddr1 & 0xFFFFFFFF);
        sendInfo->dstAddrHigh = static_cast<uint32_t>((dstAddr1 >> 32) & 0xFFFFFFFF);

        // Task 2: copy from inputGM to outputGM2. Multiple tasks can be assembled to send data in batches.
        (sendInfo + 1)->type = 0UL;
        (sendInfo + 1)->length = 64U;
        (sendInfo + 1)->srcAddrLow = static_cast<uint32_t>(srcAddr & 0xFFFFFFFF);
        (sendInfo + 1)->srcAddrHigh = static_cast<uint32_t>((srcAddr >> 32) & 0xFFFFFFFF);
        (sendInfo + 1)->dstAddrLow = static_cast<uint32_t>(dstAddr2 & 0xFFFFFFFF);
        (sendInfo + 1)->dstAddrHigh = static_cast<uint32_t>((dstAddr2 >> 32) & 0xFFFFFFFF);

        // Ensure that the task information in the data cache has been written back to GM.
        // Each item is 64 bytes, so with 64-byte cache lines the two items span two lines: flush both.
        GlobalTensor<int64_t> tempTensor0;
        tempTensor0.SetGlobalBuffer((__gm__ int64_t *)sendInfo);
        DataCacheCleanAndInvalid<int64_t, CacheLine::SINGLE_CACHE_LINE, DcciDst::CACHELINE_OUT>(tempTensor0);
        GlobalTensor<int64_t> tempTensor1;
        tempTensor1.SetGlobalBuffer((__gm__ int64_t *)(sendInfo + 1));
        DataCacheCleanAndInvalid<int64_t, CacheLine::SINGLE_CACHE_LINE, DcciDst::CACHELINE_OUT>(tempTensor1);

        // Place the two copies in queue 0 and queue 1, respectively.
        auto handleId0 = hccl.BatchWrite<true>(reinterpret_cast<GM_ADDR>(sendInfo), 1U, 0U);
        auto handleId1 = hccl.BatchWrite<true>(reinterpret_cast<GM_ADDR>(sendInfo + 1), 1U, 1U);

        // Block on the BatchWrite tasks in every queue. All queues continue only after all communication
        // tasks are complete, which synchronizes all queues.
        const uint16_t queueNum = hccl.GetQueueNum();
        for (uint16_t i = 0U; i < queueNum; ++i) {
            hccl.QueueBarrier<ScopeType::ALL>(i);
        }
        // Finalize<false> exits without waiting for all communication tasks on the server to complete,
        // releasing AIV core resources as early as possible.
        hccl.Finalize<false>();
        AscendC::SyncAll();
    }
}
```
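The Atlas A3 format stores each 64-bit address as two 32-bit halves. The sketch below factors the low/high split from the sample into a hypothetical helper, `makeCopyItem`, and checks the round trip on the host. The structure mirrors the sample's layout (renamed `BatchWriteItemA3` here to avoid clashing with the Atlas A2 structure), and `type` and `length` are filled exactly as the sample does. The helper also zero-fills the reserved fields, which the sample does not do explicitly; treat that as a defensive choice, not a documented requirement.

```cpp
#include <cassert>
#include <cstdint>

// Host-side mirror of the Atlas A3 task structure from the sample.
struct BatchWriteItemA3 {
    std::uint64_t type;
    std::uint32_t res1[5];
    std::uint32_t length;
    std::uint32_t srcAddrLow;
    std::uint32_t srcAddrHigh;
    std::uint32_t dstAddrLow;
    std::uint32_t dstAddrHigh;
    std::uint32_t res2[4];
};
static_assert(sizeof(BatchWriteItemA3) == 64, "BatchWriteItemA3 should be 64 bytes");

// Hypothetical helper: fills one copy descriptor, splitting each 64-bit
// address into its low and high 32-bit halves as the sample does.
BatchWriteItemA3 makeCopyItem(std::uint64_t src, std::uint64_t dst, std::uint32_t length) {
    BatchWriteItemA3 item{};  // Value-initialization zeroes the reserved fields.
    item.type = 0U;
    item.length = length;
    item.srcAddrLow = static_cast<std::uint32_t>(src & 0xFFFFFFFFULL);
    item.srcAddrHigh = static_cast<std::uint32_t>(src >> 32);
    item.dstAddrLow = static_cast<std::uint32_t>(dst & 0xFFFFFFFFULL);
    item.dstAddrHigh = static_cast<std::uint32_t>(dst >> 32);
    return item;
}

// Reassembles a 64-bit address from its two 32-bit halves.
std::uint64_t joinAddr(std::uint32_t low, std::uint32_t high) {
    return (static_cast<std::uint64_t>(high) << 32) | low;
}
```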
When the communication data volume is large, you can call the SetAicpuBlockDim, SetCommBlockNum, and SetQueueNum APIs in the tiling function to improve operator performance through concurrency.
In the following sample, 24 cores take part in BatchWrite communication (SetCommBlockNum(24U)) and each core uses 2 communication queues (SetQueueNum(2U)), so there are 48 (24 × 2) queues in total. The server uses 4 AI CPU cores (SetAicpuBlockDim(4U)), so each AI CPU core only needs to orchestrate the tasks in 12 (48/4) communication queues, which improves communication efficiency.
```cpp
static ge::graphStatus BatchWriteTilingFunc(gert::TilingContext* context)
{
    // Omit irrelevant code (use_aiv and groupName are assumed to be defined there).
    auto ascendcPlatform = platform_ascendc::PlatformAscendC(context->GetPlatformInfo());
    const auto aicCoreNum = ascendcPlatform.GetCoreNumAic();
    auto coreNum = use_aiv ? aicCoreNum * 2 : aicCoreNum;
    context->SetAicpuBlockDim(4U);  // 4 AI CPU cores on the server.
    context->SetBlockDim(coreNum);
    context->SetTilingKey(1000);
    // Omit irrelevant code.
    BatchWriteCustomTilingData *tiling = context->GetTilingData<BatchWriteCustomTilingData>();
    AscendC::Mc2CcTilingConfig mc2CcTilingConfig(groupName, 18, "BatchWrite=level0:fullmesh", 0);
    mc2CcTilingConfig.SetCommBlockNum(24U);  // 24 cores take part in communication.
    mc2CcTilingConfig.SetQueueNum(2U);       // 2 communication queues per core.
    mc2CcTilingConfig.GetTiling(tiling->mc2InitTiling);
    mc2CcTilingConfig.GetTiling(tiling->mc2CcTiling);
    return ge::GRAPH_SUCCESS;
}
```
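The queue arithmetic described for this sample can be reproduced with a short host-side sketch. `totalQueues` and `queuesPerAicpuCore` are hypothetical helpers, and the round-robin assignment is only an assumption used to show the even 48/4 split; the actual server-side scheduling policy is not modeled here.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <vector>

// Total communication queues = communication cores x queues per core.
std::uint32_t totalQueues(std::uint32_t commBlockNum, std::uint32_t queueNumPerBlock) {
    return commBlockNum * queueNumPerBlock;
}

// Hypothetical round-robin assignment of queues to AI CPU cores, used only to
// illustrate the even split. Returns how many queues each AI CPU core would orchestrate.
std::vector<std::uint32_t> queuesPerAicpuCore(std::uint32_t queues, std::uint32_t aicpuCores) {
    std::vector<std::uint32_t> load(static_cast<std::size_t>(aicpuCores), 0U);
    if (aicpuCores == 0U) {
        return load;  // No AI CPU cores: nothing to assign.
    }
    for (std::uint32_t q = 0; q < queues; ++q) {
        ++load[q % aicpuCores];
    }
    return load;
}
```

With 24 communication cores, 2 queues per core, and 4 AI CPU cores, each AI CPU core ends up with 12 queues, matching the figures above.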