Sample Code
An example of the code directory structure is shown below, using llm_manager_demo as an example; adjust it to match your actual project layout.
MindIE-LLM
|____CMakeLists.txt
|____src
|____tools
|    |____llm_manager_demo
|         |____engine_util.cpp
|         |____llm_infer_engine.cpp
|         |____metric.cpp
|         |____engine_util.h
|         |____llm_infer_engine.h
|         |____metric.h
|         |____llm_engine_test.cpp
|         |____CMakeLists.txt
The purpose of each file under "llm_manager_demo" is described in Table 1.

Table 1 Files in llm_manager_demo

| Source file | Description |
|---|---|
| engine_util.cpp | Implementation of the IOManager and Data classes; supports reading request data from a dataset file. |
| llm_infer_engine.cpp | Implementation of the LlmInferEngine inference engine built on the LLM Manager API; the core class of the LLM Manager sample. |
| metric.cpp | Implementation of the statistics class for inference results, mainly the utility functions used during statistics collection. |
| engine_util.h | Defines the IOManager and Data classes used in the sample; supports reading request data from a dataset file. |
| llm_infer_engine.h | Header file of the LlmInferEngine inference engine built on the LLM Manager API. |
| metric.h | Header file of the statistics class for inference results; defines the result data structures and the utility functions used during statistics collection. |
| llm_engine_test.cpp | A complete inference-flow sample based on LlmInferEngine: reads the dataset, constructs requests, and collects statistics on the inference results. |
| CMakeLists.txt | CMakeLists.txt for the sample code. |
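The overall flow driven by llm_engine_test.cpp is: load the dataset through IOManager, initialize LlmInferEngine with a response callback, submit requests with Forward, and release resources with Finalize. The following is a minimal single-threaded sketch of that call sequence, assuming the headers below are on the include path; the real sample dispatches Forward on detached threads and waits for all responses before finalizing (see the full listings below):

#include <iostream>
#include <memory>
#include "engine_util.h"
#include "llm_infer_engine.h"

using namespace mindie_llm;

// Minimal sketch of the Init -> Forward -> Finalize flow used by the sample.
// "token_input_gsm.csv" and the empty config path mirror the defaults in
// llm_engine_test.cpp; error handling is reduced to early returns.
int main()
{
    IOManager manager;
    if (manager.SetInputData("token_input_gsm.csv") != 0) {
        return -1;
    }

    LlmInferEngine engine;
    auto status = engine.Init(
        [](std::shared_ptr<InferResponse> &response) {
            // Per-response callback: the full sample records metrics here.
            if (response->IsEnd()) {
                std::cout << response->GetRequestId().StringValue() << " finished" << std::endl;
            }
        },
        "" /* default config path */);
    if (!status.IsOk()) {
        return -1;
    }

    // Convert one dataset entry into an InferRequest and submit it.
    auto requests = Data2Request(manager.GetInputData(1));
    for (auto &request : requests) {
        engine.Forward(request);
    }

    // The full sample waits for all responses before finalizing; omitted here.
    engine.Finalize();
    return 0;
}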
Sample code:
- llm_infer_engine.h
/*
 * Copyright (c) Huawei Technologies Co., Ltd. 2023-2024. All rights reserved.
 */
#pragma once

#include "infer_request.h"
#include "llm_manager.h"
#include <condition_variable>
#include <deque>
#include <functional>
#include <map>
#include <memory>
#include <mutex>
#include <queue>
#include <shared_mutex>
#include <string>
#include <unordered_map>
#include <utility>
#include <vector>
#include "llm_infer_response.h"

namespace mindie_llm {
class LlmInferEngine {
public:
    LlmInferEngine();
    ~LlmInferEngine();
    Status Init(const SendResponseCallback4Request &callback = nullptr, const std::string &configPath = "");
    Status Forward(std::shared_ptr<InferRequest> &request);
    Status GetProcessingRequest(uint64_t *num);
    Status GetRequestBlockQuotas(uint64_t *remainBlocks, uint64_t *remainPrefillSlots, uint64_t *remainPrefillTokens);
    Status ControlRequest(const InferRequestId &requestId, Operation operation);
    Status Finalize();
    Status GetMaxBatchSize(uint64_t *batchSize);
    Status GetNodeStatus(std::map<std::string, mindie_llm::NodeHealthStatus> &slaveStatus);

    SendResponseCallback4Request sendResponseCallback_;
    std::shared_ptr<mindie_llm::LlmManager> llmManager_ = nullptr;
    std::vector<std::shared_ptr<mindie_llm::InferRequest>> GetRequestCallbackImpl();
    void SendStatusCallbackImpl(mindie_llm::InferRequestId reqId, mindie_llm::Status status,
        mindie_llm::StatusResponseType type);
    void StatsCallbackImpl(const std::string &status);
    std::queue<std::shared_ptr<mindie_llm::InferRequest>> requestQueue_{};
    std::queue<std::pair<mindie_llm::InferRequestId, mindie_llm::Operation>> stopIdQueue_{};
    std::mutex forwardMutex_;
    std::mutex stopMutex_;
    std::shared_mutex statsMutex_;
    std::unordered_map<mindie_llm::InferRequestId, SendResponseCallback4Request> callbackMap_{};
    std::unordered_map<mindie_llm::InferRequestId, std::unique_ptr<std::condition_variable>> forwardCVMap_;
    std::unordered_map<mindie_llm::InferRequestId, mindie_llm::Status> forwardStatusMap_{};
    std::unordered_map<mindie_llm::InferRequestId, std::unique_ptr<std::condition_variable>> cvMap_;
    std::unordered_map<mindie_llm::InferRequestId, mindie_llm::Status> ctrlStatusMap_{};
    uint64_t remainBlocks_ = 0;
    uint64_t remainPrefillSlots_ = 0;
    uint64_t remainPrefillTokens_ = 0;
    uint64_t processingRequestNum_ = 0;
    std::map<std::string, mindie_llm::NodeHealthStatus> slavesStatus_{};
};
} // namespace mindie_llm
- llm_infer_engine.cpp
#include "llm_infer_engine.h" #include "common_util.h" #include "config_manager.h" #include "log.h" namespace mindie_llm { LlmInferEngine::LlmInferEngine() { MINDIE_LLM_LOG_INFO("start infer engine init"); } LlmInferEngine::~LlmInferEngine() = default; static bool InitManagerConfig(uint32_t modelInstanceId, std::set<size_t> &npuDeviceIds, const std::string &configPath) { try { auto &configManager = mindie_llm::config::ConfigManager::GetInstance(configPath); auto &backendConfig = configManager.GetBackendConfig(); for (auto &npuID : backendConfig.npuDeviceIds[modelInstanceId]) { npuDeviceIds.insert(npuID); } } catch (const std::exception &e) { return false; } return true; } static std::shared_ptr<LlmInferResponse> ConstructResponseByTensorMap(InferRequestId reqId, const TensorMap &output, bool isFinal, const std::string &errMsg) { auto response = std::make_shared<LlmInferResponse>(reqId); for (auto it = output.begin(); it != output.end(); it++) { auto &tensor = it->second; auto status = response->AddOutput(tensor); if (!status.IsOk()) { MINDIE_LLM_LOG_ERROR(" return status error"); break; } } response->SetFlags(std::stoi(errMsg)); response->SetEOS(isFinal); return response; } Status LlmInferEngine::Init(const SendResponseCallback4Request &callback, const std::string &configPath) { if (callback == nullptr) { return Status(Error::Code::ERROR, "engine init failed. callback is null"); } sendResponseCallback_ = callback; // GetRequestsCallback, get requests from the queue mindie_llm::GetRequestsCallback getRequestCallback = [this]() { return GetRequestCallbackImpl(); }; // SendResponsesCallback, get response from the tensormap mindie_llm::SendResponsesCallback sendResponsesCallback = [this](InferRequestId reqId, const TensorMap &output, bool isFinal, const std::string &errMsg) { auto llmInferResponse = ConstructResponseByTensorMap(reqId, output, isFinal, errMsg); auto inferResponse = std::dynamic_pointer_cast<LlmInferResponse>(llmInferResponse); std::shared_ptr<mindie_llm::InferResponse> inferRes = std::static_pointer_cast<mindie_llm::InferResponse>(inferResponse); // get users callback function std::unique_lock lock(this->forwardMutex_); if (this->callbackMap_.find(reqId) != this->callbackMap_.end()) { this->callbackMap_[reqId](inferRes); if (isFinal) { this->callbackMap_.erase(reqId); } return; } this->sendResponseCallback_(inferRes); }; // StopSignalCallback, get stop request id from the queue mindie_llm::ControlSignalCallback stopSignalCallback = [this]() { std::vector<std::pair<mindie_llm::InferRequestId, mindie_llm::Operation>> stopList; std::unique_lock lock(this->stopMutex_); while (!this->stopIdQueue_.empty()) { stopList.push_back(this->stopIdQueue_.front()); this->stopIdQueue_.pop(); } return stopList; }; // LLMRuntimeStatsCallback, get status mindie_llm::LlmManagerStatsCallback statsCallback = [this](const std::string &status) { StatsCallbackImpl(status); }; mindie_llm::SendStatusResponseCallback sendStatusCallback = [this](mindie_llm::InferRequestId reqId, mindie_llm::Status status, mindie_llm::StatusResponseType type) { SendStatusCallbackImpl(reqId, status, type); }; try { llmManager_ = std::make_shared<mindie_llm::LlmManager>(configPath, getRequestCallback, sendResponsesCallback, stopSignalCallback, statsCallback, sendStatusCallback); } catch (const std::runtime_error &e) { return Status(Error::Code::ERROR, e.what()); } uint32_t modelInstanceId = 0; std::set<size_t> npuDeviceIds; if (!InitManagerConfig(modelInstanceId, npuDeviceIds, configPath)) { return Status(Error::Code::ERROR, 
"init manager config failed"); } auto status = llmManager_->Init(modelInstanceId, npuDeviceIds); if (!status.IsOk()) { return Status(Error::Code::ERROR, "llmManager init failed"); } return Status(Error::Code::OK, "Success"); } Status LlmInferEngine::Forward(std::shared_ptr<InferRequest> &request) { if (llmManager_ == nullptr) { return Status(Error::Code::ERROR, "RuntimeEngine not init!"); } if (request == nullptr) { return Status(Error::Code::ERROR, "input request is null"); } std::unique_lock lock(forwardMutex_); InferRequestId reqId = request->GetRequestId(); if (callbackMap_.find(reqId) != callbackMap_.end()) { return Status(Error::Code::ERROR, "Request id has been used before!"); } callbackMap_[reqId] = sendResponseCallback_; // Add request to queue requestQueue_.push(request); forwardCVMap_[reqId] = std::make_unique<std::condition_variable>(); auto res = forwardCVMap_[reqId]->wait_for(lock, std::chrono::seconds(300U)); if (res == std::cv_status::timeout) { if (forwardStatusMap_.find(reqId) != forwardStatusMap_.end()) { forwardStatusMap_.erase(reqId); } if (forwardCVMap_.find(reqId) != forwardCVMap_.end()) { forwardCVMap_.erase(reqId); } return Status(Error::Code::ERROR, "forward timeout"); } auto status = mindie_llm::Status(mindie_llm::Error::Code::OK, "forward failed"); if (forwardStatusMap_.find(reqId) != forwardStatusMap_.end()) { status = forwardStatusMap_[reqId]; forwardStatusMap_.erase(reqId); } if (forwardCVMap_.find(reqId) != forwardCVMap_.end()) { forwardCVMap_.erase(reqId); } return mindie_llm::Status(static_cast<mindie_llm::Error::Code>(status.StatusCode()), status.StatusMsg()); } Status LlmInferEngine::GetRequestBlockQuotas(uint64_t *remainBlocks, uint64_t *remainPrefillSlots, uint64_t *remainPrefill) { if (remainBlocks == nullptr) { return Status(Error::Code::INVALID_ARG, "mindie_llm::Error: remainBlocks is nullptr"); } if (remainPrefillSlots == nullptr) { return Status(Error::Code::INVALID_ARG, "mindie_llm::Error: remainPrefillSlots is nullptr"); } if (remainPrefill == nullptr) { return Status(Error::Code::INVALID_ARG, "mindie_llm::Error: remainPrefillTokens is nullptr"); } std::shared_lock lock(statsMutex_); *remainBlocks = this->remainBlocks_; *remainPrefillSlots = this->remainPrefillSlots_; *remainPrefill = this->remainPrefillTokens_; return Status(Error::Code::OK, "Success"); } Status LlmInferEngine::GetProcessingRequest(uint64_t *num) { if (llmManager_ == nullptr) { return Status(Error::Code::ERROR, "llmInferEngine is not initialized!"); } if (num == nullptr) { return Status(Error::Code::ERROR, "requestNumber ptr can not be nullptr"); } std::shared_lock lock(statsMutex_); *num = processingRequestNum_; return Status(Error::Code::OK, "Success"); } Status LlmInferEngine::Finalize() { // 1. finalize engine if (this->llmManager_ == nullptr) { return Status(Error::Code::ERROR, "infer engine finalize failed. llm mananger is null ptr." 
"please make sure infer engine is successfully initialized."); } this->llmManager_->Shutdown(); return Status(Error::Code::OK, "Success."); } Status LlmInferEngine::GetMaxBatchSize(uint64_t *batchSize) { if (batchSize == nullptr) { return Status(Error::Code::ERROR, "args can not be nullptr"); } auto &configManager = mindie_llm::config::ConfigManager::GetInstance(); auto &scheduleConfig = configManager.GetScheduleConfig(); *batchSize = scheduleConfig.maxBatchSize; return Status(Error::Code::OK); } Status LlmInferEngine::GetNodeStatus(std::map<std::string, mindie_llm::NodeHealthStatus> &slaveStatus) { if (llmManager_ == nullptr) { return Status(Error::Code::ERROR, "llmInferEngine is not initialized!"); } std::shared_lock lock(statsMutex_); slaveStatus = slavesStatus_; return Status(Error::Code::OK, "get node status success"); } std::vector<std::shared_ptr<mindie_llm::InferRequest>> LlmInferEngine::GetRequestCallbackImpl() { std::vector<std::shared_ptr<mindie_llm::InferRequest>> requests{}; std::unique_lock lock(this->forwardMutex_); while (!this->requestQueue_.empty()) { requests.emplace_back(this->requestQueue_.front()); this->requestQueue_.pop(); } return requests; } void LlmInferEngine::StatsCallbackImpl(const std::string &status) { Json receivedJson = Json::parse(status); std::shared_lock lock(this->statsMutex_); this->slavesStatus_ = receivedJson["slaves_status"]; this->remainBlocks_ = receivedJson["remain_blocks"]; this->remainPrefillSlots_ = receivedJson["remain_prefill_slots"]; this->remainPrefillTokens_ = receivedJson["remain_prefill_tokens"]; this->processingRequestNum_ = receivedJson["processing_request_num"]; } void LlmInferEngine::SendStatusCallbackImpl(mindie_llm::InferRequestId reqId, mindie_llm::Status status, mindie_llm::StatusResponseType type) { if (type == mindie_llm::StatusResponseType::CONTROL_SIGNAL_STATUS) { std::unique_lock lock(this->stopMutex_); if (this->cvMap_.find(reqId) != this->cvMap_.end()) { this->ctrlStatusMap_[reqId] = status; if (this->cvMap_[reqId] == nullptr) { return; } this->cvMap_[reqId]->notify_one(); } } else if (type == mindie_llm::StatusResponseType::REQUEST_ENQUEUE_STATUS) { std::unique_lock lock(this->forwardMutex_); if (this->forwardCVMap_.find(reqId) != this->forwardCVMap_.end()) { this->forwardStatusMap_[reqId] = status; if (this->forwardCVMap_[reqId] == nullptr) { return; } this->forwardCVMap_[reqId]->notify_one(); } } else { MINDIE_LLM_LOG_ERROR("SendStatusResponseCallback type invalid!"); } } } // namespace mindie_llm
- engine_util.h
/*
 * Copyright (c) Huawei Technologies Co., Ltd. 2023-2024. All rights reserved.
 */
#ifndef ENGINE_UTIL_H
#define ENGINE_UTIL_H
#pragma once

#include <climits>
#include <cmath>
#include <deque>
#include <iostream>
#include <map>
#include <mutex>
#include <string>
#include <vector>
#include <atomic>
#include <memory>
#include "infer_request.h"

namespace mindie_llm {
struct Data {
    std::string id;
    void *data; // token_id
    int64_t size;
    explicit Data(const std::string &id, void *data, int64_t size = 0)
    {
        this->id = id;
        this->data = data;
        this->size = size;
    }
    Data(const Data &other) = delete;
    Data &operator=(const Data &rhs) = delete;
    ~Data()
    {
        if (data != nullptr) {
            free(data);
            data = nullptr;
        }
    }
};

inline Data *CreateData(void *data, uint64_t size)
{
    static std::atomic<uint64_t> id;
    return new Data(std::to_string(id++), data, size);
}

std::vector<std::vector<int64_t>> LoadData(const std::string &filepath, bool &isOk);
std::shared_ptr<Data> ConvertData(std::vector<int64_t> &srcData);

template <typename T> void ConvertData(std::vector<std::vector<int64_t>> &&srcData, T &dstData)
{
    dstData.resize(srcData.size());
    for (size_t i = 0; i < dstData.size(); ++i) {
        auto tmpData = ConvertData(srcData[i]);
        if (!tmpData) {
            dstData.resize(i);
            std::cout << "failed to convert data." << std::endl;
            std::cout << "original dataset length is: " << srcData.size() << std::endl;
            std::cout << "actual dataset length is: " << i << std::endl;
            break;
        }
        dstData[i] = tmpData;
    }
}

class IOManager {
public:
    IOManager() noexcept {};
    int SetInputData(const std::string &dataset)
    {
        char resolvedPath[PATH_MAX] = {0};
        std::unique_lock<std::mutex> lock(mtx);
        if (realpath(dataset.c_str(), resolvedPath) == nullptr) {
            std::cout << "the path of dataset is invalid!" << std::endl;
            return -1;
        }
        bool isOk = true;
        auto data = LoadData(resolvedPath, isOk);
        if (!isOk) {
            return -1;
        }
        ConvertData(std::move(data), inputs_);
        return 0;
    }
    bool Empty()
    {
        std::unique_lock<std::mutex> lck(mtx);
        return inputs_.empty();
    }
    std::vector<std::shared_ptr<Data>> GetInputData(size_t n)
    {
        std::unique_lock<std::mutex> lck(mtx);
        std::vector<std::shared_ptr<Data>> ret;
        for (size_t i = 0; i < n; ++i) {
            if (!inputs_.empty()) {
                auto tmpData = inputs_.front();
                ret.emplace_back(tmpData);
                usingData_[tmpData->id] = tmpData;
                inputs_.pop_front();
            }
        }
        return ret;
    }
    std::vector<std::shared_ptr<Data>> GetWarmupInputs(size_t n)
    {
        std::unique_lock<std::mutex> lck(mtx);
        std::vector<std::shared_ptr<Data>> ret;
        for (size_t i = 0; i < n && i < inputs_.size(); ++i) {
            if (!inputs_.empty()) {
                auto tmpData = inputs_.at(i);
                ret.emplace_back(tmpData);
            }
        }
        return ret;
    }
    std::vector<std::shared_ptr<Data>> GetInputDataByQuotas(size_t remainPrefillSlots, size_t remainPrefillTokens,
        size_t slotNum)
    {
        std::vector<std::shared_ptr<Data>> ret;
        std::unique_lock<std::mutex> lck(mtx);
        while (!inputs_.empty()) {
            auto tmpData = inputs_.front();
            size_t demandTokenNum = static_cast<size_t>(tmpData->size);
            if (remainPrefillSlots > 0 && remainPrefillTokens >= demandTokenNum && slotNum > 0) {
                ret.emplace_back(tmpData);
                usingData_[tmpData->id] = tmpData;
                inputs_.pop_front();
                remainPrefillSlots -= 1;
                remainPrefillTokens -= demandTokenNum;
                slotNum -= 1;
            } else {
                break;
            }
        }
        return ret;
    }
    void SetOutputData(const std::string &id)
    {
        std::unique_lock<std::mutex> lck(mtx);
        usingData_.erase(id);
    }

private:
    // input queue and in-flight data
    std::deque<std::shared_ptr<Data>> inputs_;
    std::mutex mtx;
    std::map<std::string, std::shared_ptr<Data>> usingData_;
};

std::vector<std::shared_ptr<InferRequest>> Data2Request(const std::vector<std::shared_ptr<Data>> &data);

struct SamplingParams {
    float temperature = 1.0;
    uint32_t topK = 0;
    float topP = 1.0;
    float typicalP = 1.0;
    bool doSample = false;
    uint32_t seed = 1;
    float repetitionPenalty = 1.0;
    bool watermark = false;
    float frequencyPenalty = 0.0f;
    float presencePenalty = 0.0f;
};
} // namespace mindie_llm
#endif
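LoadData in engine_util.cpp (listed next) parses the dataset as plain text: one request per line, each line a comma-separated list of integer token ids. A hypothetical three-request input file might therefore look like the following (the token ids are arbitrary illustrations, not output of a real tokenizer):

    1,15043,29892,920,526,366
    1,3529,788,29871,29906,718,29941
    1,12148,2436,263,3273,6139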
- engine_util.cpp
/*
 * Copyright (c) Huawei Technologies Co., Ltd. 2023-2024. All rights reserved.
 */
#include "engine_util.h"
#include <cstdlib>
#include <fstream>
#include <iostream>
#include <sys/stat.h>
#include "common_util.h"
#include "mindie_llm/file_utils.h"

namespace mindie_llm {
std::vector<std::shared_ptr<InferRequest>> Data2Request(const std::vector<std::shared_ptr<Data>> &data)
{
    std::vector<std::shared_ptr<InferRequest>> requests(data.size());
    for (size_t i = 0; i < data.size(); ++i) {
        // 1. construct request
        requests[i] = std::make_shared<InferRequest>(InferRequestId(data[i]->id));
        std::vector<int64_t> shape({1, data[i]->size});
        mindie_llm::InferDataType dataType = mindie_llm::InferDataType::TYPE_INT64;
        auto inputsTensor = std::make_shared<mindie_llm::InferTensor>("INPUT_IDS", dataType, shape);
        inputsTensor->SetBuffer(data[i]->data, data[i]->size * sizeof(int64_t), false);
        requests[i]->AddTensor(inputsTensor->GetName(), inputsTensor);
    }
    return requests;
}

constexpr uint32_t MAX_TOKEN_ALLOWED = 51200000; // maximum supported number of tokens: 51200000
constexpr uint32_t MAX_TOKEN_BYTE_ALLOWED = MAX_TOKEN_ALLOWED * sizeof(int64_t);
constexpr uint32_t MAX_DATA_FILE_SIZE_LIMIT = 500 * 1024 * 1024; // 500 MB
const mode_t MAX_DATASET_PERM = S_IRUSR | S_IWUSR | S_IRGRP; // 640

// data loader code
std::vector<std::vector<int64_t>> LoadData(const std::string &filepath, bool &isOk)
{
    std::vector<std::vector<int64_t>> data;
    std::string errmsg = "";
    if (!mindie_llm::FileUtils::RegularFilePath(filepath, errmsg)) {
        isOk = false;
        std::cout << "LoadData: RegularFilePath is invalid!" << std::endl;
        std::cout << errmsg << std::endl;
        return data;
    }
    bool checkFlag = mindie_llm::FileUtils::GetCheckPermissionFlag();
    mindie_llm::FileValidationParams params = {true, MAX_DATASET_PERM, MAX_DATA_FILE_SIZE_LIMIT, checkFlag};
    if (!mindie_llm::FileUtils::IsFileValid(filepath, errmsg, params)) {
        isOk = false;
        std::cout << "LoadData: IsFileValid is invalid!" << std::endl;
        std::cout << errmsg << std::endl;
        return data;
    }
    std::ifstream inFile(filepath, std::ios::in);
    std::string lineStr;
    while (getline(inFile, lineStr)) {
        // 1. split data
        std::vector<std::string> sData = mindie_llm::Split(lineStr, ',');
        // 2. convert to int64_t
        std::vector<int64_t> tmpData(sData.size());
        for (size_t i = 0; i < sData.size(); ++i) {
            tmpData[i] = std::strtoll(sData[i].c_str(), nullptr, 10u);
        }
        data.emplace_back(tmpData);
    }
    return data;
}

std::shared_ptr<Data> ConvertData(std::vector<int64_t> &srcData)
{
    // 1. allocate data
    size_t size = srcData.size();
    if (size > 0 && size <= MAX_TOKEN_BYTE_ALLOWED) {
        auto pData = std::shared_ptr<Data>(CreateData(malloc(size * sizeof(int64_t)), size));
        if (pData == nullptr) {
            std::cout << " ConvertData error: data is nullptr" << std::endl;
            return nullptr;
        }
        // 2. copy the int64_t token ids into the newly allocated buffer
        for (size_t idx = 0; idx < srcData.size(); ++idx) {
            static_cast<int64_t *>(pData->data)[idx] = srcData[idx];
        }
        return pData;
    }
    std::cout << "error: invalid size in convertData" << std::endl;
    return nullptr;
}
} // namespace mindie_llm
- metric.h
/*
 * Copyright (c) Huawei Technologies Co., Ltd. 2023-2024. All rights reserved.
 */
#ifndef HELPER_METRICS_H
#define HELPER_METRICS_H
#pragma once

#include <chrono>
#include <ctime>
#include <map>
#include <string>
#include <vector>

using TtimeT = std::chrono::steady_clock::time_point;

namespace mindie_llm {
struct Metrics {
    TtimeT startingTime;                 // start time
    TtimeT endingTime;                   // end time
    TtimeT lastTokenTime;                // time at which the last token was generated
    size_t tokensInput = 0;              // number of input tokens
    size_t tokensOutput = 0;             // number of output tokens
    size_t firstTokenCost = 0;           // time to first token
    size_t lastTokenCost = 0;            // latency of the last token
    std::vector<size_t> decodeTime;      // latency of each decode step
    std::vector<int64_t> outputTokenIds; // generated token ids
    bool end;
};

struct Statistics {
    std::string modelFullName;
    size_t serverCount = 1;        // number of nodes
    size_t tp = 8;                 // tensor parallel size
    size_t pp = 1;                 // pipeline parallel size
    size_t latencyForAll = 0;      // total end-to-end latency
    size_t requestNumber = 0;      // total number of requests
    float latencyForPerInfer = 0;  // latency from Forward to the last generated token
    float averageInputLength = 0;  // average input length
    float averageOutputLength = 0; // average output length
    float lpct = 0;
    float lpot = 0;
    size_t p10Lpot = 0;
    size_t p20Lpot = 0;
    size_t p30Lpot = 0;
    size_t p40Lpot = 0;
    size_t p50Lpot = 0;
    size_t p60Lpot = 0;
    size_t p80Lpot = 0;
    size_t p90Lpot = 0;
    size_t p95Lpot = 0;
    size_t p99Lpot = 0;
    size_t pMaxLpot = 0;
    size_t averageFirstToken = 0;  // average time to first token
    size_t p99FirstToken = 0;      // P99 time to first token
    size_t maxFirstToken = 0;      // maximum time to first token
    size_t averageLastToken = 0;   // average last-token latency
    size_t p99LastToken = 0;       // P99 last-token latency
    size_t maxLastToken = 0;       // maximum last-token latency
    size_t maxDecode = 0;          // maximum decode-step latency
    double qps = 0.0;              // requests completed per second
    double qpsPerNpu = 0.0;        // requests completed per second per NPU
};

void FormatMetrics(std::map<std::string, Metrics> &metrics, Statistics &statistics);
void PrintStatistics(Statistics &statistics);
void WriteOutputIds(std::map<std::string, std::vector<int64_t>> &allOutputIds, const std::string &outCsv,
    const std::string &dir);
} // namespace mindie_llm
#endif
- metric.cpp
/*
 * Copyright (c) Huawei Technologies Co., Ltd. 2023-2024. All rights reserved.
 */
#include "metric.h"
#include "common_util.h"
#include "mindie_llm/file_utils.h"
#include <algorithm>
#include <cmath>
#include <fstream>
#include <iostream>
#include <numeric>

namespace mindie_llm {
// Request ids are numeric strings; sort them by their numeric value.
bool Compare(const std::pair<std::string, std::vector<int64_t>> &p1,
    const std::pair<std::string, std::vector<int64_t>> &p2)
{
    auto id1 = std::stoi(p1.first);
    auto id2 = std::stoi(p2.first);
    return id1 < id2;
}

/**
 * Write the output ids to a file.
 * @param allOutputIds output ids of all requests
 * @param outCsv name of the output csv file
 * @param dir output directory
 */
void WriteOutputIds(std::map<std::string, std::vector<int64_t>> &allOutputIds, const std::string &outCsv,
    const std::string &dir)
{
    // sort the map entries by request id
    std::vector<std::pair<std::string, std::vector<int64_t>>> outputVec(allOutputIds.cbegin(), allOutputIds.cend());
    std::sort(outputVec.begin(), outputVec.end(), Compare);
    std::string errmsg = "";
    if (!mindie_llm::FileUtils::RegularFilePath(dir, errmsg)) {
        std::cout << "errmsg: " << errmsg << std::endl;
        return;
    }
    std::string filePath = dir;
    filePath = filePath + outCsv;
    std::ofstream outFile(filePath, std::ios::out);
    if (!outFile.is_open()) {
        std::cerr << "Open outCsv file failed! outCsv: " << outCsv << std::endl;
        return;
    }
    for (auto &k : outputVec) {
        auto &outputIds = k.second;
        if (outputIds.empty()) {
            continue;
        }
        outFile << outputIds[0];
        for (size_t i = 1; i < outputIds.size(); i++) {
            outFile << "," << outputIds[i];
        }
        outFile << "\r\n";
    }
    outFile.close();
}

size_t Quantile(std::vector<size_t> &data, double q)
{
    std::sort(data.begin(), data.end());
    const size_t n = data.size();
    double id = (n - 1) * q;
    size_t lo = static_cast<size_t>(std::floor(id));
    size_t hi = static_cast<size_t>(std::ceil(id));
    double qs = data[lo];
    double h = (id - lo);
    return (1.0 - h) * qs + h * data[hi];
}

void CalcLpot(Statistics &statistics, std::vector<size_t> &decodeTimes)
{
    float decodeCosts = std::accumulate(decodeTimes.begin(), decodeTimes.end(), 0.0);
    statistics.lpot = decodeCosts / decodeTimes.size();
    statistics.p10Lpot = Quantile(decodeTimes, 0.1f);
    statistics.p20Lpot = Quantile(decodeTimes, 0.2f);
    statistics.p30Lpot = Quantile(decodeTimes, 0.3f);
    statistics.p40Lpot = Quantile(decodeTimes, 0.4f);
    statistics.p50Lpot = Quantile(decodeTimes, 0.5f);
    statistics.p60Lpot = Quantile(decodeTimes, 0.6f);
    statistics.p80Lpot = Quantile(decodeTimes, 0.8f);
    statistics.p90Lpot = Quantile(decodeTimes, 0.9f);
    statistics.p95Lpot = Quantile(decodeTimes, 0.95f);
    statistics.p99Lpot = Quantile(decodeTimes, 0.99f);
    statistics.pMaxLpot = Quantile(decodeTimes, 1.0f);
}

void FormatMetrics(std::map<std::string, Metrics> &metrics, Statistics &statistics)
{
    if (statistics.requestNumber == 0) {
        return;
    }
    std::vector<size_t> inputTokens;     // number of input tokens per request
    std::vector<size_t> outputTokens;    // number of output tokens per request
    std::vector<size_t> firstTokenCosts; // time to first token per request
    std::vector<size_t> lastTokenCosts;  // last-token latency per request
    std::vector<size_t> generateCosts;   // total generation time per request
    std::vector<size_t> decodeTimes;     // latency of every decode step of every request
    std::map<std::string, std::vector<int64_t>> outputTokensId;
    size_t totalInferTime = 0;
    for (auto iter = metrics.cbegin(); iter != metrics.cend(); ++iter) {
        auto &m = (*iter).second;
        inputTokens.emplace_back(m.tokensInput);
        outputTokens.emplace_back(m.tokensOutput);
        firstTokenCosts.emplace_back(m.firstTokenCost);
        lastTokenCosts.emplace_back(m.lastTokenCost);
        generateCosts.emplace_back(mindie_llm::GetDuration(m.lastTokenTime, m.startingTime));
        decodeTimes.insert(decodeTimes.end(), m.decodeTime.begin(), m.decodeTime.end());
        outputTokensId[(*iter).first] = m.outputTokenIds;
        totalInferTime += mindie_llm::GetDuration(m.endingTime, m.startingTime);
    }
    // total number of input tokens
    auto inputTokensSum = std::accumulate(inputTokens.begin(), inputTokens.end(), 0);
    // total number of output tokens
    auto outputTokensSum = std::accumulate(outputTokens.begin(), outputTokens.end(), 0);
    // total time to first token
    auto firstTokenCostsSum = std::accumulate(firstTokenCosts.begin(), firstTokenCosts.end(), 0);
    // total last-token latency
    auto lastTokenCostsSum = std::accumulate(lastTokenCosts.begin(), lastTokenCosts.end(), 0);
    // lpct
    if (inputTokensSum != 0) {
        statistics.lpct = static_cast<float>(firstTokenCostsSum) / inputTokensSum;
    } else {
        return;
    }
    // lpot
    CalcLpot(statistics, decodeTimes);
    statistics.latencyForPerInfer = static_cast<float>(totalInferTime) / statistics.requestNumber;
    statistics.averageInputLength = static_cast<float>(inputTokensSum) / statistics.requestNumber;
    statistics.averageOutputLength = static_cast<float>(outputTokensSum) / statistics.requestNumber;
    statistics.averageFirstToken = static_cast<size_t>(firstTokenCostsSum) / statistics.requestNumber;
    statistics.p99FirstToken = Quantile(firstTokenCosts, 0.99f);
    statistics.maxFirstToken = *std::max_element(firstTokenCosts.begin(), firstTokenCosts.end());
    statistics.averageLastToken = static_cast<size_t>(lastTokenCostsSum) / statistics.requestNumber;
    statistics.p99LastToken = Quantile(lastTokenCosts, 0.99f);
    statistics.maxLastToken = *std::max_element(lastTokenCosts.begin(), lastTokenCosts.end());
    statistics.maxDecode = *std::max_element(decodeTimes.begin(), decodeTimes.end());
    if (statistics.latencyForAll == 0 || statistics.tp == 0) {
        return;
    }
    statistics.qps = statistics.requestNumber / (statistics.latencyForAll / 1000.0f);
    statistics.qpsPerNpu = statistics.qps / statistics.tp;
}

void PrintStatistics(Statistics &statistics)
{
    std::cout << "model_name:\t\t" << statistics.modelFullName << std::endl;
    std::cout << "server_count:\t\t" << statistics.serverCount << std::endl;
    std::cout << "tp:\t\t\t" << statistics.tp << std::endl;
    std::cout << "pp:\t\t\t" << statistics.pp << std::endl;
    std::cout << "request_num:\t\t" << statistics.requestNumber << std::endl;
    std::cout << "average_input_length:\t" << statistics.averageInputLength << std::endl;
    std::cout << "average_output_length:\t" << statistics.averageOutputLength << std::endl;
    // latency metrics
    std::cout << "latency_for_all:\t" << statistics.latencyForAll << "ms" << std::endl;
    std::cout << "latency_for_per_infer:\t" << statistics.latencyForPerInfer << "ms" << std::endl;
    std::cout << "lpct:\t\t\t" << statistics.lpct << "ms" << std::endl;
    std::cout << "lpot:\t\t\t" << statistics.lpot << "ms" << std::endl;
    std::cout << "p10_lpot:\t\t" << statistics.p10Lpot << "ms" << std::endl;
    std::cout << "p20_lpot:\t\t" << statistics.p20Lpot << "ms" << std::endl;
    std::cout << "p30_lpot:\t\t" << statistics.p30Lpot << "ms" << std::endl;
    std::cout << "p40_lpot:\t\t" << statistics.p40Lpot << "ms" << std::endl;
    std::cout << "p50_lpot:\t\t" << statistics.p50Lpot << "ms" << std::endl;
    std::cout << "p60_lpot:\t\t" << statistics.p60Lpot << "ms" << std::endl;
    std::cout << "p80_lpot:\t\t" << statistics.p80Lpot << "ms" << std::endl;
    std::cout << "p90_lpot:\t\t" << statistics.p90Lpot << "ms" << std::endl;
    std::cout << "p95_lpot:\t\t" << statistics.p95Lpot << "ms" << std::endl;
    std::cout << "p99_lpot:\t\t" << statistics.p99Lpot << "ms" << std::endl;
    std::cout << "pmax_lpot:\t\t" << statistics.pMaxLpot << "ms" << std::endl;
    std::cout << "average_first_token:\t" << statistics.averageFirstToken << "ms" << std::endl;
    std::cout << "p99_first_token:\t" << statistics.p99FirstToken << "ms" << std::endl;
    std::cout << "max_first_token:\t" << statistics.maxFirstToken << "ms" << std::endl;
    std::cout << "average_last_token:\t" << statistics.averageLastToken << "ms" << std::endl;
    std::cout << "p99_last_token:\t\t" << statistics.p99LastToken << "ms" << std::endl;
    std::cout << "max_last_token:\t\t" << statistics.maxLastToken << "ms" << std::endl;
    std::cout << "max_decode_time:\t" << statistics.maxDecode << "ms" << std::endl;
    std::cout << "qps:\t\t\t" << statistics.qps << "/s" << std::endl;
    std::cout << "qpsPerNpu:\t\t" << statistics.qpsPerNpu << "/s" << std::endl;
    std::cout << "generate speed\t\t" << statistics.qps * statistics.averageOutputLength << std::endl;
    std::cout << "current time:\t\t" << mindie_llm::GetCurTime() << std::endl;
}
} // namespace mindie_llm
- llm_engine_test.cpp
/*
 * Copyright (c) Huawei Technologies Co., Ltd. 2023-2024. All rights reserved.
 */
#include <algorithm>
#include <csignal>
#include <iostream>
#include <map>
#include <memory>
#include <mutex>
#include <thread>
#include <unistd.h>
#include <wait.h>
#include "common_util.h"
#include "engine_util.h"
#include "llm_infer_engine.h"
#include "metric.h"

using SC = std::chrono::steady_clock;
using namespace mindie_llm;

static IOManager g_manager;
static Statistics g_statistics;
static std::map<std::string, Metrics> g_metrics;
static volatile int g_completeNum = 0;
static volatile uint32_t g_warmupCompleted = 0;
static uint64_t g_warmupNum = 10;
static std::mutex g_mutexWarmup;
static std::mutex g_mutex;
static std::mutex g_metricsMutex;
static bool g_recordOutput = false;
static int g_outputIdsReserveNum = 128;
static constexpr int ARGC_NUM1 = 1;
static constexpr int ARGC_NUM2 = 2;
static constexpr int ARGC_NUM3 = 3;
static constexpr int ARGC_NUM4 = 4;
static constexpr int ARGC_NUM5 = 5;

namespace mindie_llm {
/**
 * Parse the EOS attribute from the response.
 * @param response
 */
Status ParseEosAttr(std::shared_ptr<InferResponse> &response, int64_t &flag, int64_t &outputLen)
{
    std::shared_ptr<InferTensor> tensor;
    if (response == nullptr) {
        return Status(Error::Code::ERROR, "response is null");
    }
    auto status = response->GetOutput("IBIS_EOS_ATTR", tensor);
    if (!status.IsOk()) {
        return status;
    }
    auto *eosData = static_cast<int64_t *>(tensor->GetData());
    flag = eosData[0];
    outputLen = eosData[1];
    return Status(Error::Code::OK, "Success");
}

/**
 * Parse the generated token ids from the response.
 * @param response
 */
Status ParseOutputId(std::shared_ptr<InferResponse> &response, std::vector<int64_t> &outputIds)
{
    std::shared_ptr<InferTensor> tensor;
    if (response == nullptr) {
        return Status(Error::Code::ERROR, "response is null");
    }
    auto status = response->GetOutput("OUTPUT_IDS", tensor);
    if (!status.IsOk()) {
        return status;
    }
    if (outputIds.empty()) {
        outputIds.reserve(g_outputIdsReserveNum);
    }
    // get the output length
    auto len = tensor->GetShape()[0];
    auto *data = static_cast<int64_t *>(tensor->GetData());
    for (int i = 0; i < len; ++i) {
        outputIds.push_back(data[i]);
    }
    return Status(Error::Code::OK, "Success");
}

/**
 * Per-request response callback.
 * @param response
 */
void ResponseCallback(std::shared_ptr<InferResponse> &response)
{
    if (response == nullptr) {
        std::cout << "response is null" << std::endl;
        return;
    }
    auto reqId = response->GetRequestId().StringValue();
    size_t decodeTime;
    auto now = SC::now();
    g_manager.SetOutputData(reqId);
    {
        std::unique_lock lock(g_metricsMutex);
        // number of generated tokens
        int64_t flag = 1;
        int64_t outputLen = 1;
        auto ret = ParseEosAttr(response, flag, outputLen);
        if (!ret.IsOk()) {
            std::cout << "ReqId:" << reqId << ", Error:" << ret.StatusMsg() << std::endl;
            return;
        }
        g_metrics[reqId].tokensOutput += static_cast<size_t>(outputLen);
        if (g_metrics[reqId].firstTokenCost == 0) {
            // prefill: record the time to first token
            decodeTime = mindie_llm::GetDuration(now, g_metrics[reqId].startingTime);
            g_metrics[reqId].firstTokenCost = decodeTime;
        } else {
            // decode: record the latency of each decode step
            decodeTime = mindie_llm::GetDuration(now, g_metrics[reqId].lastTokenTime);
            // adaptation for speculative decoding: a decode step may return up to gamma tokens; round to nearest
            auto avgDecodeTime = (decodeTime + static_cast<size_t>(outputLen) / 2) / static_cast<size_t>(outputLen);
            for (int64_t i = 0; i < outputLen; ++i) {
                g_metrics[reqId].decodeTime.push_back(avgDecodeTime);
            }
        }
        g_metrics[reqId].lastTokenTime = now;
        // generated token ids
        if (g_recordOutput) {
            ret = ParseOutputId(response, g_metrics[reqId].outputTokenIds);
            if (!ret.IsOk()) {
                std::cout << "ReqId:" << reqId << ", Error:" << ret.StatusMsg() << std::endl;
                return;
            }
        }
        if (response->IsEnd()) {
            g_metrics[reqId].endingTime = now;
            // latency of the last token
            g_metrics[reqId].lastTokenCost = decodeTime;
        }
    }
    if (response->IsEnd()) {
        std::unique_lock lock(g_mutex);
        g_completeNum++;
        std::cout << "ReqId:" << reqId << " Finished" << std::endl;
    }
}

void Forward(LlmInferEngine &engine, std::shared_ptr<InferRequest> request, const std::string reqId,
    uint64_t &invalidReqNum)
{
    auto ret = engine.Forward(request);
    if (!ret.IsOk()) {
        {
            std::unique_lock lock(g_metricsMutex);
            invalidReqNum++;
            g_statistics.requestNumber--;
            g_metrics.erase(reqId);
        }
    }
}

void SendRequestInner(LlmInferEngine &engine, std::vector<std::shared_ptr<Data>> &data, uint64_t &invalidReqNum)
{
    if (!data.empty()) {
        std::vector<std::shared_ptr<InferRequest>> requests = Data2Request(data);
        g_statistics.requestNumber += requests.size(); // total num
        // 4. forward (asynchronous)
        for (size_t i = 0; i < requests.size(); ++i) {
            auto reqId = requests[i]->GetRequestId().StringValue();
            {
                std::unique_lock lock(g_metricsMutex);
                g_metrics[reqId].startingTime = SC::now();
                g_metrics[reqId].tokensInput = static_cast<size_t>(data[i]->size);
            }
            auto req = requests[i];
            std::thread t(Forward, std::ref(engine), req, reqId, std::ref(invalidReqNum));
            t.detach();
        }
    }
}

void SendRequest(LlmInferEngine &engine, uint64_t maxBatchSize)
{
    uint64_t processingNum = 0;
    auto status = engine.GetProcessingRequest(&processingNum);
    if (!status.IsOk()) {
        std::cout << status.StatusMsg() << std::endl;
        return;
    }
    std::cout << "the processing request num is " << processingNum << " at first." << std::endl;
    uint64_t slotNum = 0;
    uint64_t remainBlocks = 0;
    uint64_t remainPrefillSlots = 0;
    uint64_t remainPrefillTokens = 0;
    uint64_t invalidReqNum = 0;
    while (!g_manager.Empty()) {
        // 2. query the available quotas
        status = engine.GetRequestBlockQuotas(&remainBlocks, &remainPrefillSlots, &remainPrefillTokens);
        if (!status.IsOk()) {
            std::cout << status.StatusMsg() << std::endl;
            break;
        }
        status = engine.GetProcessingRequest(&processingNum);
        if (!status.IsOk()) {
            std::cout << "failed to get processingNum:" << std::endl;
            std::cout << status.StatusMsg() << std::endl;
            break;
        }
        if (maxBatchSize > processingNum) {
            slotNum = maxBatchSize - processingNum;
        } else {
            slotNum = 0;
        }
        if (remainPrefillSlots > 0 && remainPrefillTokens > 0) {
            // 3. Set input
            std::vector<std::shared_ptr<Data>> data =
                g_manager.GetInputDataByQuotas(remainPrefillSlots, remainPrefillTokens, slotNum);
            SendRequestInner(engine, data, invalidReqNum);
        }
        std::this_thread::sleep_for(std::chrono::milliseconds(20L));
    }
    status = engine.GetProcessingRequest(&processingNum);
    if (!status.IsOk()) {
        std::cout << status.StatusMsg() << std::endl;
        return;
    }
    std::cout << "the processing request num is " << processingNum << " when all requests dispatched." << std::endl;
    std::cout << "invalid request count is " << invalidReqNum << std::endl;
}

void Warmup(LlmInferEngine &engine, IOManager &manager, size_t warmupSize)
{
    std::vector<std::shared_ptr<Data>> data = manager.GetWarmupInputs(warmupSize);
    std::vector<std::shared_ptr<InferRequest>> warmupRequests = Data2Request(data);
    uint64_t totalWarmupNum = warmupRequests.size();
    std::cout << "Total warm up count : " << totalWarmupNum << std::endl;
    uint64_t invalidReqNum = 0;
    for (size_t i = 0; i < totalWarmupNum; ++i) {
        SendResponseCallback4Request responseCallback = [](std::shared_ptr<InferResponse> &response) {
            if (response->IsEnd()) {
                std::unique_lock<std::mutex> lock(g_mutexWarmup);
                g_warmupCompleted++;
                std::cout << "Warm up completed count: " << g_warmupCompleted << std::endl;
            }
        };
        auto ret = engine.Forward(warmupRequests[i]);
        if (!ret.IsOk()) {
            invalidReqNum++;
        }
    }
    std::cout << "Invalid warmup request count : " << invalidReqNum << std::endl;
    while (g_warmupCompleted < totalWarmupNum - invalidReqNum) {
        std::this_thread::sleep_for(std::chrono::milliseconds(10L));
    }
}

void RunEngine(std::string dataset, const std::string &configPath)
{
    TtimeT start;
    TtimeT end;
    if (g_manager.SetInputData(dataset) != 0) {
        std::cout << "failed to load data" << std::endl;
        return;
    }
    // initialize the engine
    LlmInferEngine engine;
    auto ret = engine.Init(ResponseCallback, configPath);
    if (!ret.IsOk()) {
        std::cout << "engine init failed: " << ret.StatusMsg() << std::endl;
        return;
    }
    uint64_t maxBatchSize;
    ret = engine.GetMaxBatchSize(&maxBatchSize);
    if (!ret.IsOk()) {
        std::cout << "GetMaxBatchSize failed: " << ret.StatusMsg() << std::endl;
        return;
    }
    std::cout << "*** Warm up ***" << std::endl;
    Warmup(engine, g_manager, g_warmupNum);
    std::cout << "*** Warm up end***" << std::endl;
    start = SC::now();
    SendRequest(engine, maxBatchSize);
    while (g_completeNum < static_cast<int>(g_statistics.requestNumber)) {
        std::this_thread::sleep_for(std::chrono::milliseconds(10L));
    }
    end = SC::now();
    // 5. collect statistics
    g_statistics.modelFullName = "";
    // mindie_llm::GetModelInfo(configPath, g_statistics.modelFullName, g_statistics.tp, g_statistics.serverCount);
    g_statistics.latencyForAll = mindie_llm::GetDuration(end, start);
    FormatMetrics(g_metrics, g_statistics);
    PrintStatistics(g_statistics);
    if (g_recordOutput) {
        std::map<std::string, std::vector<int64_t>> outputTokensId;
        for (auto iter = g_metrics.cbegin(); iter != g_metrics.cend(); ++iter) {
            outputTokensId[(*iter).first] = (*iter).second.outputTokenIds;
        }
        WriteOutputIds(outputTokensId, "token_output.csv", "./");
    }
    // 6. release resources
    auto res = engine.Finalize();
    std::cout << "inferenceEngine finalize message is : " << res.StatusMsg() << std::endl;
}

void SignalInterruptHandler(int signal)
{
    std::cout << "Received signal[" << signal << "]" << std::endl;
    std::cout << "Test program is exiting..." << std::endl;
    int status = 0;
    pid_t pid = 0;
    while ((pid = waitpid(0, &status, WNOHANG)) > 0) {
        std::cout << "Test program wait pid with " << pid << ", status " << status << std::endl;
    }
    killpg(getpgrp(), SIGKILL);
}

void SignalChldHandler(int signal)
{
    std::cout << "received SIGCHLD signal[" << signal << "]" << std::endl;
    int status = 0;
    pid_t pid = 0;
    bool exitFlag = false;
    while ((pid = waitpid(0, &status, WNOHANG)) > 0) {
        std::cout << "Test program wait pid with " << pid << ", status " << status << std::endl;
        uint32_t statusCheck = static_cast<uint32_t>(status);
        if (WIFEXITED(statusCheck) == 0) {
            exitFlag = true;
        }
    }
    if (exitFlag) {
        std::cout << "Test program is exiting..." << std::endl;
        killpg(getpgrp(), SIGKILL);
    }
}

void RegisterSignal(void)
{
    auto retSigint = signal(SIGINT, SignalInterruptHandler);
    if (retSigint == SIG_ERR) {
        std::cout << " signal error! " << std::endl;
    }
    auto retSigterm = signal(SIGTERM, SignalInterruptHandler);
    if (retSigterm == SIG_ERR) {
        std::cout << " signal error! " << std::endl;
    }
    auto retSigchld = signal(SIGCHLD, SignalChldHandler);
    if (retSigchld == SIG_ERR) {
        std::cout << " signal error! " << std::endl;
    }
}
} // namespace mindie_llm

int main(int argc, char *argv[])
{
    setpgrp();
    RegisterSignal(); // register signal handlers
    // dataset management
    std::string dataset = "token_input_gsm.csv";
    std::string configPath = "";
    try {
        dataset = argc > ARGC_NUM1 ? argv[ARGC_NUM1] : "token_input_gsm.csv";
        std::string recordOutput = argc > ARGC_NUM2 ? argv[ARGC_NUM2] : "0";
        std::string warmupNum = argc > ARGC_NUM3 ? argv[ARGC_NUM3] : "10";
        configPath = argc > ARGC_NUM4 ? argv[ARGC_NUM4] : "";
        g_recordOutput = std::stoi(recordOutput);
        g_warmupNum = std::stoull(warmupNum);
        if (argc > ARGC_NUM5) {
            std::cout << "unexpected arguments. please refer to the documentation. " << std::endl;
            return -1;
        }
    } catch (const std::exception &e) {
        std::cout << "[llm_engine_test] read argv fail: " << typeid(e).name() << ", " << e.what() << std::endl;
        return -1;
    } catch (...) {
        std::cout << "[llm_engine_test] get unknown error" << std::endl;
        return -1;
    }
    // start the business thread
    std::thread businessThread(RunEngine, dataset, configPath);
    businessThread.join();
    return 0;
}
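As the argument parsing in main() shows, the test binary accepts up to four positional arguments: the dataset path, a 0/1 flag for recording output token ids, the warm-up request count, and the config path. Using the executable name defined in the CMakeLists.txt below, a typical invocation might look like this (the paths are examples only):

    ./mindie_llm_enginetest token_input_gsm.csv 1 10 /path/to/config.json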
- CMakeLists.txt
#
# Copyright (c) 2024 Huawei Technologies Co., Ltd.
# AscendTransformerBoost is licensed under Mulan PSL v2.
# You can use this software according to the terms and conditions of the Mulan PSL v2.
# You may obtain a copy of Mulan PSL v2 at:
#          http://license.coscl.org.cn/MulanPSL2
# THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
# EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
# MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
# See the Mulan PSL v2 for more details.
#
set(BOOST_DIR ${PROJECT_SOURCE_DIR}/lib)
find_library(BOOST_CHRONO_LIBRARY libboost_chrono.so.1.82.0 PATHS ${BOOST_DIR} NO_DEFAULT_PATH)
find_library(BOOST_THREAD_LIBRARY libboost_thread.so.1.82.0 PATHS ${BOOST_DIR} NO_DEFAULT_PATH)

file(GLOB_RECURSE SOURCE_FILES "${CMAKE_CURRENT_LIST_DIR}/*.cpp")
add_executable(mindie_llm_enginetest ${SOURCE_FILES})

target_include_directories(mindie_llm_enginetest PRIVATE
    ${CMAKE_CURRENT_SOURCE_DIR}
    ${PROJECT_SOURCE_DIR}/src/llm_manager/include
    ${PROJECT_SOURCE_DIR}/src/llm_manager/include/executor
    ${PROJECT_SOURCE_DIR}/src/utils/include
)

file(GLOB GRPC_LIBS "${PROJECT_SOURCE_DIR}/lib/grpc/*")
target_link_directories(mindie_llm_enginetest PRIVATE ${PROJECT_SOURCE_DIR}/lib ${PROJECT_SOURCE_DIR}/lib/grpc)
target_link_libraries(mindie_llm_enginetest PRIVATE
    boundscheck
    ssl
    crypto
    ${BOOST_CHRONO_LIBRARY}
    ${BOOST_THREAD_LIBRARY}
    -Wl,--start-group
    -lpthread
    -Wl,--end-group
    ${GRPC_LIBS}
    mindie_llm_manager
)

install(TARGETS mindie_llm_enginetest DESTINATION bin)
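This CMakeLists.txt relies on PROJECT_SOURCE_DIR paths for headers and libraries, so the demo is normally built as part of the overall MindIE-LLM project rather than standalone. Assuming the top-level MindIE-LLM/CMakeLists.txt pulls this directory in via add_subdirectory, an out-of-source build of just the demo target might look like the following sketch (not a verified build recipe):

    cd MindIE-LLM
    cmake -S . -B build
    cmake --build build --target mindie_llm_enginetest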