算子下发
开发者完成通信算子任务编排后,需要将Kernel函数下发给具体的通信引擎执行。
对于AI CPU通信引擎,开发者需首先声明算子信息库文件,文件格式为json,文件内容如下:
{
    "CustomAicpuKernel": {
        "opInfo": {
            "opKernelLib": "AICPUKernel",                  // 固定值
            "kernelSo": "<aicpu_kernel_so_name>",          // AI CPU侧动态链接库,用户自定义,如:libp2p_aicpu_kernel.so
            "functionName": "<aicpu_kernel_func_name>"     // AI CPU Kernel函数名字,用户自定义,如:HcclLaunchP2PAicpuKernel
        }
    }
}
定义AI CPU Kernel函数,格式如下。其中<aicpu_kernel_func_name>为AI CPU Kernel函数名字,函数名必须与算子信息库文件(json文件)中的functionName字段保持一致。
// Skeleton of an AI CPU kernel entry function.
// <aicpu_kernel_func_name> is a placeholder for the kernel function name; it must
// match the functionName field of the operator information library file (json file).
extern "C" unsigned int <aicpu_kernel_func_name>(void *param)
{
// Kernel implementation
}
代码示例
以自定义Send/Receive算子为例,使用AI CPU通信引擎时,Host侧下发AI CPU Kernel函数的代码片段如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 | // Host stream通知Device主thread aclrtRecordNotify(g_notifies[0], stream); std::string kernelName = "HcclLaunchP2PAicpuKernel"; aclrtFuncHandle funcHandle; aclrtArgsHandle argsHandle; ACLCHECK(aclrtBinaryGetFunction(g_binKernelHandle, kernelName.c_str(), &funcHandle)); ACLCHECK(aclrtKernelArgsInit(funcHandle, &argsHandle)); aclrtParamHandle paraHandle; aclrtKernelArgsAppend(argsHandle, ¶m, sizeof(OpParam), ¶Handle); aclrtKernelArgsFinalize(argsHandle); uint16_t NOTIFY_DEFAULT_WAIT_TIME = 27 * 68; aclrtLaunchKernelCfg cfg; aclrtLaunchKernelAttr attr; attr.id = ACL_RT_LAUNCH_KERNEL_ATTR_TIMEOUT; attr.value.timeout = NOTIFY_DEFAULT_WAIT_TIME; cfg.numAttrs = 1; cfg.attrs = &attr; constexpr uint32_t blockDim = 1; // 执行device侧的算法编排 aclrtLaunchKernelWithConfig(funcHandle, blockDim, stream, &cfg, argsHandle, nullptr); // Host stream等待Device的通知 aclrtWaitAndResetNotify(g_notifies[1], stream, CUSTOM_TIMEOUT); |
在AI CPU侧需要定义Kernel函数入口,该函数需编译到Device侧:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 | typedef struct { void *addr; uint64_t size; } CommBuffer; struct AlgResourceCtx { ThreadHandle threadHandle; // 通信线程句柄 CommBuffer localBuffer; // 本端通信内存 CommBuffer remoteBuffer; // 远端通信内存 ChannelHandle channelHandle; // 通信通道资源 uint32_t notifyIds[AICPU_CONTROL_NOTIFY_NUM]; // aicpu模式下device侧控制notify }; struct OpParam { char tag[TAG_LENGTH]; char commName[COMM_INDENTIFIER_MAX_LENGTH]; // 通信域名称 void* inputPtr = nullptr; // 算子输入数据地址 void* outputPtr = nullptr; // 算子输出数据地址 uint64_t count = 0; // 算子数据量 HcclDataType dataType = HCCL_DATA_TYPE_RESERVED; // 算子数据类型 HcclCMDType opType = HcclCMDType::HCCL_CMD_INVALID; // 算子类型 AlgResourceCtx* resCtx = nullptr; // 资源上下文 }; // 在AI CPU上执行的Kernel函数 extern "C" unsigned int HcclLaunchP2PAicpuKernel(OpParam *param) { HCCL_INFO("Entry-%s, commName[%s], tag[%s]", __func__, param->commName, param->tag); if (HcommAcquireComm(param->commName) != HCCL_SUCCESS) { HCCL_ERROR("%s HcommAcquireComm fail, commName[%s]", __func__, param->commName); return 1; } // 获取Device侧主thread ThreadHandle thread = param->resCtx->threadHandle; if (HcommBatchModeStart(param->tag) != HCCL_SUCCESS) { HCCL_ERROR("failed start batch mode"); return 1; } // 主thread等待Host stream的通知 if (HcommAclrtNotifyWaitOnThread(thread, param->resCtx->notifyIds[0], CUSTOM_TIMEOUT) != HCCL_SUCCESS) { HCCL_ERROR("failed to wait notify[%d] from host main stream", param->resCtx->notifyIds[0]); return 1; } // 执行任务编排 if (ExecOp(*param, param->resCtx) != HCCL_SUCCESS) { HCCL_ERROR("orchestrate failed for op:%d", param->opType); return 1; } // 主thread通知Host stream if (HcommAclrtNotifyRecordOnThread(thread, param->resCtx->notifyIds[1]) != HCCL_SUCCESS) { HCCL_ERROR("failed to record host main stream"); return 1; } if (HcommBatchModeEnd(param->tag) != HCCL_SUCCESS) { HCCL_ERROR("failed end batch 
mode"); return 1; } if (HcommReleaseComm(param->commName) != HCCL_SUCCESS) { HCCL_ERROR("%s HcommReleaseComm fail, commName[%s]", __func__, param->commName); return 1; } HCCL_INFO("%s success, commName[%s], tag[%s]", __func__, param->commName, param->tag); return 0; } |
父主题: 通信算子开发