For the prerequisites and compilation commands, see the Operator Invocation Example.
Scenario: basic scenario (two processes running the LinearParallel operator with the HCCL backend).
#include <acl/acl.h>
#include <atb/atb_infer.h>
#include <iostream>
#include <unistd.h>
#include <sys/wait.h>
#include "demo_util.h"

void ExecuteImpl(atb::Operation *op, atb::VariantPack variantPack, atb::Context *context)
{
    uint64_t workspaceSize = 0;
    CHECK_STATUS(op->Setup(variantPack, workspaceSize, context));
    void *workspace = nullptr;
    if (workspaceSize > 0) {
        CHECK_STATUS(aclrtMalloc(&workspace, workspaceSize, ACL_MEM_MALLOC_HUGE_FIRST));
    }
    CHECK_STATUS(op->Execute(variantPack, (uint8_t *)workspace, workspaceSize, context));
    if (workspace) {
        CHECK_STATUS(aclrtFree(workspace)); // Free the workspace
    }
}

void LinearParallelSample(int rank, int rankSize)
{
    CHECK_STATUS(aclInit(nullptr));
    // Set the device ID used by this process
    int deviceId = rank;
    CHECK_STATUS(aclrtSetDevice(deviceId));
    atb::Context *context = nullptr;
    CHECK_STATUS(atb::CreateContext(&context));
    aclrtStream stream = nullptr;
    CHECK_STATUS(aclrtCreateStream(&stream));
    context->SetExecuteStream(stream);

    // Create the input tensors: input has shape {2, 32}, weight has shape {32, 2}, all elements are 2.0
    atb::Tensor input = CreateTensorFromVector(
        context, stream, std::vector<float>(64, 2.0), aclDataType::ACL_FLOAT16, aclFormat::ACL_FORMAT_ND, {2, 32});
    atb::Tensor weight = CreateTensorFromVector(
        context, stream, std::vector<float>(64, 2.0), aclDataType::ACL_FLOAT16, aclFormat::ACL_FORMAT_ND, {32, 2});

    // Create the output tensor with shape {2, 2}
    atb::Tensor output;
    output.desc.dtype = ACL_FLOAT16;
    output.desc.format = ACL_FORMAT_ND;
    output.desc.shape.dimNum = 2;
    output.desc.shape.dims[0] = 2;
    output.desc.shape.dims[1] = 2;
    output.dataSize = atb::Utils::GetTensorSize(output);
    CHECK_STATUS(aclrtMalloc(&output.deviceData, output.dataSize, ACL_MEM_MALLOC_HUGE_FIRST));

    // Configure the LinearParallel operator parameters
    atb::infer::LinearParallelParam param;
    param.transWeight = false;
    param.rank = rank;
    param.rankRoot = 0;
    param.rankSize = rankSize;
    param.backend = "hccl";
    atb::Operation *op = nullptr;
    CHECK_STATUS(atb::CreateOperation(param, &op));

    atb::VariantPack variantPack;
    variantPack.inTensors = {input, weight};
    variantPack.outTensors = {output};
    ExecuteImpl(op, variantPack, context);
    std::cout << "rank: " << rank << " executed END." << std::endl;

    // Release resources
    CHECK_STATUS(atb::DestroyOperation(op));    // Destroy the operation object
    CHECK_STATUS(aclrtDestroyStream(stream));   // Destroy the stream
    CHECK_STATUS(atb::DestroyContext(context)); // Destroy the context
    CHECK_STATUS(aclFinalize());
    std::cout << "demo execute success" << std::endl;
}

int main(int argc, const char *argv[])
{
    const int processCount = 2;
    for (int i = 0; i < processCount; i++) {
        pid_t pid = fork();
        // Child process
        if (pid == 0) {
            LinearParallelSample(i, processCount);
            return 0;
        } else if (pid < 0) {
            std::cerr << "Failed to create process." << std::endl;
            return 1;
        }
    }
    // Parent process waits for all child processes to finish
    for (int i = 0; i < processCount; ++i) {
        wait(NULL);
    }
    std::cout << "The communication operator is successfully executed. Parent process exit" << std::endl;
    return 0;
}
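To inspect the result on the host, a readback step can be appended right after ExecuteImpl returns in LinearParallelSample. The sketch below is not part of the original sample: it reuses the variables output, stream and rank defined above, relies only on the standard aclrtSynchronizeStream/aclrtMemcpy calls, converts the FP16 bits manually so no extra helper API is assumed, and requires adding <cstdint>, <cstring> and <vector> to the includes. As a hedged sanity check only: each matmul element is 2.0 * 2.0 * 32 = 128, and if the default LinearParallel type also all-reduces across the two ranks, each printed value would be 256.

// Minimal IEEE 754 binary16 -> float conversion (assumption: avoids depending on any extra ACL helper)
static float HalfToFloat(uint16_t h)
{
    uint32_t sign = (h & 0x8000u) << 16;
    uint32_t exp = (h >> 10) & 0x1Fu;
    uint32_t mant = h & 0x3FFu;
    if (exp == 0) { // Zero / subnormal: value = mant * 2^-24
        return (sign ? -1.0f : 1.0f) * mant * 5.9604644775390625e-8f;
    }
    uint32_t bits = (exp == 31) ? (sign | 0x7F800000u | (mant << 13))        // Inf / NaN
                                : (sign | ((exp + 112u) << 23) | (mant << 13)); // Normal number
    float f;
    std::memcpy(&f, &bits, sizeof(f));
    return f;
}

// Hypothetical verification snippet, placed after ExecuteImpl(op, variantPack, context);
CHECK_STATUS(aclrtSynchronizeStream(stream)); // Wait until the operator has finished on the stream
std::vector<uint16_t> hostData(output.dataSize / sizeof(uint16_t));
CHECK_STATUS(aclrtMemcpy(hostData.data(), output.dataSize, output.deviceData, output.dataSize,
                         ACL_MEMCPY_DEVICE_TO_HOST)); // Copy the result back to the host
std::cout << "rank " << rank << " output:";
for (uint16_t v : hostData) {
    std::cout << " " << HalfToFloat(v);
}
std::cout << std::endl;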