Reference code
The directory structure of the Ascend adaptation code for vLLM 0.4.2 is as follows:
├── cover
│   ├── requirements-ascend.txt
│   ├── setup.py
│   └── vllm
│       └── __init__.py
├── examples
│   ├── offline_inference.py
│   ├── offline_inference.sh
│   └── start_server.sh
├── install.sh
├── README.md
└── vllm_npu
    ├── README.md
    ├── requirements.txt
    ├── setup.py
    └── vllm_npu
        ├── attention
        │   ├── backends.py
        │   ├── __init__.py
        │   └── selector.py
        ├── config.py
        ├── core
        │   └── __init__.py
        ├── engine
        │   ├── arg_utils.py
        │   ├── ascend_engine.py
        │   ├── async_ascend_engine.py
        │   └── __init__.py
        ├── executor
        │   ├── ascend_executor.py
        │   ├── ascend_ray_executor.py
        │   ├── __init__.py
        │   └── ray_utils.py
        ├── __init__.py
        ├── model_executor
        │   ├── ascend_model_loader.py
        │   ├── __init__.py
        │   ├── layers
        │   │   ├── ascend_sampler.py
        │   │   └── __init__.py
        │   └── models
        │       ├── ascend
        │       │   ├── __init__.py
        │       │   └── mindie_llm_wrapper.py
        │       └── __init__.py
        ├── npu_adaptor.py
        ├── usage
        │   ├── __init__.py
        │   └── usage_lib.py
        ├── utils.py
        └── worker
            ├── ascend_model_runner.py
            ├── ascend_worker.py
            ├── cache_engine.py
            └── __init__.py
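It consists mainly of the following four parts:
- The cover folder contains the modifications to the vLLM framework source code.
- The examples folder contains sample code for the offline and online modes.
- The vllm_npu folder contains the source code of the patch repository.
- install.sh is a one-click installation script. After all code files have been restored, run it to install the Ascend-adapted vLLM framework; it automatically pulls and installs the native vLLM framework from source and applies the adaptation patch.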
Contents of each file in the code repository:
- cover/requirements-ascend.txt: the Python dependencies required to run in an Ascend environment.
cmake >= 3.21
ninja  # For faster builds.
psutil
sentencepiece  # Required for LLaMA tokenizer.
numpy
requests
py-cpuinfo
transformers >= 4.40.0  # Required for StarCoder2 & Llava, Llama 3.
tokenizers >= 0.19.1  # Required for Llama 3.
fastapi
openai
uvicorn[standard]
pydantic >= 2.0  # Required for OpenAI server.
prometheus_client >= 0.18.0
prometheus-fastapi-instrumentator >= 7.0.0
tiktoken == 0.6.0  # Required for DBRX tokenizer
lm-format-enforcer == 0.10.1
typing_extensions
filelock >= 3.10.4  # filelock starts to support `mode` argument from 3.10.4
ray == 2.9.3
pynvml == 11.5.0
outlines == 0.0.34
- cover/setup.py: adapts the vLLM installation by adding the NPU backend and updating the version name, dependency modules, and related settings.
import importlib.util
import io
import logging
import os
import re
import subprocess
import sys
from shutil import which
from typing import Dict, List

import torch
from packaging.version import Version, parse
from setuptools import Extension, find_packages, setup
from setuptools.command.build_ext import build_ext
from torch.utils.cpp_extension import CUDA_HOME, BuildExtension


def load_module_from_path(module_name, path):
    spec = importlib.util.spec_from_file_location(module_name, path)
    module = importlib.util.module_from_spec(spec)
    sys.modules[module_name] = module
    spec.loader.exec_module(module)
    return module


ROOT_DIR = os.path.dirname(__file__)
logger = logging.getLogger(__name__)

# cannot import envs directly because it depends on vllm,
# which is not installed yet
envs = load_module_from_path('envs', os.path.join(ROOT_DIR, 'vllm', 'envs.py'))

VLLM_TARGET_DEVICE = 'npu'

# vLLM only supports Linux platform
assert sys.platform.startswith(
    "linux"), "vLLM only supports Linux platform (including WSL)."

MAIN_CUDA_VERSION = "12.1"


def is_sccache_available() -> bool:
    return which("sccache") is not None


def is_ccache_available() -> bool:
    return which("ccache") is not None


def is_ninja_available() -> bool:
    return which("ninja") is not None


def remove_prefix(text, prefix):
    if text.startswith(prefix):
        return text[len(prefix):]
    return text


class CMakeExtension(Extension):

    def __init__(self, name: str, cmake_lists_dir: str = '.', **kwa) -> None:
        super().__init__(name, sources=[], **kwa)
        self.cmake_lists_dir = os.path.abspath(cmake_lists_dir)


def _is_cuda() -> bool:
    return VLLM_TARGET_DEVICE == "cuda" \
        and torch.version.cuda is not None \
        and not _is_neuron()


def _is_hip() -> bool:
    return (VLLM_TARGET_DEVICE == "cuda"
            or VLLM_TARGET_DEVICE == "rocm") and torch.version.hip is not None


def _is_neuron() -> bool:
    torch_neuronx_installed = True
    try:
        subprocess.run(["neuron-ls"], capture_output=True, check=True)
    except (FileNotFoundError, PermissionError, subprocess.CalledProcessError):
        torch_neuronx_installed = False
    return torch_neuronx_installed or envs.VLLM_BUILD_WITH_NEURON


def _is_cpu() -> bool:
    return VLLM_TARGET_DEVICE == "cpu"


def _is_npu() -> bool:
    return VLLM_TARGET_DEVICE == "npu"


def _install_punica() -> bool:
    return envs.VLLM_INSTALL_PUNICA_KERNELS


def get_hipcc_rocm_version():
    # Run the hipcc --version command
    result = subprocess.run(['hipcc', '--version'],
                            stdout=subprocess.PIPE,
                            stderr=subprocess.STDOUT,
                            text=True)

    # Check if the command was executed successfully
    if result.returncode != 0:
        print("Error running 'hipcc --version'")
        return None

    # Extract the version using a regular expression
    match = re.search(r'HIP version: (\S+)', result.stdout)
    if match:
        # Return the version string
        return match.group(1)
    else:
        print("Could not find HIP version in the output")
        return None


def get_neuronxcc_version():
    import sysconfig
    site_dir = sysconfig.get_paths()["purelib"]
    version_file = os.path.join(site_dir, "neuronxcc", "version",
                                "__init__.py")

    # Check if the command was executed successfully
    with open(version_file, "rt") as fp:
        content = fp.read()

    # Extract the version using a regular expression
    match = re.search(r"__version__ = '(\S+)'", content)
    if match:
        # Return the version string
        return match.group(1)
    else:
        raise RuntimeError("Could not find HIP version in the output")


def get_nvcc_cuda_version() -> Version:
    """Get the CUDA version from nvcc.

    Adapted from https://github.com/NVIDIA/apex/blob/8b7a1ff183741dd8f9b87e7bafd04cfde99cea28/setup.py
    """
    assert CUDA_HOME is not None, "CUDA_HOME is not set"
    nvcc_output = subprocess.check_output([CUDA_HOME + "/bin/nvcc", "-V"],
                                          universal_newlines=True)
    output = nvcc_output.split()
    release_idx = output.index("release") + 1
    nvcc_cuda_version = parse(output[release_idx].split(",")[0])
    return nvcc_cuda_version


def get_path(*filepath) -> str:
    return os.path.join(ROOT_DIR, *filepath)


def find_version(filepath: str) -> str:
    """Extract version information from the given filepath.

    Adapted from https://github.com/ray-project/ray/blob/0b190ee1160eeca9796bc091e07eaebf4c85b511/python/setup.py
    """
    with open(filepath) as fp:
        version_match = re.search(r"^__version__ = ['\"]([^'\"]*)['\"]",
                                  fp.read(), re.M)
        if version_match:
            return version_match.group(1)
        raise RuntimeError("Unable to find version string.")


def get_vllm_version() -> str:
    version = find_version(get_path("vllm", "__init__.py"))

    # return version
    if _is_cuda():
        cuda_version = str(get_nvcc_cuda_version())
        if cuda_version != MAIN_CUDA_VERSION:
            cuda_version_str = cuda_version.replace(".", "")[:3]
            version += f"+cu{cuda_version_str}"
    elif _is_hip():
        # Get the HIP version
        hipcc_version = get_hipcc_rocm_version()
        if hipcc_version != MAIN_CUDA_VERSION:
            rocm_version_str = hipcc_version.replace(".", "")[:3]
            version += f"+rocm{rocm_version_str}"
    elif _is_neuron():
        # Get the Neuron version
        neuron_version = str(get_neuronxcc_version())
        if neuron_version != MAIN_CUDA_VERSION:
            neuron_version_str = neuron_version.replace(".", "")[:3]
            version += f"+neuron{neuron_version_str}"
    elif _is_npu():
        version += "+npu"
    elif _is_cpu():
        version += "+cpu"
    else:
        raise RuntimeError("Unknown runtime environment")

    return version


def read_readme() -> str:
    """Read the README file if present."""
    p = get_path("README.md")
    if os.path.isfile(p):
        return io.open(get_path("README.md"), "r", encoding="utf-8").read()
    else:
        return ""


def get_requirements() -> List[str]:
    """Get Python package dependencies from requirements.txt."""

    def _read_requirements(filename: str) -> List[str]:
        with open(get_path(filename)) as f:
            requirements = f.read().strip().split("\n")
        resolved_requirements = []
        for line in requirements:
            if line.startswith("-r "):
                resolved_requirements += _read_requirements(line.split()[1])
            else:
                resolved_requirements.append(line)
        return resolved_requirements

    if _is_cuda():
        requirements = _read_requirements("requirements-cuda.txt")
        cuda_major, cuda_minor = torch.version.cuda.split(".")
        modified_requirements = []
        for req in requirements:
            if "vllm-nccl-cu12" in req:
                req = req.replace("vllm-nccl-cu12",
                                  f"vllm-nccl-cu{cuda_major}")
            elif ("vllm-flash-attn" in req
                  and not (cuda_major == "12" and cuda_minor == "1")):
                # vllm-flash-attn is built only for CUDA 12.1.
                # Skip for other versions.
                continue
            modified_requirements.append(req)
        requirements = modified_requirements
    elif _is_hip():
        requirements = _read_requirements("requirements-rocm.txt")
    elif _is_neuron():
        requirements = _read_requirements("requirements-neuron.txt")
    elif _is_npu():
        requirements = _read_requirements("requirements-ascend.txt")
    elif _is_cpu():
        requirements = _read_requirements("requirements-cpu.txt")
    else:
        raise ValueError(
            "Unsupported platform, please use CUDA, ROCm, Neuron, NPU or CPU.")
    return requirements


ext_modules = []

if _is_cuda():
    ext_modules.append(CMakeExtension(name="vllm._moe_C"))

"""
if not _is_neuron():
    ext_modules.append(CMakeExtension(name="vllm._C"))

    if _install_punica():
        ext_modules.append(CMakeExtension(name="vllm._punica_C"))
"""

package_data = {
    "vllm": ["py.typed", "model_executor/layers/fused_moe/configs/*.json"]
}
if envs.VLLM_USE_PRECOMPILED:
    ext_modules = []
    package_data["vllm"].append("*.so")

setup(
    name="vllm",
    version=get_vllm_version(),
    author="vLLM Team",
    license="Apache 2.0",
    description=("A high-throughput and memory-efficient inference and "
                 "serving engine for LLMs"),
    long_description=read_readme(),
    long_description_content_type="text/markdown",
    url="https://github.com/vllm-project/vllm",
    project_urls={
        "Homepage": "https://github.com/vllm-project/vllm",
        "Documentation": "https://vllm.readthedocs.io/en/latest/",
    },
    classifiers=[
        "Programming Language :: Python :: 3.8",
        "Programming Language :: Python :: 3.9",
        "Programming Language :: Python :: 3.10",
        "Programming Language :: Python :: 3.11",
        "License :: OSI Approved :: Apache Software License",
        "Topic :: Scientific/Engineering :: Artificial Intelligence",
    ],
    packages=find_packages(exclude=("benchmarks", "csrc", "docs", "examples",
                                    "tests*")),
    python_requires=">=3.8",
    install_requires=get_requirements(),
    ext_modules=ext_modules,
    extras_require={
        "tensorizer": ["tensorizer==2.9.0"],
    },
    cmdclass={"build_ext": BuildExtension} if not _is_neuron() else {},
    package_data=package_data,
)
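The NPU-specific behavior of this setup.py comes down to two choices keyed on the target device: the local version suffix and the requirements file. The following is a minimal sketch of that mapping, not the file's actual structure. The names `VERSION_SUFFIX`, `REQUIREMENTS_FILE`, and `describe_build` are hypothetical, and only the NPU and CPU branches are modeled, because the CUDA, ROCm, and Neuron suffixes depend on installed toolchain versions.

```python
from typing import Tuple

# Hypothetical lookup tables; the real setup.py uses if/elif chains and also
# handles CUDA, ROCm and Neuron, whose suffixes depend on toolchain versions.
VERSION_SUFFIX = {"npu": "+npu", "cpu": "+cpu"}
REQUIREMENTS_FILE = {
    "npu": "requirements-ascend.txt",
    "cpu": "requirements-cpu.txt",
}


def describe_build(base_version: str, target_device: str) -> Tuple[str, str]:
    """Return (package version, requirements file) for a target device."""
    if target_device not in VERSION_SUFFIX:
        raise ValueError(f"Unsupported target device: {target_device}")
    return (base_version + VERSION_SUFFIX[target_device],
            REQUIREMENTS_FILE[target_device])


# With VLLM_TARGET_DEVICE fixed to 'npu', the package is versioned with a
# "+npu" local suffix and installs from requirements-ascend.txt.
npu_build = describe_build("0.4.2", "npu")
```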
- cover/vllm/__init__.py: imports vllm_npu in vLLM's initialization file so that the adaptation is hooked in.
"""vLLM: a high-throughput and memory-efficient inference engine for LLMs"""

import vllm_npu
from vllm.engine.arg_utils import AsyncEngineArgs, EngineArgs
from vllm.engine.async_llm_engine import AsyncLLMEngine
from vllm.engine.llm_engine import LLMEngine
from vllm.entrypoints.llm import LLM
from vllm.executor.ray_utils import initialize_ray_cluster
from vllm.model_executor.models import ModelRegistry
from vllm.outputs import CompletionOutput, RequestOutput
from vllm.sampling_params import SamplingParams

__version__ = "0.4.2"

__all__ = [
    "LLM",
    "ModelRegistry",
    "SamplingParams",
    "RequestOutput",
    "CompletionOutput",
    "LLMEngine",
    "EngineArgs",
    "AsyncLLMEngine",
    "AsyncEngineArgs",
    "initialize_ray_cluster",
]
- vllm_npu/README.md: an empty file.
- vllm_npu/requirements.txt: the Python dependencies required by the vllm_npu module.
numpy
decorator
attrs
psutil
absl-py
cloudpickle
scipy
tornado
transformers
accelerate
pandas
- vllm_npu/setup.py: the installation script for the vllm_npu module.
import io
import os
import re
from typing import List

import setuptools

ROOT_DIR = os.path.dirname(__file__)


def get_path(*filepath) -> str:
    return os.path.join(ROOT_DIR, *filepath)


def find_version(filepath: str):
    """Extract version information from the given filepath.
    """
    with open(filepath) as fp:
        version_match = re.search(r"^__version__ = ['\"]([^'\"]*)['\"]",
                                  fp.read(), re.M)
        if version_match:
            return version_match.group(1)
        raise RuntimeError("Unable to find version string.")


def read_readme() -> str:
    """Read the README file."""
    return io.open(get_path("README.md"), "r", encoding="utf-8").read()


def get_requirements() -> List[str]:
    """Get Python package dependencies from requirements.txt."""
    with open(get_path("requirements.txt")) as f:
        requirements = f.read().strip().split("\n")
    return requirements


setuptools.setup(
    name="vllm_npu",
    version=find_version(get_path("vllm_npu", "__init__.py")),
    author="Huawei",
    license="Apache 2.0",
    description=("A high-throughput and memory-efficient inference and "
                 "serving engine for LLMs"),
    long_description=read_readme(),
    long_description_content_type="text/markdown",
    url="",
    project_urls={
        "Homepage": "",
        "Documentation": "",
    },
    classifiers=[
        "Programming Language :: Python :: 3.8",
        "Programming Language :: Python :: 3.9",
        "Programming Language :: Python :: 3.10",
        "Programming Language :: Python :: 3.11",
        "Topic :: Scientific/Engineering :: Artificial Intelligence",
    ],
    packages=setuptools.find_packages(exclude=("benchmarks", "examples",
                                               "tests")),
    python_requires=">=3.8",
    install_requires=get_requirements(),
)
- vllm_npu/vllm_npu/attention/__init__.py: during initialization of the attention module, hot-replaces the get_attn_backend function in vLLM.
from vllm_npu.attention.selector import get_attn_backend
import vllm.attention.selector as selector
import vllm.worker.model_runner as mr
import vllm.worker.cache_engine as ce

selector.get_attn_backend = get_attn_backend
mr.get_attn_backend = get_attn_backend
ce.get_attn_backend = get_attn_backend
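The replacement is assigned on three modules rather than one. A likely reason, stated here as an assumption about how vLLM's model_runner and cache_engine import the function, is that `from module import name` copies the binding into the importing module, so patching only the defining module leaves the copies pointing at the original. The sketch below illustrates that Python behavior with hypothetical stand-in modules; it does not import vLLM.

```python
import types

# Hypothetical stand-in for the module that defines the function
# (the role played by vllm.attention.selector).
selector = types.ModuleType("selector")
selector.get_attn_backend = lambda: "native"

# Hypothetical stand-in for a module that did
# `from selector import get_attn_backend` at import time:
# it holds its own reference to the function object.
consumer = types.ModuleType("consumer")
consumer.get_attn_backend = selector.get_attn_backend


def ascend_get_attn_backend():
    return "ascend"


# Patching only the defining module does not affect the consumer's copy.
selector.get_attn_backend = ascend_get_attn_backend
stale = consumer.get_attn_backend()

# Patching the consumer's own binding makes the replacement take effect there.
consumer.get_attn_backend = ascend_get_attn_backend
patched = consumer.get_attn_backend()
```

Here `stale` still comes from the original function, while `patched` comes from the replacement, which is why every importing module has to be patched individually.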
- vllm_npu/vllm_npu/attention/backends.py: the attention backend implementation used in an Ascend environment.
# Part of codes in this file was copied from project [vLLM Team][vllm]
from dataclasses import dataclass
from typing import List, Optional, Tuple

from vllm.attention.backends.abstract import (AttentionBackend,
                                              AttentionMetadataPerStage)
import torch


def get_kv_cache_shape(
    num_blocks: int,
    block_size: int,
    num_kv_heads: int,
    head_size: int,
) -> Tuple[int, ...]:
    return (2, num_blocks, block_size, num_kv_heads, head_size)


class AscendAttentionBackend(AttentionBackend):

    @staticmethod
    def get_name() -> str:
        return "ascend-attn-backend"

    @staticmethod
    def get_impl_cls():
        return None

    @staticmethod
    def make_metadata(*args, **kwargs) -> "AttentionMetadata":
        return AttentionMetadata(*args, **kwargs)

    @staticmethod
    def get_kv_cache_shape(
        num_blocks: int,
        block_size: int,
        num_kv_heads: int,
        head_size: int,
    ) -> Tuple[int, ...]:
        return get_kv_cache_shape(num_blocks, block_size, num_kv_heads,
                                  head_size)

    @staticmethod
    def swap_blocks(
        src_kv_cache: torch.Tensor,
        dst_kv_cache: torch.Tensor,
        src_to_dst: torch.Tensor,
    ) -> None:
        pass

    @staticmethod
    def copy_blocks(
        kv_caches: List[torch.Tensor],
        src_to_dists: torch.Tensor,
    ) -> None:
        pass


@dataclass
class AttentionMetadata(AttentionMetadataPerStage):
    """Metadata for AscendAttentionBackend.
    """
    # Currently, input sequences can only contain all prompts
    # or all decoding. True if all sequences are prompts.
    is_prompt: bool
    # (batch_size,). The sequence length per sequence. Sequence length means
    # the computed tokens + new tokens None if it is a decoding.
    seq_lens: Optional[List[int]]
    # seq_lens stored as a tensor.
    seq_lens_tensor: Optional[torch.Tensor]

    # Maximum query length in the batch.
    max_query_len: Optional[int]
    # Maximum sequence length in the batch.
    max_seq_len: Optional[int]
    # (batch_size + 1,). The cumulative subquery lengths of the sequences in
    # the batch, used to index into subquery. E.g., if the subquery length
    # is [4, 6], it is [0, 4, 10].
    subquery_start_loc: Optional[torch.Tensor]
    # (batch_size + 1,). The cumulative sequence lengths of the sequences in
    # the batch, used to index into sequence. E.g., if the sequence length is
    # [4, 6], it is [0, 4, 10].
    seq_start_loc: Optional[torch.Tensor]
    # (batch_size,) A tensor of context lengths (tokens that are computed
    # so far).
    context_lens_tensor: Optional[torch.Tensor]
    block_tables: Optional[torch.Tensor]
    # Whether or not if cuda graph is enabled.
    use_cuda_graph: bool
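Two pieces of arithmetic in this file can be checked without torch: the cumulative start locations described in the metadata comments, and the element count implied by the KV cache shape. The sketch below uses plain Python lists and tuples. The helper names `start_locs` and `kv_cache_shape` and the example sizes (1024 blocks, 8 KV heads, head size 128) are illustrative assumptions, not values taken from the adaptation code.

```python
from itertools import accumulate
from math import prod
from typing import List, Tuple


def start_locs(lengths: List[int]) -> List[int]:
    """Cumulative start offsets with a leading 0, of length batch_size + 1."""
    return [0, *accumulate(lengths)]


def kv_cache_shape(num_blocks: int, block_size: int, num_kv_heads: int,
                   head_size: int) -> Tuple[int, ...]:
    # Same layout as get_kv_cache_shape above; the leading dimension of 2
    # holds the key cache and the value cache.
    return (2, num_blocks, block_size, num_kv_heads, head_size)


# The example from the comments: lengths [4, 6] give offsets [0, 4, 10].
offsets = start_locs([4, 6])

# Hypothetical sizes chosen only to make the arithmetic concrete.
shape = kv_cache_shape(num_blocks=1024, block_size=128, num_kv_heads=8,
                       head_size=128)
num_elements = prod(shape)
bytes_fp16 = num_elements * 2  # 2 bytes per float16 element
```

With these example sizes the cache for a single layer holds 2^28 elements, which is 512 MiB in float16.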
- vllm_npu/vllm_npu/attention/selector.py: defines a get_attn_backend function that returns the Ascend backend and replaces the corresponding native vLLM function.
# Part of codes in this file was copied from project [vLLM Team][vllm]
from functools import lru_cache
from typing import Type

import torch
from vllm.attention.backends.abstract import AttentionBackend
from vllm.logger import init_logger

logger = init_logger(__name__)


@lru_cache(maxsize=None)
def get_attn_backend(dtype: torch.dtype) -> Type[AttentionBackend]:
    logger.info("Using Ascend backend.")
    from vllm_npu.attention.backends import AscendAttentionBackend
    return AscendAttentionBackend
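Because the function is wrapped in `lru_cache`, its body, including the log call and the deferred import, runs only once per distinct argument; later calls with the same dtype return the cached class. A minimal sketch of that caching behavior follows, using strings in place of `torch.dtype` and a hypothetical `calls` list in place of the logger.

```python
from functools import lru_cache
from typing import List

# Hypothetical stand-in for the logger: records each time the body runs.
calls: List[str] = []


@lru_cache(maxsize=None)
def get_backend(dtype: str) -> str:
    calls.append(dtype)
    return "ascend-attn-backend"


# Two lookups with the same argument and one with a different argument.
first = get_backend("float16")
second = get_backend("float16")
third = get_backend("bfloat16")
```

After these three calls, `calls` contains one entry per distinct dtype, so the body ran twice rather than three times.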
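- vllm_npu/vllm_npu/core/__init__.py: an empty file.
- vllm_npu/vllm_npu/engine/__init__.py: during initialization of the engine module, replaces the from_engine_args functions that instantiate the LLMEngine and AsyncLLMEngine classes, and replaces the EngineArgs class in vLLM's arg_utils module.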
from vllm.engine.llm_engine import LLMEngine
from vllm.engine.async_llm_engine import AsyncLLMEngine
import vllm.engine.arg_utils as v_arg_utils
from vllm_npu.engine.ascend_engine import from_engine_args
from vllm_npu.engine.async_ascend_engine import from_engine_args_async
from .arg_utils import EngineArgs

LLMEngine.from_engine_args = from_engine_args
AsyncLLMEngine.from_engine_args = from_engine_args_async
v_arg_utils.EngineArgs = EngineArgs
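The replacement functions are defined at module level but decorated with `@classmethod`, so assigning them to a class attribute makes them bind `cls` like any classmethod declared inside the class body. The sketch below shows this mechanism on a hypothetical `Engine` class; the names and the simplified string return values are assumptions for illustration only.

```python
class Engine:
    """Hypothetical stand-in for LLMEngine."""

    def __init__(self, executor: str) -> None:
        self.executor = executor

    @classmethod
    def from_engine_args(cls, device: str) -> "Engine":
        return cls("gpu")


# A module-level function wrapped as a classmethod, as in ascend_engine.py.
@classmethod
def from_engine_args(cls, device: str) -> "Engine":
    return cls("ascend" if device == "npu" else "gpu")


# Assigning the classmethod object to the class replaces the constructor path;
# the descriptor protocol supplies `cls` when it is accessed on the class.
Engine.from_engine_args = from_engine_args
engine = Engine.from_engine_args("npu")
```

Callers that go through `Engine.from_engine_args` need no changes, which is what lets the patch take effect without modifying vLLM's call sites.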
- vllm_npu/vllm_npu/engine/ascend_engine.py: rewrites the from_engine_args function that instantiates the LLMEngine class so that the Executor classes for the Ascend environment are loaded.
from vllm.engine.arg_utils import EngineArgs
from vllm.usage.usage_lib import UsageContext
from vllm_npu.executor.ray_utils import initialize_ray_cluster


@classmethod
def from_engine_args(
    cls,
    engine_args: EngineArgs,
    usage_context: UsageContext = UsageContext.ENGINE_CONTEXT,
) -> "LLMEngine":
    """Creates an LLM engine from the engine arguments."""
    # Create the engine configs.
    engine_config = engine_args.create_engine_config()

    # Initialize the cluster and specify the executor class.
    if engine_config.device_config.device_type == "neuron":
        from vllm.executor.neuron_executor import NeuronExecutor
        executor_class = NeuronExecutor
    elif engine_config.device_config.device_type == "cpu":
        from vllm.executor.cpu_executor import CPUExecutor
        executor_class = CPUExecutor
    elif engine_config.device_config.device_type == "npu":
        if engine_config.parallel_config.worker_use_ray:
            initialize_ray_cluster(engine_config.parallel_config)
            from vllm_npu.executor.ascend_ray_executor import RayAscendExecutor
            executor_class = RayAscendExecutor
        else:
            from vllm_npu.executor.ascend_executor import AscendExecutor
            executor_class = AscendExecutor
    elif engine_config.parallel_config.worker_use_ray:
        initialize_ray_cluster(engine_config.parallel_config)
        from vllm.executor.ray_gpu_executor import RayGPUExecutor
        executor_class = RayGPUExecutor
    else:
        if engine_config.parallel_config.world_size != 1:
            raise ValueError("Ray is required if parallel_config.world_size > 1.")
        from vllm.executor.gpu_executor import GPUExecutor
        executor_class = GPUExecutor

    # Create the LLM engine.
    engine = cls(
        **engine_config.to_dict(),
        executor_class=executor_class,
        log_stats=not engine_args.disable_log_stats,
        usage_context=usage_context,
    )
    return engine
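The executor selection above is a decision table over the device type, the worker_use_ray flag, and the world size. The sketch below restates it as a pure function that returns class names as strings, so the branch order can be checked without vLLM or Ray installed. The function name `select_executor` is hypothetical, and the Ray cluster initialization side effect is deliberately left out.

```python
def select_executor(device_type: str, worker_use_ray: bool,
                    world_size: int = 1) -> str:
    """Return the name of the executor class the branches above would pick."""
    if device_type == "neuron":
        return "NeuronExecutor"
    if device_type == "cpu":
        return "CPUExecutor"
    if device_type == "npu":
        # The Ascend branch is checked before the generic Ray/GPU fallbacks.
        return "RayAscendExecutor" if worker_use_ray else "AscendExecutor"
    if worker_use_ray:
        return "RayGPUExecutor"
    if world_size != 1:
        raise ValueError("Ray is required if parallel_config.world_size > 1.")
    return "GPUExecutor"


single_card = select_executor("npu", worker_use_ray=False)
multi_card = select_executor("npu", worker_use_ray=True, world_size=4)
```

Because the "npu" check precedes the generic worker_use_ray check, an Ascend deployment never falls through to the GPU executors.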
- vllm_npu/vllm_npu/engine/async_ascend_engine.py: rewrites the from_engine_args function that instantiates the AsyncLLMEngine class so that the Executor classes for the Ascend environment are loaded.
from vllm.engine.arg_utils import AsyncEngineArgs
from vllm.usage.usage_lib import UsageContext
from vllm_npu.executor.ray_utils import initialize_ray_cluster


@classmethod
def from_engine_args_async(
    cls,
    engine_args: AsyncEngineArgs,
    start_engine_loop: bool = True,
    usage_context: UsageContext = UsageContext.ENGINE_CONTEXT,
) -> "AsyncLLMEngine":
    """Creates an async LLM engine from the engine arguments."""
    # Create the engine configs.
    engine_config = engine_args.create_engine_config()

    if engine_config.device_config.device_type == "neuron":
        from vllm.executor.neuron_executor import NeuronExecutorAsync
        executor_class = NeuronExecutorAsync
    elif engine_config.device_config.device_type == "cpu":
        if engine_config.parallel_config.worker_use_ray:
            raise RuntimeError("Ray is not supported with the CPU backend.")
        from vllm.executor.cpu_executor import CPUExecutorAsync
        executor_class = CPUExecutorAsync
    elif engine_config.device_config.device_type == "npu":
        if engine_config.parallel_config.worker_use_ray:
            initialize_ray_cluster(engine_config.parallel_config)
            from vllm_npu.executor.ascend_ray_executor import RayAscendExecutorAsync
            executor_class = RayAscendExecutorAsync
        else:
            from vllm_npu.executor.ascend_executor import AscendExecutorAsync
            executor_class = AscendExecutorAsync
    elif engine_config.parallel_config.worker_use_ray:
        initialize_ray_cluster(engine_config.parallel_config)
        from vllm.executor.ray_gpu_executor import RayGPUExecutorAsync
        executor_class = RayGPUExecutorAsync
    else:
        if engine_config.parallel_config.world_size != 1:
            raise RuntimeError("Ray is required if parallel_config.world_size > 1.")
        from vllm.executor.gpu_executor import GPUExecutorAsync
        executor_class = GPUExecutorAsync

    # Create the async LLM engine.
    engine = cls(
        engine_config.parallel_config.worker_use_ray,
        engine_args.engine_use_ray,
        **engine_config.to_dict(),
        executor_class=executor_class,
        log_requests=not engine_args.disable_log_requests,
        log_stats=not engine_args.disable_log_stats,
        max_log_len=engine_args.max_log_len,
        start_engine_loop=start_engine_loop,
        usage_context=usage_context,
    )
    return engine
- vllm_npu/vllm_npu/engine/arg_utils.py: rewrites the EngineArgs class from vLLM's arg_utils module, adding 128 as an allowed value for the block_size parameter.
import argparse
import dataclasses
from dataclasses import dataclass
from typing import List, Optional, Union

from vllm.config import (
    CacheConfig,
    DecodingConfig,
    EngineConfig,
    LoadConfig,
    LoRAConfig,
    ModelConfig,
    ParallelConfig,
    SchedulerConfig,
    SpeculativeConfig,
    TokenizerPoolConfig,
    VisionLanguageConfig,
)
from vllm.model_executor.layers.quantization import QUANTIZATION_METHODS
from vllm.utils import str_to_int_tuple
from vllm_npu.config import DeviceConfig

DEFAULT_TYPE = "auto"


def nullable_str(val: str):
    if not val or val == "None":
        return None
    return val


@dataclass
class EngineArgs:
    """Arguments for vLLM engine."""

    model: str
    served_model_name: Optional[Union[List[str]]] = None
    tokenizer: Optional[str] = None
    skip_tokenizer_init: bool = False
    tokenizer_mode: str = DEFAULT_TYPE
    trust_remote_code: bool = False
    download_dir: Optional[str] = None
    load_format: str = DEFAULT_TYPE
    dtype: str = DEFAULT_TYPE
    kv_cache_dtype: str = DEFAULT_TYPE
    quantization_param_path: Optional[str] = None
    seed: int = 0
    max_model_len: Optional[int] = None
    worker_use_ray: bool = False
    pipeline_parallel_size: int = 1
    tensor_parallel_size: int = 1
    max_parallel_loading_workers: Optional[int] = None
    block_size: int = 128
    enable_prefix_caching: bool = False
    use_v2_block_manager: bool = False
    swap_space: int = 4  # GiB
    gpu_memory_utilization: float = 0.90
    max_num_batched_tokens: Optional[int] = None
    max_num_seqs: int = 256
    max_logprobs: int = 5  # OpenAI default value
    disable_log_stats: bool = False
    revision: Optional[str] = None
    code_revision: Optional[str] = None
    tokenizer_revision: Optional[str] = None
    quantization: Optional[str] = None
    enforce_eager: bool = True
    max_context_len_to_capture: Optional[int] = None
    max_seq_len_to_capture: int = 8192
    disable_custom_all_reduce: bool = False
    tokenizer_pool_size: int = 0
    tokenizer_pool_type: str = "ray"
    tokenizer_pool_extra_config: Optional[dict] = None
    enable_lora: bool = False
    max_loras: int = 1
    max_lora_rank: int = 16
    fully_sharded_loras: bool = False
    lora_extra_vocab_size: int = 256
    lora_dtype = DEFAULT_TYPE
    max_cpu_loras: Optional[int] = None
    device: str = DEFAULT_TYPE
    ray_workers_use_nsight: bool = False
    num_gpu_blocks_override: Optional[int] = None
    num_lookahead_slots: int = 0
    model_loader_extra_config: Optional[dict] = None

    # Related to Vision-language models such as llava
    image_input_type: Optional[str] = None
    image_token_id: Optional[int] = None
    image_input_shape: Optional[str] = None
    image_feature_size: Optional[int] = None
    scheduler_delay_factor: float = 0.0
    enable_chunked_prefill: bool = False

    guided_decoding_backend: str = "outlines"
    # Speculative decoding configuration.
    speculative_model: Optional[str] = None
    num_speculative_tokens: Optional[int] = None
    speculative_max_model_len: Optional[int] = None
    ngram_prompt_lookup_max: Optional[int] = None
    ngram_prompt_lookup_min: Optional[int] = None

    def __post_init__(self):
        if self.tokenizer is None:
            self.tokenizer = self.model

    @staticmethod
    def add_cli_args(parser: argparse.ArgumentParser) -> argparse.ArgumentParser:
        """Shared CLI arguments for vLLM engine."""

        # Model arguments
        parser.add_argument(
            "--model",
            type=str,
            default="facebook/opt-125m",
            help="Name or path of the huggingface model to use.",
        )
        parser.add_argument(
            "--tokenizer",
            type=nullable_str,
            default=EngineArgs.tokenizer,
            help="Name or path of the huggingface tokenizer to use.",
        )
        parser.add_argument(
            "--skip-tokenizer-init",
            action="store_true",
            help="Skip initialization of tokenizer and detokenizer",
        )
        parser.add_argument(
            "--revision",
            type=nullable_str,
            default=None,
            help="The specific model version to use. It can be a branch "
            "name, a tag name, or a commit id. If unspecified, will use "
            "the default version.",
        )
        parser.add_argument(
            "--code-revision",
            type=nullable_str,
            default=None,
            help="The specific revision to use for the model code on "
            "Hugging Face Hub. It can be a branch name, a tag name, or a "
            "commit id. If unspecified, will use the default version.",
        )
        parser.add_argument(
            "--tokenizer-revision",
            type=nullable_str,
            default=None,
            help="The specific tokenizer version to use. It can be a branch "
            "name, a tag name, or a commit id. If unspecified, will use "
            "the default version.",
        )
        parser.add_argument(
            "--tokenizer-mode",
            type=str,
            default=EngineArgs.tokenizer_mode,
            choices=[DEFAULT_TYPE, "slow"],
            help='The tokenizer mode.\n\n* "auto" will use the '
            'fast tokenizer if available.\n* "slow" will '
            "always use the slow tokenizer.",
        )
        parser.add_argument(
            "--trust-remote-code",
            action="store_true",
            help="Trust remote code from huggingface.",
        )
        parser.add_argument(
            "--download-dir",
            type=nullable_str,
            default=EngineArgs.download_dir,
            help="Directory to download and load the weights, "
            "default to the default cache dir of "
            "huggingface.",
        )
        parser.add_argument(
            "--load-format",
            type=str,
            default=EngineArgs.load_format,
            choices=[
                DEFAULT_TYPE,
                "pt",
                "safetensors",
                "npcache",
                "dummy",
                "tensorizer",
            ],
            help="The format of the model weights to load.\n\n"
            '* "auto" will try to load the weights in the safetensors format '
            "and fall back to the pytorch bin format if safetensors format "
            "is not available.\n"
            '* "pt" will load the weights in the pytorch bin format.\n'
            '* "safetensors" will load the weights in the safetensors format.\n'
            '* "npcache" will load the weights in pytorch format and store '
            "a numpy cache to speed up the loading.\n"
            '* "dummy" will initialize the weights with random values, '
            "which is mainly for profiling.\n"
            '* "tensorizer" will load the weights using tensorizer from '
            "CoreWeave which assumes tensorizer_uri is set to the location of "
            "the serialized weights.",
        )
        parser.add_argument(
            "--dtype",
            type=str,
            default=EngineArgs.dtype,
            choices=[DEFAULT_TYPE, "half", "float16", "bfloat16", "float", "float32"],
            help="Data type for model weights and activations.\n\n"
            '* "auto" will use FP16 precision for FP32 and FP16 models, and '
            "BF16 precision for BF16 models.\n"
            '* "half" for FP16. Recommended for AWQ quantization.\n'
            '* "float16" is the same as "half".\n'
            '* "bfloat16" for a balance between precision and range.\n'
            '* "float" is shorthand for FP32 precision.\n'
            '* "float32" for FP32 precision.',
        )
        parser.add_argument(
            "--kv-cache-dtype",
            type=str,
            choices=[DEFAULT_TYPE, "fp8"],
            default=EngineArgs.kv_cache_dtype,
            help='Data type for kv cache storage. If "auto", will use model '
            "data type. FP8_E5M2 (without scaling) is only supported on cuda "
            "version greater than 11.8. On ROCm (AMD GPU), FP8_E4M3 is instead "
            "supported for common inference criteria.",
        )
        parser.add_argument(
            "--quantization-param-path",
            type=nullable_str,
            default=None,
            help="Path to the JSON file containing the KV cache "
            "scaling factors. This should generally be supplied, when "
            "KV cache dtype is FP8. Otherwise, KV cache scaling factors "
            "default to 1.0, which may cause accuracy issues. "
            "FP8_E5M2 (without scaling) is only supported on cuda version"
            "greater than 11.8. On ROCm (AMD GPU), FP8_E4M3 is instead "
            "supported for common inference criteria.",
        )
        parser.add_argument(
            "--max-model-len",
            type=int,
            default=EngineArgs.max_model_len,
            help="Model context length. If unspecified, will "
            "be automatically derived from the model config.",
        )
        parser.add_argument(
            "--guided-decoding-backend",
            type=str,
            default="outlines",
            choices=["outlines", "lm-format-enforcer"],
            help="Which engine will be used for guided decoding"
            " (JSON schema / regex etc) by default. Currently support "
            "https://github.com/outlines-dev/outlines and "
            "https://github.com/noamgat/lm-format-enforcer."
            " Can be overridden per request via guided_decoding_backend"
            " parameter.",
        )
        # Parallel arguments
        parser.add_argument(
            "--worker-use-ray",
            action="store_true",
            help="Use Ray for distributed serving, will be "
            "automatically set when using more than 1 GPU.",
        )
        parser.add_argument(
            "--pipeline-parallel-size",
            "-pp",
            type=int,
            default=EngineArgs.pipeline_parallel_size,
            help="Number of pipeline stages.",
        )
        parser.add_argument(
            "--tensor-parallel-size",
            "-tp",
            type=int,
            default=EngineArgs.tensor_parallel_size,
            help="Number of tensor parallel replicas.",
        )
        parser.add_argument(
            "--max-parallel-loading-workers",
            type=int,
            default=EngineArgs.max_parallel_loading_workers,
            help="Load model sequentially in multiple batches, "
            "to avoid RAM OOM when using tensor "
            "parallel and large models.",
        )
        parser.add_argument(
            "--ray-workers-use-nsight",
            action="store_true",
            help="If specified, use nsight to profile Ray workers.",
        )
        # KV cache arguments
        parser.add_argument(
            "--block-size",
            type=int,
            default=EngineArgs.block_size,
            choices=[8, 16, 32, 128],
            help="Token block size for contiguous chunks of "
            "tokens.",
        )

        parser.add_argument(
            "--enable-prefix-caching",
            action="store_true",
            help="Enables automatic prefix caching.",
        )
        parser.add_argument(
            "--use-v2-block-manager",
            action="store_true",
            help="Use BlockSpaceMangerV2.",
        )
        parser.add_argument(
            "--num-lookahead-slots",
            type=int,
            default=EngineArgs.num_lookahead_slots,
            help="Experimental scheduling config necessary for "
            "speculative decoding. This will be replaced by "
            "speculative config in the future; it is present "
            "to enable correctness tests until then.",
        )

        parser.add_argument(
            "--seed",
            type=int,
            default=EngineArgs.seed,
            help="Random seed for operations.",
        )
        parser.add_argument(
            "--swap-space",
            type=int,
            default=EngineArgs.swap_space,
            help="CPU swap space size (GiB) per GPU.",
        )
        parser.add_argument(
            "--gpu-memory-utilization",
            type=float,
            default=EngineArgs.gpu_memory_utilization,
            help="The fraction of GPU memory to be used for the model "
            "executor, which can range from 0 to 1. For example, a value of "
            "0.5 would imply 50%% GPU memory utilization. If unspecified, "
            "will use the default value of 0.9.",
        )
        parser.add_argument(
            "--num-gpu-blocks-override",
            type=int,
            default=None,
            help="If specified, ignore GPU profiling result and use this number"
            "of GPU blocks. Used for testing preemption.",
        )
        parser.add_argument(
            "--max-num-batched-tokens",
            type=int,
            default=EngineArgs.max_num_batched_tokens,
            help="Maximum number of batched tokens per "
            "iteration.",
        )
        parser.add_argument(
            "--max-num-seqs",
            type=int,
            default=EngineArgs.max_num_seqs,
            help="Maximum number of sequences per iteration.",
        )
        parser.add_argument(
            "--max-logprobs",
            type=int,
            default=EngineArgs.max_logprobs,
            help=(
                "Max number of log probs to return logprobs is specified in"
                " SamplingParams."
            ),
        )
        parser.add_argument(
            "--disable-log-stats",
            action="store_true",
            help="Disable logging statistics.",
        )
        # Quantization settings.
        parser.add_argument(
            "--quantization",
            "-q",
            type=nullable_str,
            choices=[*QUANTIZATION_METHODS, None],
            default=EngineArgs.quantization,
            help="Method used to quantize the weights. If "
            "None, we first check the `quantization_config` "
            "attribute in the model config file. If that is "
            "None, we assume the model weights are not "
            "quantized and use `dtype` to determine the data "
            "type of the weights.",
        )
        parser.add_argument(
            "--enforce-eager",
            action="store_true",
            help="Always use eager-mode PyTorch. If False, "
            "will use eager mode and CUDA graph in hybrid "
            "for maximal performance and flexibility.",
        )
        parser.add_argument(
            "--max-context-len-to-capture",
            type=int,
            default=EngineArgs.max_context_len_to_capture,
            help="Maximum context length covered by CUDA "
            "graphs. When a sequence has context length "
            "larger than this, we fall back to eager mode. "
            "(DEPRECATED. Use --max-seq_len-to-capture instead"
            ")",
        )
        parser.add_argument(
            "--max-seq_len-to-capture",
            type=int,
            default=EngineArgs.max_seq_len_to_capture,
            help="Maximum sequence length covered by CUDA "
            "graphs. When a sequence has context length "
            "larger than this, we fall back to eager mode.",
        )
        parser.add_argument(
            "--disable-custom-all-reduce",
            action="store_true",
            default=EngineArgs.disable_custom_all_reduce,
            help="See ParallelConfig.",
        )
        parser.add_argument(
            "--tokenizer-pool-size",
            type=int,
            default=EngineArgs.tokenizer_pool_size,
            help="Size of tokenizer pool to use for "
            "asynchronous tokenization. If 0, will "
            "use synchronous tokenization.",
        )
        parser.add_argument(
            "--tokenizer-pool-type",
            type=str,
            default=EngineArgs.tokenizer_pool_type,
            help="Type of tokenizer pool to use for "
            "asynchronous tokenization. Ignored "
            "if tokenizer_pool_size is 0.",
        )
        parser.add_argument(
            "--tokenizer-pool-extra-config",
            type=nullable_str,
            default=EngineArgs.tokenizer_pool_extra_config,
            help="Extra config for tokenizer pool. "
            "This should be a JSON string that will be "
            "parsed into a dictionary. Ignored if "
            "tokenizer_pool_size is 0.",
        )
        # LoRA related configs
        parser.add_argument(
            "--enable-lora",
            action="store_true",
            help="If True, enable handling of LoRA adapters.",
        )
        parser.add_argument(
            "--max-loras",
            type=int,
            default=EngineArgs.max_loras,
            help="Max number of LoRAs in a single batch.",
        )
        parser.add_argument(
            "--max-lora-rank",
            type=int,
            default=EngineArgs.max_lora_rank,
            help="Max LoRA rank.",
        )
        parser.add_argument(
            "--lora-extra-vocab-size",
            type=int,
            default=EngineArgs.lora_extra_vocab_size,
            help=(
                "Maximum size of extra vocabulary that can be "
                "present in a LoRA adapter (added to the base "
                "model vocabulary)."
            ),
        )
        parser.add_argument(
            "--lora-dtype",
            type=str,
            default=EngineArgs.lora_dtype,
            choices=[DEFAULT_TYPE, "float16", "bfloat16", "float32"],
            help=("Data type for LoRA. If auto, will default to "
                  "base model dtype."),
        )
        parser.add_argument(
            "--max-cpu-loras",
            type=int,
            default=EngineArgs.max_cpu_loras,
            help=(
                "Maximum number of LoRAs to store in CPU memory. "
                "Must be >= than max_num_seqs. "
                "Defaults to max_num_seqs."
            ),
        )
        parser.add_argument(
            "--fully-sharded-loras",
            action="store_true",
            help=(
                "By default, only half of the LoRA computation is "
                "sharded with tensor parallelism. "
                "Enabling this will use the fully sharded layers. "
                "At high sequence length, max rank or "
                "tensor parallel size, this is likely faster."
            ),
        )
        parser.add_argument(
            "--device",
            type=str,
            default=EngineArgs.device,
            choices=["auto", "cuda", "neuron", "cpu", "npu"],
            help="Device type for vLLM execution.",
        )
        # Related to Vision-language models such as llava
        parser.add_argument(
            "--image-input-type",
            type=nullable_str,
            default=None,
            choices=[t.name.lower() for t in VisionLanguageConfig.ImageInputType],
            help=(
                "The image input type passed into vLLM. "
                'Should be one of "pixel_values" or "image_features".'
), ) parser.add_argument( "--image-token-id", type=int, default=None, help=("Input id for image token."), ) parser.add_argument( "--image-input-shape", type=nullable_str, default=None, help=( "The biggest image input shape (worst for memory footprint) " "given an input type. Only used for vLLM's profile_run." ), ) parser.add_argument( "--image-feature-size", type=int, default=None, help=("The image feature size along the context dimension."), ) parser.add_argument( "--scheduler-delay-factor", type=float, default=EngineArgs.scheduler_delay_factor, help="Apply a delay (of delay factor multiplied by previous" "prompt latency) before scheduling next prompt.", ) parser.add_argument( "--enable-chunked-prefill", action="store_true", help="If set, the prefill requests can be chunked based on the " "max_num_batched_tokens.", ) parser.add_argument( "--speculative-model", type=nullable_str, default=EngineArgs.speculative_model, help="The name of the draft model to be used in speculative decoding.", ) parser.add_argument( "--num-speculative-tokens", type=int, default=EngineArgs.num_speculative_tokens, help="The number of speculative tokens to sample from " "the draft model in speculative decoding.", ) parser.add_argument( "--speculative-max-model-len", type=int, default=EngineArgs.speculative_max_model_len, help="The maximum sequence length supported by the " "draft model. Sequences over this length will skip " "speculation.", ) parser.add_argument( "--ngram-prompt-lookup-max", type=int, default=EngineArgs.ngram_prompt_lookup_max, help="Max size of window for ngram prompt lookup in speculative " "decoding.", ) parser.add_argument( "--ngram-prompt-lookup-min", type=int, default=EngineArgs.ngram_prompt_lookup_min, help="Min size of window for ngram prompt lookup in speculative " "decoding.", ) parser.add_argument( "--model-loader-extra-config", type=nullable_str, default=EngineArgs.model_loader_extra_config, help="Extra config for model loader. 
" "This will be passed to the model loader " "corresponding to the chosen load_format. " "This should be a JSON string that will be " "parsed into a dictionary.", ) parser.add_argument( "--served-model-name", nargs="+", type=str, default=None, help="The model name(s) used in the API. If multiple " "names are provided, the server will respond to any " "of the provided names. The model name in the model " "field of a response will be the first name in this " "list. If not specified, the model name will be the " "same as the `--model` argument. Noted that this name(s)" "will also be used in `model_name` tag content of " "prometheus metrics, if multiple names provided, metrics" "tag will take the first one.", ) return parser @classmethod def from_cli_args(cls, args: argparse.Namespace) -> "EngineArgs": # Get the list of attributes of this dataclass. attrs = [attr.name for attr in dataclasses.fields(cls)] # Set the attributes from the parsed arguments. engine_args = cls(**{attr: getattr(args, attr) for attr in attrs}) return engine_args def create_engine_config( self, ) -> EngineConfig: device_config = DeviceConfig(self.device) model_config = ModelConfig( self.model, self.tokenizer, self.tokenizer_mode, self.trust_remote_code, self.dtype, self.seed, self.revision, self.code_revision, self.tokenizer_revision, self.max_model_len, self.quantization, self.quantization_param_path, self.enforce_eager, self.max_context_len_to_capture, self.max_seq_len_to_capture, self.max_logprobs, self.skip_tokenizer_init, self.served_model_name, ) cache_config = CacheConfig( self.block_size, self.gpu_memory_utilization, self.swap_space, self.kv_cache_dtype, self.num_gpu_blocks_override, model_config.get_sliding_window(), self.enable_prefix_caching, ) parallel_config = ParallelConfig( self.pipeline_parallel_size, self.tensor_parallel_size, self.worker_use_ray, self.max_parallel_loading_workers, self.disable_custom_all_reduce, TokenizerPoolConfig.create_config( self.tokenizer_pool_size, 
self.tokenizer_pool_type, self.tokenizer_pool_extra_config, ), self.ray_workers_use_nsight, ) speculative_config = SpeculativeConfig.maybe_create_spec_config( target_model_config=model_config, target_parallel_config=parallel_config, target_dtype=self.dtype, speculative_model=self.speculative_model, num_speculative_tokens=self.num_speculative_tokens, speculative_max_model_len=self.speculative_max_model_len, enable_chunked_prefill=self.enable_chunked_prefill, use_v2_block_manager=self.use_v2_block_manager, ngram_prompt_lookup_max=self.ngram_prompt_lookup_max, ngram_prompt_lookup_min=self.ngram_prompt_lookup_min, ) scheduler_config = SchedulerConfig( self.max_num_batched_tokens, self.max_num_seqs, model_config.max_model_len, self.use_v2_block_manager, num_lookahead_slots=( self.num_lookahead_slots if speculative_config is None else speculative_config.num_lookahead_slots ), delay_factor=self.scheduler_delay_factor, enable_chunked_prefill=self.enable_chunked_prefill, ) lora_config = ( LoRAConfig( max_lora_rank=self.max_lora_rank, max_loras=self.max_loras, fully_sharded_loras=self.fully_sharded_loras, lora_extra_vocab_size=self.lora_extra_vocab_size, lora_dtype=self.lora_dtype, max_cpu_loras=( self.max_cpu_loras if self.max_cpu_loras and self.max_cpu_loras > 0 else None ), ) if self.enable_lora else None ) load_config = LoadConfig( load_format=self.load_format, download_dir=self.download_dir, model_loader_extra_config=self.model_loader_extra_config, ) if self.image_input_type: if ( not self.image_token_id or not self.image_input_shape or not self.image_feature_size ): raise ValueError( "Specify `image_token_id`, `image_input_shape` and " "`image_feature_size` together with `image_input_type`." 
) vision_language_config = VisionLanguageConfig( image_input_type=VisionLanguageConfig.get_image_input_enum_type( self.image_input_type ), image_token_id=self.image_token_id, image_input_shape=str_to_int_tuple(self.image_input_shape), image_feature_size=self.image_feature_size, ) else: vision_language_config = None decoding_config = DecodingConfig( guided_decoding_backend=self.guided_decoding_backend ) return EngineConfig( model_config=model_config, cache_config=cache_config, parallel_config=parallel_config, scheduler_config=scheduler_config, device_config=device_config, lora_config=lora_config, vision_language_config=vision_language_config, speculative_config=speculative_config, load_config=load_config, decoding_config=decoding_config, )
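The parser above takes each option's default from the `EngineArgs` dataclass, and `from_cli_args` rebuilds the dataclass from the parsed namespace. The `--device` option is where the adapted file accepts `npu`. A minimal sketch of that pattern follows; `MiniEngineArgs` and `parse_engine_args` are hypothetical stand-ins that keep only two of the fields and are not part of vLLM.

```python
import argparse
import dataclasses


@dataclasses.dataclass
class MiniEngineArgs:
    """Hypothetical, reduced stand-in for EngineArgs (two fields only)."""
    device: str = "auto"
    block_size: int = 16

    @staticmethod
    def add_cli_args(parser: argparse.ArgumentParser) -> argparse.ArgumentParser:
        # Defaults are read from the dataclass so they are defined in one place.
        parser.add_argument(
            "--device",
            type=str,
            default=MiniEngineArgs.device,
            choices=["auto", "cuda", "neuron", "cpu", "npu"],
            help="Device type for execution.",
        )
        parser.add_argument(
            "--block-size",
            type=int,
            default=MiniEngineArgs.block_size,
            choices=[8, 16, 32, 128],
            help="Token block size.",
        )
        return parser

    @classmethod
    def from_cli_args(cls, args: argparse.Namespace) -> "MiniEngineArgs":
        # Copy every dataclass field from the namespace by name.
        attrs = [field.name for field in dataclasses.fields(cls)]
        return cls(**{attr: getattr(args, attr) for attr in attrs})


def parse_engine_args(argv):
    """Parse a list of CLI tokens into a MiniEngineArgs instance."""
    parser = MiniEngineArgs.add_cli_args(argparse.ArgumentParser())
    return MiniEngineArgs.from_cli_args(parser.parse_args(argv))
```

Because argparse maps `--block-size` to the attribute `block_size`, the field names and option names line up without extra glue code.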
- vllm_npu/vllm_npu/executor/__init__.py: replaces vLLM's Ray cluster initialization function with the Ascend-adapted one in the executor package's initialization module.
    from vllm_npu.executor.ray_utils import initialize_ray_cluster
    from vllm.executor import ray_utils

    ray_utils.initialize_ray_cluster = initialize_ray_cluster
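The replacement works by rebinding an attribute on the upstream module object. The sketch below illustrates the mechanism with a hypothetical stand-in module (`upstream`) instead of `vllm.executor.ray_utils`, and includes one caveat worth keeping in mind: a reference taken before the patch still points at the original function.

```python
import types

# Hypothetical stand-in for the upstream module being patched.
upstream = types.ModuleType("upstream_ray_utils")


def _original_init(world_size):
    return f"original init for {world_size} workers"


upstream.initialize_ray_cluster = _original_init

# A reference captured before patching, comparable to an early
# `from module import name`, keeps pointing at the original function.
early_ref = upstream.initialize_ray_cluster


def ascend_init(world_size):
    return f"ascend init for {world_size} workers"


# The patch itself: rebind the attribute on the module object.
upstream.initialize_ray_cluster = ascend_init


def caller_late(world_size):
    # Looks the function up through the module at call time, so it sees the patch.
    return upstream.initialize_ray_cluster(world_size)
```

This is why the patch module has to be imported before any code binds the original function by name.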
- vllm_npu/vllm_npu/executor/ascend_executor.py: implements the AscendExecutor and AscendExecutorAsync classes required for single-card inference in an Ascend environment.
    # Part of the code in this file was copied from project [vLLM Team][vllm]
    from typing import List, Set, Tuple

    from vllm.executor.executor_base import ExecutorAsyncBase, ExecutorBase
    from vllm.logger import init_logger
    from vllm.lora.request import LoRARequest
    from vllm.sequence import ExecuteModelRequest, SamplerOutput
    from vllm.utils import (get_distributed_init_method, get_ip, get_open_port,
                            make_async)

    from vllm_npu.worker.ascend_worker import AscendWorker

    logger = init_logger(__name__)


    class AscendExecutor(ExecutorBase):

        def _init_executor(self) -> None:
            assert (not self.speculative_config
                    ), "Speculative decoding is not yet supported for Ascend backend."

            # Instantiate the worker and load the model to the device.
            self._init_worker()

        def _init_worker(self):
            distributed_init_method = get_distributed_init_method(
                get_ip(), get_open_port())
            self.driver_worker = AscendWorker(
                self.model_config,
                self.parallel_config,
                self.scheduler_config,
                self.device_config,
                self.cache_config,
                self.load_config,
                local_rank=0,
                rank=0,
                distributed_init_method=distributed_init_method,
                lora_config=self.lora_config,
                is_driver_worker=True,
            )
            self.driver_worker.init_device()
            self.driver_worker.load_model()

        def determine_num_available_blocks(self) -> Tuple[int, int]:
            """Determine the number of available KV blocks by invoking the
            underlying worker.
            """
            return self.driver_worker.determine_num_available_blocks()

        def initialize_cache(self, num_gpu_blocks: int,
                             num_cpu_blocks: int) -> None:
            """Initialize the KV cache by invoking the underlying worker.
            """
            self.driver_worker.initialize_cache(num_gpu_blocks, num_cpu_blocks)

        def execute_model(
                self,
                execute_model_req: ExecuteModelRequest) -> List[SamplerOutput]:
            output = self.driver_worker.execute_model(execute_model_req)
            return output

        def add_lora(self, lora_request: LoRARequest) -> bool:
            return self.driver_worker.add_lora(lora_request)

        def remove_lora(self, lora_id: int) -> bool:
            return self.driver_worker.remove_lora(lora_id)

        def list_loras(self) -> Set[int]:
            return self.driver_worker.list_loras()

        def check_health(self) -> None:
            # AscendExecutor will always be healthy as long as
            # it's running.
            return


    class AscendExecutorAsync(AscendExecutor, ExecutorAsyncBase):

        async def execute_model_async(
            self,
            execute_model_req: ExecuteModelRequest,
        ) -> List[SamplerOutput]:
            output = await make_async(
                self.driver_worker.execute_model
            )(execute_model_req=execute_model_req,)
            return output

        # async def check_health_async(self) -> None:
        #     # AscendExecutor will always be healthy as long as
        #     # it's running.
        #     return
- vllm_npu/vllm_npu/executor/ascend_ray_executor.py: implements the RayAscendExecutor and RayAscendExecutorAsync classes required for Ray-based multi-card inference in an Ascend environment.
import asyncio import os import pickle from collections import defaultdict from itertools import islice, repeat from typing import TYPE_CHECKING, Any, Dict, List, Optional, Tuple import vllm.envs as envs from vllm.executor.distributed_gpu_executor import ( # yapf: disable DistributedGPUExecutor, DistributedGPUExecutorAsync) from vllm.executor.ray_utils import RayWorkerWrapper, ray from vllm.logger import init_logger from vllm.sequence import ExecuteModelRequest, SamplerOutput from vllm.utils import (get_distributed_init_method, get_ip, get_open_port, get_vllm_instance_id, make_async) if ray is not None: from ray.util.scheduling_strategies import PlacementGroupSchedulingStrategy if TYPE_CHECKING: from ray.util.placement_group import PlacementGroup logger = init_logger(__name__) USE_RAY_COMPILED_DAG = envs.VLLM_USE_RAY_COMPILED_DAG class RayAscendExecutor(DistributedGPUExecutor): def _init_executor(self) -> None: assert (not self.speculative_config ), "Speculative decoding not yet supported for RayNPU backend." assert self.parallel_config.worker_use_ray placement_group = self.parallel_config.placement_group # Disable Ray usage stats collection. ray_usage = os.environ.get("RAY_USAGE_STATS_ENABLED", "0") if ray_usage != "1": os.environ["RAY_USAGE_STATS_ENABLED"] = "0" # Create the parallel GPU workers. self._init_workers_ray(placement_group) self.forward_dag = None if USE_RAY_COMPILED_DAG: self.forward_dag = self._compiled_ray_dag() def _init_workers_ray(self, placement_group: "PlacementGroup", **ray_remote_kwargs): if self.parallel_config.tensor_parallel_size == 1: # For single GPU case, we use a ray worker with constrained memory. num_gpus = self.cache_config.gpu_memory_utilization else: # Otherwise, the ray workers are allocated with a full GPU. num_gpus = 1 # The driver dummy worker does not actually use any resources. # It holds the resource for the driver worker. 
self.driver_dummy_worker: Optional[RayWorkerWrapper] = None # The remaining workers are the actual ray actors. self.workers: List[RayWorkerWrapper] = [] if self.parallel_config.ray_workers_use_nsight: ray_remote_kwargs = self._configure_ray_workers_use_nsight( ray_remote_kwargs) # Create the workers. driver_ip = get_ip() for bundle_id, bundle in enumerate(placement_group.bundle_specs): if not bundle.get("GPU", 0): continue scheduling_strategy = PlacementGroupSchedulingStrategy( placement_group=placement_group, placement_group_capture_child_tasks=True, placement_group_bundle_index=bundle_id, ) worker = ray.remote( num_cpus=0, num_gpus=num_gpus, scheduling_strategy=scheduling_strategy, **ray_remote_kwargs, )(RayWorkerWrapper).remote( worker_module_name="vllm_npu.worker.ascend_worker", worker_class_name="AscendWorker", trust_remote_code=self.model_config.trust_remote_code, ) worker_ip = ray.get(worker.get_node_ip.remote()) if worker_ip == driver_ip and self.driver_dummy_worker is None: # If the worker is on the same node as the driver, we use it # as the resource holder for the driver process. self.driver_dummy_worker = worker self.driver_worker = RayWorkerWrapper( worker_module_name="vllm_npu.worker.ascend_worker", worker_class_name="AscendWorker", trust_remote_code=self.model_config.trust_remote_code, ) else: # Else, added to the list of workers. self.workers.append(worker) if self.driver_dummy_worker is None: raise ValueError( "Ray does not allocate any GPUs on the driver node. Consider " "adjusting the Ray placement group or running the driver on a " "GPU node.") # Get the set of GPU IDs used on each node. 
worker_node_and_gpu_ids = self._run_workers("get_node_and_gpu_ids", use_dummy_driver=True) node_workers = defaultdict(list) node_gpus = defaultdict(list) for i, (node_id, gpu_ids) in enumerate(worker_node_and_gpu_ids): node_workers[node_id].append(i) node_gpus[node_id].extend(gpu_ids) for node_id, gpu_ids in node_gpus.items(): node_gpus[node_id] = sorted(gpu_ids) VLLM_INSTANCE_ID = get_vllm_instance_id() # Set environment variables for the driver and workers. all_args_to_update_environment_variables = [({ "CUDA_VISIBLE_DEVICES": ",".join(map(str, node_gpus[node_id])), "VLLM_INSTANCE_ID": VLLM_INSTANCE_ID, "VLLM_TRACE_FUNCTION": str(envs.VLLM_TRACE_FUNCTION), }, ) for (node_id, _) in worker_node_and_gpu_ids] self._run_workers("update_environment_variables", all_args=all_args_to_update_environment_variables) distributed_init_method = get_distributed_init_method( driver_ip, get_open_port()) # Initialize the actual workers inside worker wrapper. init_worker_all_kwargs = [ self._get_worker_kwargs( local_rank=node_workers[node_id].index(rank), rank=rank, distributed_init_method=distributed_init_method, ) for rank, (node_id, _) in enumerate(worker_node_and_gpu_ids) ] self._run_workers("init_worker", all_kwargs=init_worker_all_kwargs) self._run_workers("init_device") self._run_workers("load_model", max_concurrent_workers=self.parallel_config. max_parallel_loading_workers) def execute_model( self, execute_model_req: ExecuteModelRequest) -> List[SamplerOutput]: all_outputs = self._run_workers( "execute_model", driver_kwargs={"execute_model_req": execute_model_req}, use_ray_compiled_dag=USE_RAY_COMPILED_DAG) # Only the driver worker returns the sampling results. 
return all_outputs[0] def _run_workers( self, method: str, *args, driver_args: Optional[Tuple[Any, ...]] = None, driver_kwargs: Optional[Dict[str, Any]] = None, all_args: Optional[List[Tuple[Any, ...]]] = None, all_kwargs: Optional[List[Dict[str, Any]]] = None, use_dummy_driver: bool = False, max_concurrent_workers: Optional[int] = None, use_ray_compiled_dag: bool = False, **kwargs, ) -> Any: """Runs the given method on all workers. Can be used in the following ways: - args/kwargs: All workers share the same args/kwargs - args/kwargs and driver_args/driver_kwargs: Driver worker has different args - all_args/all_kwargs: args/kwargs for each worker are specified individually """ if max_concurrent_workers: raise NotImplementedError( "max_concurrent_workers is not supported yet.") if driver_args is None: driver_args = args if all_args is None else all_args[0] if driver_kwargs is None: driver_kwargs = kwargs if all_kwargs is None else all_kwargs[0] count = len(self.workers) all_worker_args = repeat(args, count) if all_args is None \ else islice(all_args, 1, None) all_worker_kwargs = repeat(kwargs, count) if all_kwargs is None \ else islice(all_kwargs, 1, None) if use_ray_compiled_dag: assert self.forward_dag is not None output_channels = self.forward_dag.execute(1) else: # Start the ray workers first. ray_worker_outputs = [ worker.execute_method.remote(method, *worker_args, **worker_kwargs) for (worker, worker_args, worker_kwargs ) in zip(self.workers, all_worker_args, all_worker_kwargs) ] # Start the driver worker after all the ray workers. if not use_dummy_driver: driver_worker_output = self.driver_worker.execute_method( method, *driver_args, **driver_kwargs) else: assert self.driver_dummy_worker is not None driver_worker_output = ray.get( self.driver_dummy_worker.execute_method.remote( method, *driver_args, **driver_kwargs)) # Get the results of the ray workers. 
if self.workers: if use_ray_compiled_dag: try: ray_worker_outputs = [ pickle.loads(chan.begin_read()) for chan in output_channels ] finally: # Has to call end_read in order to reuse the DAG. for chan in output_channels: chan.end_read() else: ray_worker_outputs = ray.get(ray_worker_outputs) return [driver_worker_output] + ray_worker_outputs def _compiled_ray_dag(self): import pkg_resources required_version = "2.9" current_version = pkg_resources.get_distribution("ray").version if current_version < required_version: raise ValueError(f"Ray version {required_version} or greater is " f"required, but found {current_version}") from ray.dag import InputNode, MultiOutputNode assert self.parallel_config.worker_use_ray with InputNode() as input_data: forward_dag = MultiOutputNode([ worker.execute_model_compiled_dag_remote. bind( # type: ignore[attr-defined] input_data) for worker in self.workers ]) return forward_dag.experimental_compile() def check_health(self) -> None: """Raises an error if engine is unhealthy.""" self._check_if_any_actor_is_dead() def _check_if_any_actor_is_dead(self): if not self.workers: return dead_actors = [] for actor in self.workers: actor_state = ray.state.actors(actor._ray_actor_id.hex()) # pylint: disable=protected-access if actor_state["State"] == "DEAD": dead_actors.append(actor) if dead_actors: raise RuntimeError("At least one Worker is dead. " f"Dead Workers: {dead_actors}. 
") class RayAscendExecutorAsync(RayAscendExecutor, DistributedGPUExecutorAsync): def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) self.driver_executor = make_async(self.driver_worker.execute_method) async def _run_workers_async( self, method: str, *args, driver_args: Optional[Tuple[Any, ...]] = None, driver_kwargs: Optional[Dict[str, Any]] = None, **kwargs, ) -> Any: """Runs the given method on all workers.""" coros = [] if driver_args is None: driver_args = args if driver_kwargs is None: driver_kwargs = kwargs coros.append( self.driver_executor(method, *driver_args, **driver_kwargs)) # Run the ray workers asynchronously. for worker in self.workers: coros.append(worker.execute_method.remote(method, *args, **kwargs)) all_outputs = await asyncio.gather(*coros) return all_outputs
- vllm_npu/vllm_npu/executor/ray_utils.py: adapts the initialization of the Ray distributed environment so that Ray can be initialized successfully in an Ascend environment.
    from typing import Optional, Tuple, TYPE_CHECKING

    from vllm.config import ParallelConfig
    from vllm.utils import is_hip
    from vllm.logger import init_logger

    logger = init_logger(__name__)

    try:
        import ray
    except ImportError as e:
        logger.warning(f"Failed to import Ray with {e!r}. "
                       "For distributed inference, please install Ray with "
                       "`pip install ray`.")
        ray = None

    if TYPE_CHECKING:
        from ray.util.placement_group import PlacementGroup


    def initialize_ray_cluster(
        parallel_config: ParallelConfig,
        ray_address: Optional[str] = None,
    ):
        """Initialize the distributed cluster with Ray.

        It will connect to the Ray cluster and create a placement group
        for the workers, which includes the specification of the resources
        for each distributed worker.

        Args:
            parallel_config: The configurations for parallel execution.
            ray_address: The address of the Ray cluster. If None, uses
                the default Ray cluster address.
        """
        if ray is None:
            raise ImportError(
                "Ray is not installed. Please install Ray to use multi-node "
                "serving.")

        # Connect to a ray cluster.
        if is_hip():
            ray.init(address=ray_address,
                     ignore_reinit_error=True,
                     num_gpus=parallel_config.world_size)
        else:
            """start adapt"""
            # Without setting num_gpus, the function will try to detect the
            # number of GPUs, but in an Ascend environment it may fail to
            # detect them, so the value needs to be set manually.
            ray.init(address=ray_address,
                     ignore_reinit_error=True,
                     num_gpus=parallel_config.world_size)
            """end adapt"""

        if parallel_config.placement_group:
            # Placement group is already set.
            return

        # Create placement group for worker processes
        current_placement_group = ray.util.get_current_placement_group()
        if current_placement_group:
            # We are in a placement group
            bundles = current_placement_group.bundle_specs
            # Verify that we can use the placement group.
            gpu_bundles = 0
            for bundle in bundles:
                bundle_gpus = bundle.get("GPU", 0)
                if bundle_gpus > 1:
                    raise ValueError(
                        "Placement group bundle cannot have more than 1 GPU.")
                if bundle_gpus:
                    gpu_bundles += 1
            if parallel_config.world_size > gpu_bundles:
                raise ValueError(
                    "The number of required GPUs exceeds the total number of "
                    "available GPUs in the placement group.")
        else:
            num_gpus_in_cluster = ray.cluster_resources().get("GPU", 0)
            if parallel_config.world_size > num_gpus_in_cluster:
                raise ValueError(
                    "The number of required GPUs exceeds the total number of "
                    "available GPUs in the cluster.")
            # Create a new placement group
            placement_group_specs = ([{"GPU": 1}] * parallel_config.world_size)
            current_placement_group = ray.util.placement_group(
                placement_group_specs)
            # Wait until PG is ready - this will block until all
            # requested resources are available, and will timeout
            # if they cannot be provisioned.
            ray.get(current_placement_group.ready(), timeout=1800)

        # Set the placement group in the parallel config
        parallel_config.placement_group = current_placement_group
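The placement-group checks in `initialize_ray_cluster` can be exercised without a Ray cluster, since bundle specs are plain dictionaries. The sketch below isolates that logic. The helper names `count_gpu_bundles`, `check_world_size`, and `new_placement_specs` are hypothetical and exist only for illustration. The resource key stays `"GPU"` because the adapted code registers the Ascend devices with Ray through `num_gpus`.

```python
def count_gpu_bundles(bundles):
    """Count bundles that reserve a device; reject bundles with more than one."""
    gpu_bundles = 0
    for bundle in bundles:
        bundle_gpus = bundle.get("GPU", 0)
        if bundle_gpus > 1:
            raise ValueError(
                "Placement group bundle cannot have more than 1 GPU.")
        if bundle_gpus:
            gpu_bundles += 1
    return gpu_bundles


def check_world_size(bundles, world_size):
    """Raise if the existing placement group cannot host world_size workers."""
    if world_size > count_gpu_bundles(bundles):
        raise ValueError(
            "The number of required GPUs exceeds the total number of "
            "available GPUs in the placement group.")


def new_placement_specs(world_size):
    """Bundle specs for a fresh placement group: one device per worker."""
    return [{"GPU": 1}] * world_size
```

CPU-only bundles are skipped rather than rejected, so a mixed placement group is acceptable as long as it contains enough single-device bundles.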
- vllm_npu/vllm_npu/model_executor/__init__.py: replaces the get_architecture_class_name function of vLLM's native model loader when model_executor is imported.
    import vllm.model_executor.model_loader as vllm_model_loader
    import vllm_npu.model_executor.ascend_model_loader as ascend_model_loader

    vllm_model_loader.get_architecture_class_name = ascend_model_loader.get_architecture_class_name
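- vllm_npu/vllm_npu/model_executor/ascend_model_loader.py: overrides vLLM's native get_architecture_class_name function to handle quantization scenarios, and overrides the native get_model function to provide the entry point into the MindIE whole-graph model.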
    # Part of code in this file was copied from project [vLLM Team][vllm] for adapting usage
    import contextlib

    import torch
    import torch.nn as nn
    from vllm.config import DeviceConfig, ModelConfig, LoadConfig
    from vllm.model_executor.model_loader.weight_utils import initialize_dummy_weights

    from vllm_npu.model_executor.models.ascend.mindie_llm_wrapper import MindIELlmWrapper


    def get_architecture_class_name(model_config: ModelConfig) -> str:
        architectures = getattr(model_config.hf_config, "architectures", [])
        if (model_config.quantization is not None
                and model_config.quantization != "fp8"
                and "MixtralForCausalLM" in architectures):
            architectures = ["QuantMixtralForCausalLM"]
        return architectures[0]


    @contextlib.contextmanager
    def _set_default_torch_dtype(dtype: torch.dtype):
        """Sets the default torch dtype to the given dtype."""
        old_dtype = torch.get_default_dtype()
        torch.set_default_dtype(dtype)
        yield
        torch.set_default_dtype(old_dtype)


    def get_model(model_config: ModelConfig, device_config: DeviceConfig,
                  load_config: LoadConfig, mindie_model_config,
                  **kwargs) -> nn.Module:
        lora_config = kwargs.get("lora_config", None)
        model_class = MindIELlmWrapper

        # Get the (maybe quantized) linear method.
        linear_method = None

        with _set_default_torch_dtype(model_config.dtype):
            # Create a model instance.
            # The weights will be initialized as empty tensors.
            with torch.device(device_config.device):
                if hasattr(model_class, "supported_lora_modules"):
                    model = model_class(mindie_model_config, linear_method,
                                        lora_config)
                elif lora_config:
                    raise ValueError(
                        f"Model {model_class.__name__} does not support LoRA, "
                        "but LoRA is enabled. Support for this model may "
                        "be added in the future. If this is important to you, "
                        "please open an issue on github.")
                else:
                    model = model_class(mindie_model_config, linear_method)
            if load_config.load_format == "dummy":
                initialize_dummy_weights(model)
            else:
                # Load the weights from the cached or downloaded files.
                model.load_weights(model_config.model, load_config.download_dir,
                                   load_config.load_format, model_config.revision)
            model = model.npu()
        return model.eval()
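The architecture-name selection in `get_architecture_class_name` depends only on two attributes of the config object, so it can be checked in isolation. The sketch below repeats the function body and feeds it duck-typed configs. `make_config` is a hypothetical helper, and the `"awq"` value is used only as an example of a non-fp8 quantization string.

```python
from types import SimpleNamespace


def get_architecture_class_name(model_config) -> str:
    # Same selection logic as in the listing above, applied to a duck-typed config.
    architectures = getattr(model_config.hf_config, "architectures", [])
    if (model_config.quantization is not None
            and model_config.quantization != "fp8"
            and "MixtralForCausalLM" in architectures):
        architectures = ["QuantMixtralForCausalLM"]
    return architectures[0]


def make_config(architectures, quantization=None):
    """Hypothetical helper: builds an object with only the attributes used above."""
    return SimpleNamespace(
        hf_config=SimpleNamespace(architectures=architectures),
        quantization=quantization,
    )
```

Only a quantized, non-fp8 Mixtral model is redirected to `QuantMixtralForCausalLM`; every other combination returns the first entry of the Hugging Face `architectures` list unchanged.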
- vllm_npu/vllm_npu/model_executor/layers/__init__.py: empty file.
- vllm_npu/vllm_npu/model_executor/layers/ascend_sampler.py: implements the post-processing (sampler) class for the Ascend backend, bridging vLLM's data structures and MindIE's post-processing.
# Copyright (c) Huawei Technologies Co., Ltd. 2024-2025. All rights reserved.
# Part of codes in this file was copied from project [vLLM Team][vllm]
import random
from typing import Dict, List, Optional, Tuple

import torch
import torch.nn as nn
import numpy as np
from vllm.sampling_params import SamplingType
from vllm.sequence import SamplerOutput
from vllm.model_executor.sampling_metadata import SamplingMetadata, SequenceGroupToSample
from vllm.model_executor.layers.sampler import _get_logprobs, _build_sampler_output
from mindie_llm.text_generator.utils.sampling_metadata import SamplingData, SamplingParam

_SAMPLING_EPS = 1e-5

SampleResultType = List[Tuple[List[int], List[int]]]


def _to_tensor(data, dtype=None):
    if dtype:
        return torch.tensor(data, dtype=dtype, device=torch.device("npu"))
    else:
        return torch.tensor(data, device=torch.device("npu"))


class AscendSampler(nn.Module):
    def __init__(self, mindie_model):
        super().__init__()
        self.mindie_model = mindie_model
        self.include_gpu_probs_tensor = False

    def forward(
        self,
        logits: torch.Tensor,
        sampling_metadata: SamplingMetadata,
    ) -> Optional[SamplerOutput]:
        _, vocab_size = logits.shape
        mindie_sampling_data, mindie_sampling_param = self.construct_data(sampling_metadata, vocab_size)
        probs = torch.softmax(logits, dim=-1, dtype=torch.float)
        logprobs = torch.log_softmax(logits, dim=-1, dtype=torch.float)
        # Only rows whose sequence group actually samples are passed to MindIE.
        sampling_mask = [seq_group.do_sample for seq_group in sampling_metadata.seq_groups]
        filtered_logits = logits[sampling_mask]
        if filtered_logits.size(0) > 0:
            next_tokens, _ = self.mindie_model.sample(
                filtered_logits,
                sampling_data=mindie_sampling_data,
                sampling_param=mindie_sampling_param,
            )
        else:
            next_tokens = None
        sample_results, maybe_sampled_tokens_tensor = recover_data(
            sampling_metadata=sampling_metadata,
            sampled_tokens=next_tokens,
            logprobs=logprobs,
            include_gpu_probs_tensor=self.include_gpu_probs_tensor,
        )
        if self.include_gpu_probs_tensor:
            if maybe_sampled_tokens_tensor is None:
                raise RuntimeError("maybe_sampled_tokens_tensor is None")
            on_device_tensors = (probs, logprobs, maybe_sampled_tokens_tensor)
        else:
            on_device_tensors = None

        # Get the logprobs query results.
        prompt_logprobs, sample_logprobs = _get_logprobs(
            logprobs, sampling_metadata, sample_results)
        return _build_sampler_output(sample_results,
                                     sampling_metadata,
                                     prompt_logprobs,
                                     sample_logprobs,
                                     on_device_tensors=on_device_tensors)

    def construct_data(
        self,
        sampling_metadata: SamplingMetadata,
        vocab_size: int,
    ) -> Tuple[SamplingData, SamplingParam]:
        all_input_tokens: List[List[int]] = []
        prompt_tokens: List[List[int]] = []
        output_tokens: List[List[int]] = []
        top_ks: List[int] = []
        temperatures: List[float] = []
        top_ps: List[float] = []
        min_ps: List[float] = []
        presence_penalties: List[float] = []
        frequency_penalties: List[float] = []
        repetition_penalties: List[float] = []
        sampling_seeds: List[int] = []
        sample_indices: List[int] = []
        do_samples: List[bool] = []
        # To Do
        do_penalties = False
        do_top_p_top_k = False
        do_min_p = False
        greedy_flag = False

        if sampling_metadata.seq_groups is None:
            raise RuntimeError("sampling_metadata.seq_group is None, no data received.")
        for seq_group in sampling_metadata.seq_groups:
            do_samples.append(seq_group.do_sample)
            seq_ids = seq_group.seq_ids
            sampling_params = seq_group.sampling_params
            temperature = sampling_params.temperature
            p = sampling_params.presence_penalty
            f = sampling_params.frequency_penalty
            r = sampling_params.repetition_penalty
            top_p = sampling_params.top_p
            min_p = sampling_params.min_p
            is_greedy = sampling_params.sampling_type == SamplingType.GREEDY
            seed = sampling_params.seed
            if seed is None:
                if is_greedy:
                    seed = 0
                else:
                    lo, hi = torch.iinfo(torch.long).min, torch.iinfo(torch.long).max
                    seed = random.randint(lo, hi)
            if is_greedy:
                greedy_flag = True

            # k should not be greater than the vocab size.
            top_k = min(sampling_params.top_k, vocab_size)
            top_k = vocab_size if top_k == -1 else top_k
            if temperature < _SAMPLING_EPS:
                temperature = 1.0
            if not do_top_p_top_k and (top_p < 1.0 - _SAMPLING_EPS or top_k != vocab_size):
                do_top_p_top_k = True
            if not do_min_p and min_p > _SAMPLING_EPS:
                do_min_p = True
            if not do_penalties:
                if abs(p) >= _SAMPLING_EPS:
                    do_penalties = True
                elif abs(f) >= _SAMPLING_EPS:
                    do_penalties = True
                elif abs(r - 1.0) >= _SAMPLING_EPS:
                    do_penalties = True

            is_prompt = seq_group.is_prompt
            if (seq_group.is_prompt and sampling_params.prompt_logprobs is not None):
                # For tokens in the prompt that we only need to get
                # their logprobs
                query_len = seq_group.query_len
                if query_len is None:
                    raise RuntimeError("query_len is None")
                prefill_len = len(seq_group.prompt_logprob_indices)
                temperatures += [temperature] * prefill_len
                sampling_seeds += [seed] * prefill_len
                top_ps += [top_p] * prefill_len
                top_ks += [top_k] * prefill_len
                min_ps += [min_p] * prefill_len
                presence_penalties += [0] * prefill_len
                frequency_penalties += [0] * prefill_len
                repetition_penalties += [1] * prefill_len
                prompt_tokens.extend([] for _ in range(prefill_len))
                output_tokens.extend([] for _ in range(prefill_len))
                all_input_tokens.extend([] for _ in range(prefill_len))

            if seq_group.do_sample:
                sample_lens = len(seq_group.sample_indices)
                if sample_lens != len(seq_ids):
                    raise ValueError("sample_lens != len(seq_ids)")
                for seq_id in seq_ids:
                    seq_data = seq_group.seq_data[seq_id]
                    prompt_tokens.append(seq_data.prompt_token_ids)
                    output_tokens.append(seq_data.output_token_ids)
                    all_input_tokens.append(seq_data.prompt_token_ids + seq_data.output_token_ids)
                temperatures += [temperature] * len(seq_ids)
                sampling_seeds += [seed] * len(seq_ids)
                top_ps += [top_p] * len(seq_ids)
                top_ks += [top_k] * len(seq_ids)
                min_ps += [min_p] * len(seq_ids)
                presence_penalties += [p] * len(seq_ids)
                frequency_penalties += [f] * len(seq_ids)
                repetition_penalties += [r] * len(seq_ids)

        repetition_penalties = np.array(repetition_penalties, dtype=np.float32)
        frequency_penalties = np.array(frequency_penalties, dtype=np.float32)
        presence_penalties = np.array(presence_penalties, dtype=np.float32)
        temperatures = np.array(temperatures, dtype=np.float32)
        top_ks = np.array(top_ks, dtype=np.int32)
        top_ps = np.array(top_ps, dtype=np.float32)
        sampling_seeds = np.array(sampling_seeds)
        do_samples = np.array(do_samples)

        # Pad ragged token lists to a rectangle; vocab_size is used as the pad id.
        max_tokens_len = max([len(tokens) for tokens in all_input_tokens], default=0)
        padded_all_input_tokens = [
            tokens + [vocab_size] * (max_tokens_len - len(tokens))
            for tokens in all_input_tokens
        ]
        padded_all_input_tokens = np.array(padded_all_input_tokens, dtype=np.int32)
        output_max_len = max([len(tokens) for tokens in output_tokens], default=0)
        padded_output_tokens = [
            tokens + [vocab_size] * (output_max_len - len(tokens))
            for tokens in output_tokens
        ]
        padded_output_tokens = np.array(padded_output_tokens, dtype=np.int32)

        all_input_ids_tensor = _to_tensor(
            padded_all_input_tokens,
            torch.int32
        ) if padded_all_input_tokens is not None else None
        output_ids_tensor = _to_tensor(
            padded_output_tokens,
            torch.int32
        ) if padded_output_tokens is not None else None
        mindie_sampling_data = SamplingData(
            all_input_ids=all_input_ids_tensor,
            output_ids=output_ids_tensor
        )

        if greedy_flag:
            mindie_sampling_param = None
        else:
            mindie_sampling_param = SamplingParam.from_numpy(
                repetition_penalty=repetition_penalties,
                frequency_penalty=frequency_penalties,
                presence_penalty=presence_penalties,
                temperature=temperatures,
                top_k=top_ks,
                top_p=top_ps,
                seed=sampling_seeds,
                do_sample=do_samples,
                to_tensor=_to_tensor,
            )
        return (mindie_sampling_data, mindie_sampling_param)


def recover_data(
    sampling_metadata: SamplingMetadata,
    sampled_tokens: np.ndarray,
    logprobs: torch.Tensor,
    include_gpu_probs_tensor: bool,
) -> Tuple[SampleResultType, Optional[torch.Tensor]]:
    categorized_seq_group_ids: Dict[SamplingType, List[int]] = {t: [] for t in SamplingType}
    categorized_sample_indices = sampling_metadata.categorized_sample_indices
    for i, seq_group in enumerate(sampling_metadata.seq_groups):
        sampling_params = seq_group.sampling_params
        sampling_type = sampling_params.sampling_type
        categorized_seq_group_ids[sampling_type].append(i)

    sample_results_dict: Dict[int, Tuple[List[int], List[int]]] = {}
    sample_metadata = {}

    # Create output tensor for sampled token ids.
    if include_gpu_probs_tensor:
        sampled_token_ids_tensor = torch.empty(logprobs.shape[0],
                                               1,
                                               dtype=torch.long,
                                               device=logprobs.device)
    else:
        sampled_token_ids_tensor = None

    for sampling_type in SamplingType:
        sample_indices = categorized_sample_indices[sampling_type][:, 0]
        num_tokens = len(sample_indices)
        if num_tokens == 0:
            continue

        seq_group_id = categorized_seq_group_ids[sampling_type]
        seq_groups = [sampling_metadata.seq_groups[i] for i in seq_group_id]
        sample_metadata[sampling_type] = (seq_group_id, seq_groups)

    for sampling_type in SamplingType:
        if sampling_type not in sample_metadata:
            continue
        (seq_group_id, seq_groups) = sample_metadata[sampling_type]
        if sampling_type in (SamplingType.GREEDY, SamplingType.RANDOM, SamplingType.RANDOM_SEED):
            sample_results = normal_wrap(seq_groups, sampled_tokens)
        elif sampling_type == SamplingType.BEAM:
            sample_results = beam_wrap(seq_groups, sampled_tokens)
        sample_results_dict.update(zip(seq_group_id, sample_results))

    sample_results = [
        sample_results_dict.get(i, ([], []))
        for i in range(len(sampling_metadata.seq_groups))
    ]
    return sample_results, sampled_token_ids_tensor


def normal_wrap(
    selected_seq_groups: List[SequenceGroupToSample],
    samples: np.ndarray,
):
    samples = samples.tolist()
    sample_idx = 0
    results: SampleResultType = []
    for seq_group in selected_seq_groups:
        if not seq_group.do_sample:
            results.append(([], []))
            continue

        seq_ids = seq_group.seq_ids
        num_parent_seqs = len(seq_ids)
        parent_ids = list(range(num_parent_seqs))
        next_token_ids = [samples[sample_idx]]
        results.append((next_token_ids, parent_ids))
        sample_idx += num_parent_seqs
    return results


def beam_wrap(
    selected_seq_groups: List[SequenceGroupToSample],
    samples: np.ndarray,
):
    # Beam search is not supported by this sampler.
    raise ValueError("Unsupported sampling type: beam search")
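The padding step inside `construct_data` can be illustrated in isolation. The following is a minimal pure-Python sketch, not the sampler's actual code path: the helper name `pad_token_lists` and the toy `vocab_size` are assumptions made for illustration. It shows why `vocab_size` works as a pad value: valid token ids run from 0 to `vocab_size - 1`, so the pad id cannot collide with a real token.

```python
from typing import List


def pad_token_lists(token_lists: List[List[int]], pad_id: int) -> List[List[int]]:
    """Right-pad every list to the length of the longest one (hypothetical helper)."""
    max_len = max((len(tokens) for tokens in token_lists), default=0)
    return [tokens + [pad_id] * (max_len - len(tokens)) for tokens in token_lists]


# Illustrative value only: valid ids are 0..7, so 8 is free to act as padding.
vocab_size = 8
padded = pad_token_lists([[1, 2, 3], [4], []], pad_id=vocab_size)
print(padded)
```

Empty lists, which `construct_data` appends for prompt-logprob positions, simply become rows made entirely of the pad id.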
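- vllm_npu/vllm_npu/model_executor/models/__init__.py: empty file.
- vllm_npu/vllm_npu/model_executor/models/ascend/__init__.py: empty file.
- vllm_npu/vllm_npu/model_executor/models/ascend/mindie_llm_wrapper.py: implements the wrapper class that interfaces with the MindIE whole-graph model. It unpacks vLLM's data structures into the inputs the MindIE graph requires and passes them through.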
# Copyright (c) Huawei Technologies Co., Ltd. 2024-2025. All rights reserved.
import math
from typing import List, Optional

import torch
from mindie_llm.text_generator.adapter.generator_torch import GeneratorTorch
from torch import nn
import torch.nn.functional as F
from vllm.attention import AttentionMetadata
from vllm.lora.request import LoRARequest
from vllm.model_executor import SamplingMetadata
from vllm.sequence import SamplerOutput

from vllm_npu.model_executor.layers.ascend_sampler import AscendSampler
from vllm_npu.worker.cache_engine import KVCache


class MindIELlmWrapper(nn.Module):
    def __init__(self, mindie_model_config, linear_method=None, lora_config=None):
        super(MindIELlmWrapper, self).__init__()

        self.mindie_model_config = mindie_model_config
        self.rank = mindie_model_config["rank"]
        self.local_rank = mindie_model_config["local_rank"]
        self.npu_id = self.local_rank
        self.world_size = mindie_model_config["world_size"]
        self.kv_cache_dtype = mindie_model_config["kv_cache_dtype"]
        self.mindie_model = None
        self.sampler = None

    def forward(
        self,
        input_ids: torch.Tensor,
        positions: torch.Tensor,
        kv_caches: List[KVCache],
        attn_metadata: AttentionMetadata,
        lora_requests: List[LoRARequest],
    ) -> torch.Tensor:
        is_prompt = attn_metadata.prefill_metadata is not None

        if kv_caches[0][0] is None:
            kv_caches, block_tables, slots = self.create_dummy_kv_cache(attn_metadata, input_ids)
        else:
            block_tables = self.create_block_tables(attn_metadata)
            slots = attn_metadata.slot_mapping

        if attn_metadata.prefill_metadata is None:
            input_lengths = attn_metadata.decode_metadata.seq_lens_tensor
            max_seq_len = attn_metadata.decode_metadata.max_seq_len
            query_lens = [1] * len(attn_metadata.decode_metadata.seq_lens_tensor)
            lm_head_indices = None
        else:
            input_lengths = attn_metadata.prefill_metadata.seq_lens_tensor
            max_seq_len = attn_metadata.prefill_metadata.max_seq_len
            subquery_start_loc = attn_metadata.prefill_metadata.subquery_start_loc
            query_lens_tensor = subquery_start_loc[1:] - subquery_start_loc[:-1]
            if attn_metadata.decode_metadata is not None:
                input_lengths = torch.cat((input_lengths, attn_metadata.decode_metadata.seq_lens_tensor), dim=0)
                max_seq_len = max(max_seq_len, attn_metadata.decode_metadata.max_seq_len)
                query_lens_tensor = F.pad(query_lens_tensor, (0, attn_metadata.num_decode_tokens), "constant", 1)
            query_lens = query_lens_tensor.tolist()
            lm_head_indices = query_lens_tensor.cumsum(dim=-1) - 1

        adapter_ids = [lora_request.lora_name if lora_request else None for lora_request in lora_requests]

        logits = self.mindie_model.forward_tensor(
            input_ids,
            positions,
            is_prompt,
            kv_caches,
            block_tables,
            slots,
            input_lengths,
            max_seq_len,
            lm_head_indices,
            adapter_ids=adapter_ids,
            q_lens=query_lens,
        )

        return logits

    def compute_logits(self, hidden_states: torch.Tensor, sampling_metadata: SamplingMetadata) -> torch.Tensor:
        return hidden_states

    def sample(
        self,
        logits: torch.Tensor,
        sampling_metadata: SamplingMetadata,
    ) -> Optional[SamplerOutput]:
        # hidden_states is logits
        next_tokens = self.sampler(logits, sampling_metadata)
        return next_tokens

    def load_weights(
        self,
        model_name_or_path: str,
        cache_dir: Optional[str] = None,
        load_format: str = "auto",
        revision: Optional[str] = None,
    ):
        if load_format not in ["auto", "safetensors", "pt"]:
            raise ValueError("load-format support [safetensors, pt]")

        self.weight_dtype = torch.get_default_dtype()
        torch.set_default_dtype(torch.float32)

        self.mindie_model = GeneratorTorch(self.mindie_model_config)
        self.sampler = AscendSampler(self.mindie_model)

        torch.set_default_dtype(self.weight_dtype)

    # when warmup, create dummy kvcache, block_tables, slot_mapping
    def create_dummy_kv_cache(self, attn_metadata, input_ids):
        dummy_block_size = 128
        max_s = max(attn_metadata.prefill_metadata.seq_lens_tensor)
        max_need_block = math.ceil(max_s / dummy_block_size)
        batch_size = len(attn_metadata.prefill_metadata.seq_lens_tensor)
        dummy_block_num = max_need_block * batch_size

        model_runner = self.mindie_model.model_wrapper.model_runner
        kv_cache = [
            (
                torch.empty(
                    (dummy_block_num, dummy_block_size, model_runner.num_kv_heads, model_runner.head_size),
                    dtype=self.kv_cache_dtype,
                    device="npu",
                ),
                torch.empty(
                    (dummy_block_num, dummy_block_size, model_runner.num_kv_heads, model_runner.head_size),
                    dtype=self.kv_cache_dtype,
                    device="npu",
                ),
            )
            for _ in range(model_runner.num_layers)
        ]

        block_tables = torch.zeros(batch_size, max_need_block, dtype=int, device="npu")
        slot = [i for i in range(dummy_block_size)]
        slots = []
        warm_up_len = len(input_ids)
        while warm_up_len > 0:
            if warm_up_len > dummy_block_size:
                slots.extend(slot)
                warm_up_len -= dummy_block_size
            else:
                slots.extend(slot[:warm_up_len])
                warm_up_len = 0
        slots = torch.tensor(slots, dtype=torch.long, device="npu")
        return kv_cache, block_tables, slots

    def create_block_tables(self, attn_metadata):
        if attn_metadata.prefill_metadata is None:
            return attn_metadata.decode_metadata.block_tables
        prefill_block_tables = attn_metadata.prefill_metadata.block_tables
        if prefill_block_tables.numel() == 0:
            return torch.tensor([0], dtype=torch.int32, device="npu")
        if attn_metadata.decode_metadata is None:
            return prefill_block_tables

        decode_block_tables = attn_metadata.decode_metadata.block_tables
        pad_size = prefill_block_tables.size(1) - decode_block_tables.size(1)
        if pad_size > 0:
            decode_block_tables = F.pad(decode_block_tables, (0, pad_size), "constant", 0)
        elif pad_size < 0:
            prefill_block_tables = F.pad(prefill_block_tables, (0, -pad_size), "constant", 0)
        return torch.cat((prefill_block_tables, decode_block_tables), dim=0)
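In `forward`, `lm_head_indices` is computed as the cumulative sum of the per-sequence query lengths minus one, after each decode token has been appended as a query of length 1. A minimal pure-Python sketch of that arithmetic follows; the function name `last_token_indices` is hypothetical, and plain lists stand in for the NPU tensors.

```python
from itertools import accumulate
from typing import List


def last_token_indices(prefill_query_lens: List[int], num_decode_tokens: int) -> List[int]:
    """Index of the last token of each sequence in the flattened batch.

    Hypothetical helper mirroring `query_lens_tensor.cumsum(dim=-1) - 1`,
    where every decode token contributes a query of length 1.
    """
    query_lens = list(prefill_query_lens) + [1] * num_decode_tokens
    return [end - 1 for end in accumulate(query_lens)]


# Two prefill sequences (3 and 2 new tokens) followed by two decode tokens.
indices = last_token_indices([3, 2], num_decode_tokens=2)
print(indices)
```

These are the positions whose hidden states are needed to produce logits, since only the last token of each sequence predicts the next token.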
- vllm_npu/vllm_npu/usage/__init__.py: hot-patches vLLM's native _report_usage_once function to fix an error raised when printing usage information.
import types

from vllm.engine.llm_engine import usage_message

import vllm_npu.usage.usage_lib as vllm_npu_usage_lib

usage_message._report_usage_once = types.MethodType(vllm_npu_usage_lib._report_usage_once, usage_message)
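The patch above binds a plain function to one existing object with `types.MethodType`. A minimal sketch of that mechanism, using a stand-in class rather than vLLM's real usage object (the names `UsageMessage` and `_patched_report` are hypothetical):

```python
import types


class UsageMessage:
    """Stand-in for the patched object (hypothetical, not vLLM's class)."""

    def _report_usage_once(self):
        return "original"


def _patched_report(self):
    # `self` is the instance the function was bound to.
    return f"patched for {type(self).__name__}"


usage_message = UsageMessage()
other = UsageMessage()

# Bind the replacement to this one instance only; the class is untouched.
usage_message._report_usage_once = types.MethodType(_patched_report, usage_message)

print(usage_message._report_usage_once())
print(other._report_usage_once())
print("_report_usage_once" in vars(usage_message))
```

Because the bound method is stored in the instance's own `__dict__`, it shows up in `vars(self)`. That appears to be why the replacement function in usage_lib.py deletes the `_report_usage_once` key from the data before writing it out.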
- vllm_npu/vllm_npu/usage/usage_lib.py: reimplements vLLM's native _report_usage_once function to fix an error raised when printing usage information.
# Part of code in this file was copied from project [vLLM Team][vllm] for adapting usage
import platform
from typing import Any, Dict

import cpuinfo
import psutil
import torch
import vllm.envs as envs
from vllm.usage.usage_lib import UsageContext, _detect_cloud_provider, _get_current_timestamp_ns


def _report_usage_once(self, model_architecture: str, usage_context: UsageContext, extra_kvs: Dict[str, Any]) -> None:
    # Platform information
    if torch.npu.is_available():
        device_property = torch.npu.get_device_properties()
        self.gpu_count = torch.npu.device_count()
        self.gpu_type = device_property.name
        self.gpu_memory_per_device = device_property.total_memory
    self.provider = _detect_cloud_provider()
    self.architecture = platform.machine()
    self.platform = platform.platform()
    self.total_memory = psutil.virtual_memory().total

    info = cpuinfo.get_cpu_info()
    self.num_cpu = info.get("count", None)
    self.cpu_type = info.get("brand_raw", "")
    self.cpu_family_model_stepping = ",".join([
        str(info.get("family", "")),
        str(info.get("model", "")),
        str(info.get("stepping", ""))
    ])

    # vLLM information
    import vllm  # delayed import to prevent circular import
    self.context = usage_context.value
    self.vllm_version = vllm.__version__
    self.model_architecture = model_architecture

    # Metadata
    self.log_time = _get_current_timestamp_ns()
    self.source = envs.VLLM_USAGE_SOURCE

    data = vars(self)
    # The bound method installed by the hot patch lives in the instance dict; drop it before reporting.
    if data["_report_usage_once"] is not None:
        del data["_report_usage_once"]
    if extra_kvs:
        data.update(extra_kvs)

    self._write_to_file(data)
    self._send_to_server(data)
- vllm_npu/vllm_npu/worker/__init__.py: replaces the _allocate_kv_cache function of vLLM's native CacheEngine to implement a custom kv_cache allocation method.
from vllm_npu.worker.cache_engine import _allocate_kv_cache
from vllm.worker import cache_engine

cache_engine.CacheEngine._allocate_kv_cache = _allocate_kv_cache
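Here the replacement is assigned on the class, so it applies to every `CacheEngine` instance rather than to a single object. The sketch below uses a stand-in class (hypothetical, not vLLM's `CacheEngine`) to show one consequence: an allocation that runs in `__init__` only picks up the custom function if the patch is installed before the object is constructed.

```python
class CacheEngine:
    """Stand-in for the patched class (hypothetical)."""

    def __init__(self, num_blocks):
        # Allocation happens at construction time.
        self.cache = self._allocate_kv_cache(num_blocks)

    def _allocate_kv_cache(self, num_blocks):
        return ["default"] * num_blocks


def _allocate_kv_cache(self, num_blocks):
    # Custom allocator that replaces the default one.
    return ["custom"] * num_blocks


before = CacheEngine(1)                            # built before the patch
CacheEngine._allocate_kv_cache = _allocate_kv_cache
after = CacheEngine(2)                             # built after the patch

print(before.cache, after.cache)
```

Under this reading, the patch module needs to be imported before any engine or worker is created.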
- vllm_npu/vllm_npu/worker/ascend_model_runner.py: implements the AscendModelRunner class used by the Ascend backend; it has been adapted for the prefix cache feature.
# Copyright (c) Huawei Technologies Co., Ltd. 2024-2025. All rights reserved.
# Part of codes in this file was copied from project [vLLM Team][vllm]
from typing import List, NamedTuple, Optional, Tuple

import torch
from vllm.attention import AttentionMetadata, AttentionMetadataPerStage
from vllm.attention.backends.flashinfer import FlashInferBackend
from vllm.config import (
    DeviceConfig,
    LoadConfig,
    LoRAConfig,
    ModelConfig,
    ParallelConfig,
    SchedulerConfig,
    VisionLanguageConfig,
)
from vllm.distributed import broadcast_tensor_dict
from vllm.logger import init_logger
from vllm.lora.layers import LoRAMapping
from vllm.lora.request import LoRARequest
from vllm.model_executor import SamplingMetadata
from vllm.sampling_params import SamplingParams
from vllm.sequence import SamplerOutput, SequenceGroupMetadata
from vllm.utils import get_kv_cache_torch_dtype, is_hip, make_tensor_with_pad
from vllm.worker.model_runner import (
    _PAD_SLOT_ID,
    BatchType,
    ModelRunner,
    PrepareDecodeMetadata,
    PreparePromptMetadata,
    _prepare_fake_inputs,
)

from vllm_npu.model_executor.ascend_model_loader import get_model

logger = init_logger(__name__)

LORA_WARMUP_RANK = 8


class PreparePromptMetadata(NamedTuple):
    input_tokens: List[int]
    input_positions: List[int]
    attn_metadata: Optional[AttentionMetadataPerStage]
    seq_lens: List[int]
    query_lens: List[int]
    lora_index_mapping: List[int]
    lora_prompt_mapping: List[int]
    lora_requests: List[LoRARequest]
    multi_modal_input: Optional[torch.Tensor]
    slot_mapping: List[int]

    @classmethod
    def empty(cls):
        return PreparePromptMetadata(
            input_tokens=[],
            input_positions=[],
            attn_metadata=None,
            seq_lens=[],
            query_lens=[],
            lora_index_mapping=[],
            lora_prompt_mapping=[],
            lora_requests=[],
            multi_modal_input=None,
            slot_mapping=[],
        )


class PrepareDecodeMetadata(NamedTuple):
    input_tokens: List[int]
    input_positions: List[int]
    attn_metadata: Optional[AttentionMetadata]
    lora_index_mapping: List[int]
    lora_prompt_mapping: List[int]
    lora_requests: List[LoRARequest]
    slot_mapping: List[int]

    @classmethod
    def empty(cls):
        return PrepareDecodeMetadata(
            input_tokens=[],
            input_positions=[],
            attn_metadata=None,
            lora_index_mapping=[],
            lora_prompt_mapping=[],
            lora_requests=[],
            slot_mapping=[],
        )


class AscendModelRunner(ModelRunner):
    def __init__(
        self,
        model_config: ModelConfig,
        parallel_config: ParallelConfig,
        scheduler_config: SchedulerConfig,
        device_config: DeviceConfig,
        load_config: LoadConfig,
        lora_config: Optional[LoRAConfig],
        mindie_model_config,
        kv_cache_dtype: Optional[str] = "auto",
        is_driver_worker: bool = False,
        vision_language_config: Optional[VisionLanguageConfig] = None,
    ):
        super(AscendModelRunner, self).__init__(
            model_config,
            parallel_config,
            scheduler_config,
            device_config,
            load_config,
            lora_config,
            kv_cache_dtype,
            is_driver_worker,
            vision_language_config,
        )
        self.kv_cache_torch_dtype = get_kv_cache_torch_dtype(kv_cache_dtype, model_config.dtype)
        self.mindie_model_config = mindie_model_config
        self.mindie_model_config["kv_cache_dtype"] = self.kv_cache_torch_dtype

    def load_model(self) -> None:
        self.model = get_model(
            model_config=self.model_config,
            device_config=self.device_config,
            load_config=self.load_config,
            mindie_model_config=self.mindie_model_config,
        )
        if self.kv_cache_dtype == "fp8" and is_hip():
            # Currently scaled KV cache is only enabled on ROCm
            if self.model_config.quantization_param_path is not None:
                if callable(getattr(self.model, "load_kv_cache_scales", None)):
                    self.model.load_kv_cache_scales(self.model_config.quantization_param_path)
                else:
                    # Format the message directly; exceptions do not interpolate "%s" arguments.
                    raise RuntimeError(
                        "Using FP8 KV cache and scaling factors provided but "
                        f"model {self.model.__class__} does not support loading scaling factors."
                    )
            else:
                logger.warning(
                    "Using FP8 KV cache but no scaling factors "
                    "provided. Defaulting to scaling factors of 1.0. "
                    "This may lead to less accurate results!"
                )
        elif self.model_config.quantization_param_path is not None:
            logger.warning(
                "KV cache scaling factors provided, "
                "but the KV cache data type is not FP8. "
                "KV cache scaling factors will not be used."
            )

    @torch.inference_mode()
    def profile_run(self) -> None:
        # Enable top-k sampling to reflect the accurate memory usage.
        sampling_params = SamplingParams(top_p=0.99, top_k=self.vocab_size - 1)
        max_num_batched_tokens = self.scheduler_config.max_num_batched_tokens
        max_num_seqs = self.scheduler_config.max_num_seqs

        dummy_lora_requests_per_seq = []

        seqs: List[SequenceGroupMetadata] = []

        if self.vision_language_config:
            max_num_seqs = min(
                max_num_seqs, int(max_num_batched_tokens / self.vision_language_config.image_feature_size)
            )
        for group_id in range(max_num_seqs):
            seq_len = max_num_batched_tokens // max_num_seqs + (group_id < max_num_batched_tokens % max_num_seqs)
            seq_data, fake_multi_modal_input = _prepare_fake_inputs(seq_len, self.vision_language_config)
            seq = SequenceGroupMetadata(
                request_id=str(group_id),
                is_prompt=True,
                seq_data={group_id: seq_data},
                sampling_params=sampling_params,
                block_tables=None,
                lora_request=dummy_lora_requests_per_seq[group_id] if dummy_lora_requests_per_seq else None,
                multi_modal_data=fake_multi_modal_input,
            )
            seqs.append(seq)

        # Run the model with the dummy inputs.
        num_layers = self.model_config.get_num_layers(self.parallel_config)
        kv_caches = [(None, None)] * num_layers
        self.execute_model(seqs, kv_caches)
        torch.npu.synchronize()
        return

    def _prepare_prompt(
        self,
        seq_group_metadata_list: List[SequenceGroupMetadata],
    ) -> PreparePromptMetadata:
        input_tokens: List[int] = []
        input_positions: List[int] = []
        slot_mapping: List[int] = []
        lora_index_mapping: List[int] = []
        lora_prompt_mapping: List[int] = []
        lora_requests: List[LoRARequest] = []

        seq_lens: List[int] = []
        context_lens: List[int] = []
        query_lens: List[int] = []
        prefix_block_tables: List[List[int]] = []
        multi_modal_input_list: List[torch.Tensor] = []

        if len(seq_group_metadata_list) == 0:
            return PreparePromptMetadata.empty()

        for seq_group_metadata in seq_group_metadata_list:
            if not seq_group_metadata.is_prompt:
                raise ValueError("Expected prompt sequence group metadata.")
            seq_ids = list(seq_group_metadata.seq_data.keys())
            if len(seq_ids) != 1:
                raise ValueError("Expected only one sequence ID.")
            seq_id = seq_ids[0]

            computed_block_nums = seq_group_metadata.computed_block_nums
            if (
                self.scheduler_config is not None
                and self.scheduler_config.chunked_prefill_enabled
                and not (computed_block_nums is None or computed_block_nums == [])
            ):
                raise RuntimeError("chunked prefill cannot be used with prefix caching " "now.")

            token_chunk_size = seq_group_metadata.token_chunk_size
            seq_data = seq_group_metadata.seq_data[seq_id]
            context_len = seq_data.get_num_computed_tokens()
            # We should use get_len here because in case of preemption
            # it contains output tokens.
            seq_len = min(seq_data.get_len(), context_len + token_chunk_size)
            prompt_tokens = seq_data.get_token_ids()[context_len:seq_len]
            seq_lens.append(seq_len)

            # NOTE: This only works for oooooooxxx style attention.
            if seq_group_metadata.block_tables is not None:
                block_table = seq_group_metadata.block_tables[seq_id]
            else:
                block_table = []
            if block_table:
                prefix_block_tables.append(block_table)
            else:
                prefix_block_tables.append([])
            if computed_block_nums is not None and len(computed_block_nums) > 0 and self.sliding_window is None:
                # Prefix is not supported with sliding_window
                context_len = len(computed_block_nums) * self.block_size
                prompt_tokens = prompt_tokens[context_len:]

            # actual prompt lens
            context_lens.append(context_len)
            query_lens.append(seq_len - context_len)

            input_tokens.extend(prompt_tokens)
            # NOTE(woosuk): Here we assume that the first token in the prompt
            # is always the first token in the sequence.
            input_positions.extend(list(range(context_len, seq_len)))
            lora_id = seq_group_metadata.lora_int_id
            lora_requests.append(seq_group_metadata.lora_request)

            lora_index_mapping += [lora_id] * (seq_len - context_len)
            lora_prompt_mapping.extend(
                [lora_id] * (seq_len - context_len if seq_group_metadata.sampling_params.prompt_logprobs else 1)
            )

            if seq_group_metadata.multi_modal_data:
                multi_modal_input_list.append(seq_group_metadata.multi_modal_data.data)

            if seq_group_metadata.block_tables is None:
                # During memory profiling, the block tables are not initialized
                # yet. In this case, we just use a dummy slot mapping.
                slot_mapping.extend([_PAD_SLOT_ID] * seq_len)
                continue

            # Compute the slot mapping.
            block_table = seq_group_metadata.block_tables[seq_id]
            # Mask the [0, start_idx) tokens of the prompt with _PAD_SLOT_ID,
            # where start_idx is max(0, seq_len - sliding_window).
            # For example, if the prompt len is 10, sliding window is 8, and
            # block size is 4, the first two tokens are masked and the slot
            # mapping will be [-1, -1, 2, 3, 4, 5, 6, 7, 0, 1].
            start_idx = 0
            if self.sliding_window is not None:
                if context_len != 0:
                    raise ValueError("Prefix caching is currently not supported with sliding window attention")
                start_idx = max(0, seq_len - self.sliding_window)

            for i in range(context_len, seq_len):
                if i < start_idx:
                    slot_mapping.append(_PAD_SLOT_ID)
                    continue

                block_number = block_table[i // self.block_size]
                block_offset = i % self.block_size
                slot = block_number * self.block_size + block_offset
                slot_mapping.append(slot)

        max_query_len = max(query_lens)
        max_seq_len = max(seq_lens)
        if max_query_len <= 0:
            raise ValueError("max_query_len must be greater than 0")

        context_lens_tensor = torch.tensor(context_lens, dtype=torch.int, device=self.device)

        if multi_modal_input_list:
            if not self.vision_language_config:
                raise ValueError("Multi-modal inputs are only supported by vision language models.")
            multi_modal_input = torch.cat(multi_modal_input_list, dim=0).to(self.device)
        else:
            multi_modal_input = None

        # Prepare prefix block tables
        max_prompt_block_table_len = max(len(t) for t in prefix_block_tables)
        block_tables = make_tensor_with_pad(
            prefix_block_tables,
            max_len=max_prompt_block_table_len,
            pad=0,
            dtype=torch.int,
            device=self.device,
        )

        # Query length can be shorter than key (i.e., prompt) when prefill
        # is chunked or prefix cached.
        query_lens_tensor = torch.tensor(query_lens, dtype=torch.long, device=self.device)
        subquery_start_loc = torch.zeros(query_lens_tensor.shape[0] + 1, dtype=torch.int32, device=self.device)

        seq_lens_tensor = torch.tensor(seq_lens, dtype=torch.int, device=self.device)
        seq_start_loc = torch.zeros(seq_lens_tensor.shape[0] + 1, dtype=torch.int32, device=self.device)

        torch.cumsum(query_lens_tensor, dim=0, dtype=subquery_start_loc.dtype, out=subquery_start_loc[1:])

        torch.cumsum(seq_lens_tensor, dim=0, dtype=seq_start_loc.dtype, out=seq_start_loc[1:])

        if self.attn_backend is FlashInferBackend:
            attn_metadata = self.attn_backend.make_metadata(
                is_prompt=True,
                use_cuda_graph=False,
                seq_start_loc=seq_start_loc,
                max_seq_len=max_seq_len,
                block_tables=block_tables,
            )
        else:
            attn_metadata = self.attn_backend.make_metadata(
                is_prompt=True,
                seq_lens=seq_lens,
                seq_lens_tensor=seq_lens_tensor,
                max_query_len=max_query_len,
                max_seq_len=max_seq_len,
                subquery_start_loc=subquery_start_loc,
                seq_start_loc=seq_start_loc,
                context_lens_tensor=context_lens_tensor,
                block_tables=block_tables,
                use_cuda_graph=False,
            )

        return PreparePromptMetadata(
            input_tokens=input_tokens,
            input_positions=input_positions,
            attn_metadata=attn_metadata,
            seq_lens=seq_lens,
            query_lens=query_lens,
            lora_index_mapping=lora_index_mapping,
            lora_prompt_mapping=lora_prompt_mapping,
            lora_requests=lora_requests,
            multi_modal_input=multi_modal_input,
            slot_mapping=slot_mapping,
        )

    def _prepare_decode(
        self,
        seq_group_metadata_list: List[SequenceGroupMetadata],
    ) -> PrepareDecodeMetadata:
        input_tokens: List[int] = []
        input_positions: List[int] = []
        slot_mapping: List[int] = []
        seq_lens: List[int] = []
        block_tables: List[List[int]] = []
        lora_index_mapping: List[int] = []
        lora_prompt_mapping: List[int] = []
        lora_requests: List[LoRARequest] = []

        if len(seq_group_metadata_list) == 0:
            return PrepareDecodeMetadata.empty()

        for seq_group_metadata in seq_group_metadata_list:
            if seq_group_metadata.is_prompt:
                raise ValueError("seq_group_metadata should not be a prompt")
            if seq_group_metadata.token_chunk_size != 1:
                raise ValueError("token_chunk_size should be 1")

            seq_ids = list(seq_group_metadata.seq_data.keys())
            lora_id = seq_group_metadata.lora_int_id
            lora_requests.append(seq_group_metadata.lora_request)

            for seq_id in seq_ids:
                seq_data = seq_group_metadata.seq_data[seq_id]
                generation_token = seq_data.get_last_token_id()
                input_tokens.append(generation_token)

                seq_len = seq_data.get_len()
                position = seq_len - 1
                input_positions.append(position)

                seq_len = seq_len if self.sliding_window is None else min(seq_len, self.sliding_window)
                seq_lens.append(seq_len)

                block_table = seq_group_metadata.block_tables[seq_id]
                block_number = block_table[position // self.block_size]
                block_offset = position % self.block_size
                slot = block_number * self.block_size + block_offset
                slot_mapping.append(slot)
                lora_index_mapping.append(lora_id)
                lora_prompt_mapping.append(lora_id)

                if self.sliding_window is not None:
                    sliding_window_blocks = self.sliding_window // self.block_size
                    block_table = block_table[-sliding_window_blocks:]
                block_tables.append(block_table)

        max_seq_len = max(seq_lens)
        seq_lens_tensor = torch.tensor(seq_lens, dtype=torch.int, device=self.device)

        max_block_table_len = max(len(block_table) for block_table in block_tables)
        block_tables = make_tensor_with_pad(
            block_tables,
            max_len=max_block_table_len,
            pad=0,
            dtype=torch.int,
            device=self.device,
        )

        attn_metadata = self.attn_backend.make_metadata(
            is_prompt=False,
            seq_lens=None,
            seq_lens_tensor=seq_lens_tensor,
            max_query_len=None,
            max_seq_len=max_seq_len,
            subquery_start_loc=None,
            seq_start_loc=None,
            context_lens_tensor=None,
            block_tables=block_tables,
            use_cuda_graph=False,
        )
        return PrepareDecodeMetadata(
            input_tokens=input_tokens,
            input_positions=input_positions,
            attn_metadata=attn_metadata,
            lora_index_mapping=lora_index_mapping,
            lora_prompt_mapping=lora_prompt_mapping,
            lora_requests=lora_requests,
            slot_mapping=slot_mapping,
        )

    def prepare_input_tensors(
        self,
        seq_group_metadata_list: List[SequenceGroupMetadata],
    ) -> Tuple[
        torch.Tensor, torch.Tensor, AttentionMetadata, SamplingMetadata, List[LoRARequest], LoRAMapping, torch.Tensor
    ]:
        if self.is_driver_worker:
            prefill_reqs = []
            decode_reqs = []
            for seq_group_meta in seq_group_metadata_list:
                if seq_group_meta.is_prompt:
                    prefill_reqs.append(seq_group_meta)
                else:
                    decode_reqs.append(seq_group_meta)

            # Prepare input tensors.
            (
                input_tokens,
                input_positions,
                prefill_attn_metadata,
                seq_lens,
                query_lens,
                lora_index_mapping,
                lora_prompt_mapping,
                lora_requests,
                multi_modal_input,
                slot_mapping,
            ) = self._prepare_prompt(prefill_reqs)
            (
                decode_input_tokens,
                decode_input_positions,
                decode_attn_metadata,
                decode_lora_index_mapping,
                decode_lora_prompt_mapping,
                decode_lora_requests,
                decode_slot_mapping,
            ) = self._prepare_decode(decode_reqs)
            sampling_metadata = SamplingMetadata.prepare(
                seq_group_metadata_list, seq_lens, query_lens, self.device, self.pin_memory
            )

            if not self.scheduler_config.chunked_prefill_enabled and prefill_reqs and decode_reqs:
                raise ValueError("Cannot have both prefill and decode requests when chunked_prefill_enabled is False")

            num_prefills = len(seq_lens)
            num_prefill_tokens = len(input_tokens)
            num_decode_tokens = len(decode_input_tokens)

            # Coalesce tensors. Note that attn_metadata is currently not
            # coalesced for simplicity.
            input_tokens.extend(decode_input_tokens)
            input_positions.extend(decode_input_positions)
            slot_mapping.extend(decode_slot_mapping)
            lora_index_mapping.extend(decode_lora_index_mapping)
            lora_prompt_mapping.extend(decode_lora_prompt_mapping)
            lora_requests.extend(decode_lora_requests)

            input_tokens = torch.tensor(input_tokens, dtype=torch.long, device=self.device)
            input_positions = torch.tensor(input_positions, dtype=torch.long, device=self.device)
            slot_mapping = torch.tensor(slot_mapping, dtype=torch.long, device=self.device)

            if self.lora_config:
                lora_mapping = LoRAMapping(
                    lora_index_mapping,
                    lora_prompt_mapping,
                )
            else:
                lora_mapping = None

            # Broadcast the metadata.
            # If batch contains both prefill and decode, it sends 2 broadcasts.
            # If it only contains 1 type, it triggers a single broadcast.
            if prefill_attn_metadata is not None and decode_attn_metadata is not None:
                batch_type = BatchType.MIXED
            elif prefill_attn_metadata is not None:
                batch_type = BatchType.PREFILL
            else:
                batch_type = BatchType.DECODE

            metadata_dict = {
                "input_tokens": input_tokens,
                "input_positions": input_positions,
                "selected_token_indices": sampling_metadata.selected_token_indices,
                "lora_requests": lora_requests,
                "lora_mapping": lora_mapping,
                "multi_modal_input": multi_modal_input,
                "num_prefill_tokens": num_prefill_tokens,
                "num_decode_tokens": num_decode_tokens,
                "slot_mapping": slot_mapping,
                "num_prefills": num_prefills,
                "batch_type": batch_type,
            }
            if prefill_attn_metadata is not None:
                metadata_dict.update(prefill_attn_metadata.asdict_zerocopy())
            else:
                if decode_attn_metadata is None:
                    raise ValueError("decode_attn_metadata is None")
                metadata_dict.update(decode_attn_metadata.asdict_zerocopy())
            broadcast_tensor_dict(metadata_dict, src=0)

            # Broadcast decode attn metadata for mixed batch type.
            # The additional broadcast costs 300us overhead on 4 A10 GPUs.
            # We can potentially reduce the overhead by coalescing tensors.
            if batch_type == BatchType.MIXED:
                if decode_attn_metadata is None:
                    raise ValueError("decode_attn_metadata is None")
                metadata_dict = decode_attn_metadata.asdict_zerocopy()
                broadcast_tensor_dict(metadata_dict, src=0)
        else:
            metadata_dict = broadcast_tensor_dict(src=0)
            input_tokens = metadata_dict.pop("input_tokens")
            input_positions = metadata_dict.pop("input_positions")
            slot_mapping = metadata_dict.pop("slot_mapping")
            num_prefills = metadata_dict.pop("num_prefills")
            selected_token_indices = metadata_dict.pop("selected_token_indices")
            lora_mapping = metadata_dict.pop("lora_mapping")
            lora_requests = metadata_dict.pop("lora_requests")
            multi_modal_input = metadata_dict.pop("multi_modal_input")
            num_prefill_tokens = metadata_dict.pop("num_prefill_tokens")
            num_decode_tokens = metadata_dict.pop("num_decode_tokens")
            batch_type = metadata_dict.pop("batch_type")

            # Create an attention metadata.
            prefill_attn_metadata = None
            decode_attn_metadata = None
            if batch_type == BatchType.PREFILL or batch_type == BatchType.MIXED:
                prefill_attn_metadata = self.attn_backend.make_metadata(**metadata_dict)
            else:
                decode_attn_metadata = self.attn_backend.make_metadata(**metadata_dict)
            sampling_metadata = SamplingMetadata(
                seq_groups=None,
                selected_token_indices=selected_token_indices,
                categorized_sample_indices=None,
                num_prompts=0,
            )

            # if it is a mixed batch, decode attn_metadata is broadcasted
            # separately.
            if batch_type == BatchType.MIXED:
                metadata_dict = broadcast_tensor_dict(src=0)
                decode_attn_metadata = self.attn_backend.make_metadata(**metadata_dict)

        attn_metadata = AttentionMetadata(
            num_prefills=num_prefills,
            slot_mapping=slot_mapping,
            num_prefill_tokens=num_prefill_tokens,
            num_decode_tokens=num_decode_tokens,
            prefill_metadata=prefill_attn_metadata,
            decode_metadata=decode_attn_metadata,
            kv_cache_dtype=self.kv_cache_torch_dtype,
        )

        return (
            input_tokens,
            input_positions,
            attn_metadata,
            sampling_metadata,
            lora_requests,
            lora_mapping,
            multi_modal_input,
        )

    @torch.inference_mode()
    def execute_model(
        self,
        seq_group_metadata_list: List[SequenceGroupMetadata],
        kv_caches: List[Tuple[torch.Tensor, torch.Tensor]],
    ) -> Optional[SamplerOutput]:
        (input_tokens, input_positions, attn_metadata, sampling_metadata, lora_requests, _, _) = (
            self.prepare_input_tensors(seq_group_metadata_list)
        )

        # Currently cuda graph is only supported by the decode phase.
        model_executable = self.model
        execute_model_kwargs = {
            "input_ids": input_tokens,
            "positions": input_positions,
            "kv_caches": kv_caches,
            "attn_metadata": attn_metadata,
            "lora_requests": lora_requests,
        }
        hidden_states = model_executable(**execute_model_kwargs)

        # Only perform sampling in the driver worker.
        if not self.is_driver_worker:
            return None

        # Sample the next token.
        output = self.model.sample(
            logits=hidden_states,
            sampling_metadata=sampling_metadata,
        )

        return output
- vllm_npu/vllm_npu/worker/ascend_worker.py: implements the AscendWorker class used by the Ascend backend.
"""An Ascend worker class."""
import gc
from typing import Dict, List, Tuple, Set, Optional, Any

import torch
import torch.distributed

from vllm.config import (CacheConfig, DeviceConfig, ModelConfig, LoadConfig,
                         ParallelConfig, SchedulerConfig, LoRAConfig,
                         VisionLanguageConfig)
from vllm.model_executor import set_random_seed
from vllm.distributed import (broadcast_tensor_dict,
                              ensure_model_parallel_initialized,
                              init_distributed_environment)
from vllm.sequence import SamplerOutput, ExecuteModelRequest
from vllm.worker.cache_engine import CacheEngine
from vllm.lora.request import LoRARequest
from vllm.worker.worker_base import WorkerBase
from vllm.worker.worker import raise_if_cache_size_invalid
from vllm_npu.worker.ascend_model_runner import AscendModelRunner


class AscendWorker(WorkerBase):
    """A worker class that executes the model on a group of Ascend NPUs.
    """

    def __init__(
        self,
        model_config: ModelConfig,
        parallel_config: ParallelConfig,
        scheduler_config: SchedulerConfig,
        device_config: DeviceConfig,
        cache_config: CacheConfig,
        load_config: LoadConfig,
        local_rank: int,
        rank: int,
        distributed_init_method: str,
        lora_config: Optional[LoRAConfig] = None,
        vision_language_config: Optional[VisionLanguageConfig] = None,
        is_driver_worker: bool = False,
    ) -> None:
        self.model_config = model_config
        self.parallel_config = parallel_config
        self.scheduler_config = scheduler_config
        self.device_config = device_config
        self.cache_config = cache_config
        self.local_rank = local_rank
        self.rank = rank
        self.distributed_init_method = distributed_init_method
        self.lora_config = lora_config
        self.load_config = load_config
        self.is_driver_worker = is_driver_worker
        if self.is_driver_worker:
            assert self.rank == 0, "The driver worker must have rank 0."
        if self.model_config.trust_remote_code:
            # note: lazy import to avoid importing torch before initializing
            from vllm.utils import init_cached_hf_modules
            init_cached_hf_modules()
        self.vision_language_config = vision_language_config
        mindie_model_config = {
            'backend_type': 'atb',
            'model_id': model_config.model,
            'rank': rank,
            'local_rank': local_rank,
            'world_size': parallel_config.world_size,
            'npu_device_id': local_rank,
            "trust_remote_code": model_config.trust_remote_code,
            'inference_mode': 2 if scheduler_config.chunked_prefill_enabled or cache_config.enable_prefix_caching else 0,
        }
        self.model_runner = AscendModelRunner(
            model_config,
            parallel_config,
            scheduler_config,
            device_config,
            load_config=load_config,
            lora_config=self.lora_config,
            kv_cache_dtype=self.cache_config.cache_dtype,
            is_driver_worker=is_driver_worker,
            mindie_model_config=mindie_model_config)
        # Uninitialized cache engine. Will be initialized by
        # self.initialize_cache().
        self.cache_engine: CacheEngine
        self.gpu_cache: List[torch.Tensor]

    def init_device(self) -> None:
        self.device = torch.device(f"npu:{self.local_rank}")
        torch.npu.set_device(self.device)

        # Initialize the distributed environment.
        init_worker_distributed_environment(self.parallel_config, self.rank,
                                            self.distributed_init_method,
                                            self.local_rank)
        # Initialize the model.
        set_random_seed(self.model_config.seed)

    def load_model(self):
        self.model_runner.load_model()

    @torch.inference_mode()
    def determine_num_available_blocks(self) -> Tuple[int, int]:
        """Profiles the peak memory usage of the model and returns the maximum
        number of NPU and CPU cache blocks that can be allocated.
        """
        # Profile the memory usage of the model and get the maximum number of
        # cache blocks that can be allocated with the remaining free memory.
        torch.npu.empty_cache()
        torch.npu.reset_peak_memory_stats()

        # Execute a forward pass with dummy inputs to profile the memory usage
        # of the model.
        self.model_runner.profile_run()
        block_size = self.cache_config.block_size
        dummy_block_size = 128
        dummy_num_blocks = dummy_block_size // block_size

        # Calculate the number of blocks that can be allocated with the
        # profiled peak memory.
        torch.npu.synchronize()
        peak_memory = torch.npu.max_memory_allocated()
        total_gpu_memory = torch.npu.get_device_properties(self.rank).total_memory
        cache_block_size = CacheEngine.get_cache_block_size(
            self.cache_config, self.model_config, self.parallel_config)

        num_gpu_blocks = int(
            (total_gpu_memory * self.cache_config.gpu_memory_utilization - peak_memory) //
            cache_block_size) + dummy_num_blocks
        num_cpu_blocks = int(self.cache_config.swap_space_bytes // cache_block_size)
        num_gpu_blocks = max(num_gpu_blocks, 0)
        num_cpu_blocks = max(num_cpu_blocks, 0)
        if self.model_runner.lora_manager:
            self.model_runner.remove_all_loras()
        gc.collect()
        torch.npu.empty_cache()
        return num_gpu_blocks, num_cpu_blocks

    def initialize_cache(self, num_gpu_blocks: int, num_cpu_blocks: int) -> None:
        raise_if_cache_size_invalid(num_gpu_blocks,
                                    self.cache_config.block_size,
                                    self.model_config.max_model_len)

        self.cache_config.num_gpu_blocks = num_gpu_blocks
        self.cache_config.num_cpu_blocks = num_cpu_blocks

        self._init_cache_engine()
        self._warm_up_model()

    def _init_cache_engine(self):
        assert self.cache_config.num_gpu_blocks is not None
        self.cache_engine = CacheEngine(self.cache_config, self.model_config,
                                        self.parallel_config)
        self.gpu_cache = self.cache_engine.gpu_cache
        self.model_runner.set_block_size(self.cache_engine.block_size)

    def _warm_up_model(self) -> None:
        pass

    def cache_swap(
        self,
        blocks_to_swap_in: Dict[int, int],
        blocks_to_swap_out: Dict[int, int],
        blocks_to_copy: Dict[int, List[int]],
    ) -> None:
        if blocks_to_swap_in:
            self.cache_engine.swap_in(blocks_to_swap_in)
        if blocks_to_swap_out:
            self.cache_engine.swap_out(blocks_to_swap_out)
        if blocks_to_copy:
            self.cache_engine.copy(blocks_to_copy)

    @torch.inference_mode()
    def execute_model(
        self,
        execute_model_req: Optional[ExecuteModelRequest] = None
    ) -> List[SamplerOutput]:
        if execute_model_req is None:
            seq_group_metadata_list = None
        else:
            seq_group_metadata_list = execute_model_req.seq_group_metadata_list

        if self.is_driver_worker:
            assert seq_group_metadata_list is not None
            assert execute_model_req is not None
            num_seq_groups = len(seq_group_metadata_list)
            blocks_to_swap_in = execute_model_req.blocks_to_swap_in
            blocks_to_swap_out = execute_model_req.blocks_to_swap_out
            blocks_to_copy = execute_model_req.blocks_to_copy
            data: Dict[str, Any] = {
                "num_seq_groups": num_seq_groups,
                "blocks_to_swap_in": blocks_to_swap_in,
                "blocks_to_swap_out": blocks_to_swap_out,
                "blocks_to_copy": blocks_to_copy,
            }
            broadcast_tensor_dict(data, src=0)
        else:
            data = broadcast_tensor_dict(src=0)
            num_seq_groups = data["num_seq_groups"]
            blocks_to_swap_in = data["blocks_to_swap_in"]
            blocks_to_swap_out = data["blocks_to_swap_out"]
            blocks_to_copy = data["blocks_to_copy"]

        self.cache_swap(blocks_to_swap_in, blocks_to_swap_out, blocks_to_copy)

        # If there is no input, we don't need to execute the model.
        if num_seq_groups == 0:
            return []

        output = self.model_runner.execute_model(seq_group_metadata_list,
                                                 self.gpu_cache)

        # Worker only supports single-step execution. Wrap the output in a list
        # to conform to interface.
        return [output]

    def add_lora(self, lora_request: LoRARequest) -> bool:
        return self.model_runner.add_lora(lora_request)

    def remove_lora(self, lora_id: int) -> bool:
        return self.model_runner.remove_lora(lora_id)

    def list_loras(self) -> Set[int]:
        return self.model_runner.list_loras()

    def get_cache_block_size_bytes(self) -> int:
        """Get the size of the KV cache block size in bytes.
        """
        return CacheEngine.get_cache_block_size(self.cache_config,
                                                self.model_config,
                                                self.parallel_config)


def init_worker_distributed_environment(
    parallel_config: ParallelConfig,
    rank: int,
    distributed_init_method: Optional[str] = None,
    local_rank: int = -1,
) -> None:
    """Initialize the distributed environment."""
    init_distributed_environment(parallel_config.world_size, rank,
                                 distributed_init_method, local_rank)

    ensure_model_parallel_initialized(parallel_config.tensor_parallel_size,
                                      parallel_config.pipeline_parallel_size)
- vllm_npu/vllm_npu/worker/cache_engine.py: reimplements the _allocate_kv_cache function so that the kv_cache of each layer is created as a (key, value) tuple.
from typing import Tuple, List

import torch

KVCache = Tuple[torch.Tensor, torch.Tensor]


def _allocate_kv_cache(
    self,
    num_blocks: int,
    device: str,
) -> List[KVCache]:
    """Allocates KV cache on the specified device."""
    kv_cache: List[KVCache] = []
    key_block_shape = (self.block_size, self.num_heads, self.head_size)
    value_block_shape = (self.block_size, self.num_heads, self.head_size)
    for _ in range(self.num_layers):
        key_blocks = torch.empty(
            size=(num_blocks, *key_block_shape),
            dtype=self.dtype,
            device=device,
        )
        value_blocks = torch.empty(
            size=(num_blocks, *value_block_shape),
            dtype=self.dtype,
            device=device,
        )
        kv_cache.append((key_blocks, value_blocks))
    return kv_cache
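To make the tuple layout concrete, here is a minimal sketch that computes the per-layer key and value shapes and the bytes one cache block occupies across all layers, then plugs that size into the block-count formula used by `determine_num_available_blocks` in `ascend_worker.py`. No tensors are allocated. The helper names and all model and memory figures are hypothetical, and the sketch assumes the per-block size is simply the key plus value footprint summed over the layers.

```python
from math import prod
from typing import List, Tuple

Shape = Tuple[int, ...]


def kv_cache_shapes(num_layers: int, num_blocks: int, block_size: int,
                    num_heads: int, head_size: int) -> List[Tuple[Shape, Shape]]:
    """Shapes of the (key_blocks, value_blocks) tuple created for each layer."""
    block_shape = (block_size, num_heads, head_size)
    return [((num_blocks, *block_shape), (num_blocks, *block_shape))
            for _ in range(num_layers)]


def bytes_per_block(num_layers: int, block_size: int, num_heads: int,
                    head_size: int, dtype_bytes: int) -> int:
    """Bytes that a single block takes across all layers (key plus value)."""
    shapes = kv_cache_shapes(num_layers, 1, block_size, num_heads, head_size)
    return sum(prod(k) + prod(v) for k, v in shapes) * dtype_bytes


def estimate_num_blocks(total_memory: int, utilization: float, peak_memory: int,
                        cache_block_size: int, block_size: int,
                        dummy_block_size: int = 128) -> int:
    """Same arithmetic as the num_gpu_blocks expression in the worker listing."""
    dummy_num_blocks = dummy_block_size // block_size
    usable = total_memory * utilization - peak_memory
    return max(int(usable // cache_block_size) + dummy_num_blocks, 0)


GiB = 1024 ** 3

# Hypothetical model: 32 layers, 8 KV heads of size 128, 16-bit dtype.
block_bytes = bytes_per_block(num_layers=32, block_size=128, num_heads=8,
                              head_size=128, dtype_bytes=2)

# Hypothetical device: 64 GiB in total, 20 GiB peak usage during profiling.
num_blocks = estimate_num_blocks(total_memory=64 * GiB, utilization=0.9,
                                 peak_memory=20 * GiB,
                                 cache_block_size=block_bytes, block_size=128)

print(kv_cache_shapes(2, 4, 128, 8, 64)[0])
print(block_bytes, num_blocks)
```

With these made-up numbers one block costs 16 MiB, so the 37.6 GiB left after profiling holds 2406 blocks, plus the one extra block the listing adds when `block_size` is 128.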
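- vllm_npu/vllm_npu/config.py: overrides the DeviceConfig class to support devices of type npu, and overrides the _get_and_verify_max_len function to work around problems that occur when the config of some models is loaded.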
import warnings
from typing import Optional

import torch
from transformers import PretrainedConfig
from vllm.logger import init_logger

logger = init_logger(__name__)


class DeviceConfig:

    def __init__(self, device: str = "auto") -> None:
        if device == "auto":
            # Automated device type detection
            if getattr(torch.version, "cann", None) is not None:
                self.device_type = "npu"
            else:
                warnings.warn(
                    "Failed to detect cann in your environment. \
                    Please check whether you have installed cann correctly. \
                    Now the device type for processing input is set to cpu."
                )
                self.device_type = "cpu"
        else:
            # Device type is assigned explicitly
            self.device_type = device

        self.device = torch.device(self.device_type)


def _get_and_verify_max_len(
    hf_config: PretrainedConfig,
    max_model_len: Optional[int],
) -> int:
    """Get and verify the model's maximum length."""
    derived_max_model_len = float("inf")
    possible_keys = [
        # OPT
        "max_position_embeddings",
        # GPT-2
        "n_positions",
        # MPT
        "max_seq_len",
        # ChatGLM2
        "seq_length",
        # Command-R
        "model_max_length",
        # Others
        "max_sequence_length",
        "max_seq_length",
        "seq_len",
    ]
    for key in possible_keys:
        max_len_key = getattr(hf_config, key, None)
        if max_len_key is not None:
            derived_max_model_len = min(derived_max_model_len, max_len_key)
    if derived_max_model_len == float("inf"):
        if max_model_len is not None:
            # If max_model_len is specified, we use it.
            return max_model_len

        default_max_len = 2048
        logger.warning(
            "The model's config.json does not contain any of the following "
            "keys to determine the original maximum length of the model: "
            f"{possible_keys}. Assuming the model's maximum length is "
            f"{default_max_len}."
        )
        derived_max_model_len = default_max_len

    rope_scaling = getattr(hf_config, "rope_scaling", None)
    if rope_scaling is not None:
        if "type" in rope_scaling:
            rope_type = rope_scaling["type"]
        elif "rope_type" in rope_scaling:
            rope_type = rope_scaling["rope_type"]
        else:
            raise ValueError("rope_scaling must have a 'type' or 'rope_type' key.")

        # The correct one should be "longrope", kept "su" here
        # to be backward compatible
        if rope_type not in ("su", "longrope", "llama3"):
            if "factor" not in rope_scaling:
                raise ValueError("rope_scaling must have a 'factor' key.")
            scaling_factor = rope_scaling["factor"]
            if rope_type == "yarn":
                derived_max_model_len = rope_scaling["original_max_position_embeddings"]
            derived_max_model_len *= scaling_factor

    if max_model_len is None:
        max_model_len = derived_max_model_len
    elif max_model_len > derived_max_model_len:
        raise ValueError(
            f"User-specified max_model_len ({max_model_len}) is greater than "
            f"the derived max_model_len ({max_len_key}={derived_max_model_len}"
            " in model's config.json). This may lead to incorrect model "
            "outputs or CUDA errors. Make sure the value is correct and "
            "within the model context size."
        )
    return int(max_model_len)
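The effect of the `rope_scaling` branch is easier to see with concrete inputs. The sketch below restates the derivation for plain dictionaries instead of a `PretrainedConfig`; it checks only four of the length keys, and `derive_max_len` and the sample configs are illustrative assumptions, not part of the patch.

```python
from typing import Optional


def derive_max_len(config: dict, max_model_len: Optional[int] = None) -> int:
    """Simplified, dict-based restatement of _get_and_verify_max_len."""
    keys = ("max_position_embeddings", "n_positions", "max_seq_len", "seq_length")
    candidates = [config[k] for k in keys if config.get(k) is not None]
    if candidates:
        derived = min(candidates)
    else:
        if max_model_len is not None:
            return max_model_len
        derived = 2048  # default used by the listing when no key is present

    rope_scaling = config.get("rope_scaling")
    if rope_scaling is not None:
        if "type" in rope_scaling:
            rope_type = rope_scaling["type"]
        elif "rope_type" in rope_scaling:
            rope_type = rope_scaling["rope_type"]
        else:
            raise ValueError("rope_scaling must have a 'type' or 'rope_type' key.")
        # These types leave the length from the config untouched.
        if rope_type not in ("su", "longrope", "llama3"):
            factor = rope_scaling["factor"]
            if rope_type == "yarn":
                derived = rope_scaling["original_max_position_embeddings"]
            derived *= factor

    if max_model_len is None:
        return int(derived)
    if max_model_len > derived:
        raise ValueError(f"max_model_len ({max_model_len}) exceeds {derived}")
    return int(max_model_len)


plain = derive_max_len({"max_position_embeddings": 4096})
linear = derive_max_len({"max_position_embeddings": 4096,
                         "rope_scaling": {"type": "linear", "factor": 2.0}})
llama3 = derive_max_len({"max_position_embeddings": 131072,
                         "rope_scaling": {"rope_type": "llama3", "factor": 8.0}})
yarn = derive_max_len({"max_position_embeddings": 131072,
                       "rope_scaling": {"type": "yarn", "factor": 4.0,
                                        "original_max_position_embeddings": 32768}})
print(plain, linear, llama3, yarn)
```

Accepting either `type` or `rope_type`, and leaving `llama3` configs unscaled, appears to be what lets newer model configs load under this vLLM version.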
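- vllm_npu/vllm_npu/utils.py: adds a function that checks whether the Ascend backend is available, and overrides the get_ip function to fix a hang that occurs when running on multiple cards.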
# Part of codes in this file was copied from project [vLLM Team][vllm]
import socket
import warnings
from functools import lru_cache

import vllm.envs as envs


@lru_cache(maxsize=None)
def is_ascend() -> bool:
    try:
        import torch_npu
    except ImportError:
        torch_npu = None
    return torch_npu is not None


def get_ip() -> str:
    host_ip = envs.VLLM_HOST_IP
    if host_ip:
        return host_ip

    # IP is not set, try to get it from the network interface

    # try ipv4
    s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    try:
        s.connect(("localhost", 80))  # Doesn't need to be reachable
        socket_name = s.getsockname()[0]
        s.close()
        return socket_name
    except Exception:
        warnings.warn("Encountered connection errors. Using 0.0.0.0 by default.")
        s.close()

    # try ipv6
    try:
        s = socket.socket(socket.AF_INET6, socket.SOCK_DGRAM)
        # Google's public DNS server, see
        # https://developers.google.com/speed/public-dns/docs/using#addresses
        s.connect(("localhost", 80))  # Doesn't need to be reachable
        socket_name = s.getsockname()[0]
        s.close()
        return socket_name
    except Exception:
        warnings.warn("Encountered connection errors. Using 0.0.0.0 by default.")
        s.close()

    s.close()
    warnings.warn(
        "Failed to get the IP address, using 0.0.0.0 by default. "
        "The value can be set by the environment variable"
        " VLLM_HOST_IP or HOST_IP.",
        stacklevel=2)
    return "0.0.0.0"
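The address lookup relies on a common trick: calling `connect()` on a UDP socket sends no packet, it only makes the operating system choose a local address, which `getsockname()` then reports. A minimal standalone sketch follows; `detect_ip` and its parameters are hypothetical names, and it reads the environment variable directly rather than going through `vllm.envs`.

```python
import os
import socket


def detect_ip(env_var: str = "VLLM_HOST_IP", probe_host: str = "127.0.0.1") -> str:
    """Return the configured IP, else the local address chosen for probe_host."""
    configured = os.environ.get(env_var)
    if configured:
        return configured
    try:
        with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as s:
            # No datagram is sent here; connect() only fixes the route, so the
            # probe target does not have to be reachable.
            s.connect((probe_host, 80))
            return s.getsockname()[0]
    except OSError:
        # Same fallback value as the listing.
        return "0.0.0.0"


print(detect_ip())
```

Like the listing, the sketch probes a loopback target instead of an external address, so the lookup does not depend on outbound connectivity. Setting `VLLM_HOST_IP` explicitly remains the most predictable option on multi-card hosts.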
- vllm_npu/vllm_npu/npu_adaptor.py: defines a monkey patch that masks the imports in native vLLM that would otherwise raise errors.
import importlib
import sys


def replace_modules(old_module_name, new_module_name):
    if old_module_name in sys.modules:
        del sys.modules[old_module_name]
    sys.modules[old_module_name] = importlib.import_module(new_module_name)


_default_ops = (
    'xformers',
    'xformers.ops',
    'vllm._C',
    'xformers.ops.fmha.attn_bias',
    'vllm.model_executor.layers.ops.sample'
)

for _ops in _default_ops:
    replace_modules(_ops, 'vllm_npu')


# dummy class to avoid import error.
class ops:
    pass


class cuda_utils:
    pass


class cache_ops:
    pass


class BlockDiagonalCausalMask:
    pass


class LowerTriangularMaskWithTensorBias:
    pass


def context_attention_fwd():
    pass


def get_num_triton_sampler_splits():
    pass


def sample():
    pass
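The mechanism behind `replace_modules` can be tried without vLLM installed. In the sketch below, `fake_accel` stands in for a package that is missing on the machine and `stub_pkg` for the replacement package; both names are made up for the demonstration.

```python
import importlib
import sys
import types

# Build a stand-in package that exposes the placeholder attribute.
stub = types.ModuleType("stub_pkg")


class ops:
    """Dummy class, present only so that the import succeeds."""


stub.ops = ops
sys.modules["stub_pkg"] = stub


def replace_modules(old_module_name, new_module_name):
    # Same logic as the listing: register the new module under the old name.
    if old_module_name in sys.modules:
        del sys.modules[old_module_name]
    sys.modules[old_module_name] = importlib.import_module(new_module_name)


for name in ("fake_accel", "fake_accel.ops"):
    replace_modules(name, "stub_pkg")

# Neither import touches the file system: both names resolve from sys.modules.
import fake_accel.ops
from fake_accel import ops as imported_ops

print(fake_accel is stub, imported_ops is ops)
```

Because the import system consults `sys.modules` first, every later `import xformers.ops` or `from vllm._C import ops` in vLLM resolves to `vllm_npu`, which is why `npu_adaptor.py` also has to define the placeholder names those imports expect.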
- vllm_npu/vllm_npu/__init__.py: hot-replaces the common modules that live outside the submodules and applies the monkey patch.
import torch
import torch_npu
from torch_npu.contrib import transfer_to_npu
from vllm_npu.npu_adaptor import (BlockDiagonalCausalMask,
                                  LowerTriangularMaskWithTensorBias,
                                  cache_ops, cuda_utils, ops,
                                  context_attention_fwd,
                                  get_num_triton_sampler_splits, sample)
import vllm_npu.core
import vllm_npu.engine
import vllm_npu.worker
import vllm_npu.model_executor
import vllm_npu.executor
import vllm_npu.attention
import vllm_npu.usage

from vllm_npu.utils import get_ip
from vllm_npu.config import DeviceConfig, _get_and_verify_max_len
import vllm.utils as utils
import vllm.executor.ray_utils as ray_utils
import vllm.config as vconfig
import vllm.engine.arg_utils as varg_utils

utils.get_ip = get_ip
ray_utils.get_ip = get_ip
vconfig.DeviceConfig = DeviceConfig
vconfig._get_and_verify_max_len = _get_and_verify_max_len
varg_utils.DeviceConfig = DeviceConfig

__version__ = "0.4.2"
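The listing assigns `get_ip` and `DeviceConfig` on more than one module. A likely reason is that `from module import name` copies the binding into the importing module, so replacing the attribute on the original module alone leaves the copies untouched. The following sketch shows the effect with two throwaway modules; `demo_utils` and `demo_consumer` are hypothetical stand-ins for `vllm.utils` and a module that imported `get_ip` from it.

```python
import types

# Stand-in for the module that defines the original function.
utils = types.ModuleType("demo_utils")
utils.get_ip = lambda: "8.8.8.8"

# Stand-in for a module that ran `from demo_utils import get_ip`.
consumer = types.ModuleType("demo_consumer")
consumer.get_ip = utils.get_ip


def patched_get_ip() -> str:
    return "0.0.0.0"


# Patching only the defining module does not reach the copied binding.
utils.get_ip = patched_get_ip
before = consumer.get_ip()

# Patching the consumer as well replaces the function everywhere.
consumer.get_ip = patched_get_ip
after = consumer.get_ip()

print(before, after)
```

The same reasoning applies to `DeviceConfig`, which is patched on both `vllm.config` and `vllm.engine.arg_utils`.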
- examples/start_server.sh: example script that runs vLLM in online (server) mode.
#!/bin/bash
export VLLM_NO_USAGE_STATS=1  # close vllm usage messages to avoid errors
python -m vllm.entrypoints.openai.api_server --model=facebook/opt-125m --trust-remote-code --enforce-eager --worker-use-ray
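Once the server is running, it can be queried over HTTP. The sketch below only builds a completion request with the standard library and leaves the actual call commented out, so it runs without a server. It assumes the server listens on `localhost:8000` (no `--port` is passed in the script above) and serves the OpenAI-style `/v1/completions` route; `build_completion_request` is a hypothetical helper.

```python
import json
import urllib.request


def build_completion_request(prompt: str,
                             model: str = "facebook/opt-125m",
                             base_url: str = "http://localhost:8000"):
    """Build a POST request for the OpenAI-style completions endpoint."""
    payload = {
        "model": model,  # must match the --model value given to the server
        "prompt": prompt,
        "max_tokens": 32,
        "temperature": 0,
    }
    return urllib.request.Request(
        url=f"{base_url}/v1/completions",
        data=json.dumps(payload).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )


request = build_completion_request("The capital of France is")
print(request.full_url)

# To send the request to a running server:
# with urllib.request.urlopen(request) as response:
#     print(json.load(response)["choices"][0]["text"])
```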
- examples/offline_inference.py: example Python script that runs vLLM in offline mode.
from vllm import LLM, SamplingParams
import json
import argparse

parser = argparse.ArgumentParser()
parser.add_argument('--model_path', type=str, default="facebook/opt-125m")

# input prompts for test
prompts = [
    "Hello, my name is",
    "The president of the United States is",
    "The capital of France is",
    "The future of AI is",
]
sampling_params = SamplingParams(max_tokens=512, temperature=0)

args = parser.parse_args()
model_path = args.model_path

llm = LLM(model=model_path,
          block_size=128,
          max_model_len=4096,  # max context length (prompt plus generated tokens)
          tensor_parallel_size=8,  # number of NPUs to be used
          max_num_seqs=256,  # max number of sequences per batch
          enforce_eager=True,  # disable CUDA graph mode
          trust_remote_code=True,  # If the model is a custom model not yet available in the HuggingFace transformers library
          worker_use_ray=True,
          )

outputs = llm.generate(prompts, sampling_params)
for i, output in enumerate(outputs):
    prompt = output.prompt
    generated_text = output.outputs[0].text
    print(f"req_num: {i}\nPrompt: {prompt!r}\nGenerated text: {generated_text!r}")
- examples/offline_inference.sh: shell script that launches the offline-mode example.
#!/bin/bash
export VLLM_NO_USAGE_STATS=1  # close vllm usage messages to avoid errors
python3 offline_inference.py --model_path facebook/opt-125m
- install.sh: one-click installation script.
#!/bin/bash
if [ -d "./vllm" ]; then
    echo "./vllm directory already exists!"
    exit 1
fi

git clone -b v0.4.2 https://github.com/vllm-project/vllm.git vllm
yes | cp -r cover/* vllm/
cd vllm
pip install -r requirements-ascend.txt
python3 setup.py install
cd ../vllm_npu
pip install -r requirements.txt
python3 setup.py install
- README.md:
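# Vllm-MindIE

## Introduction

A patch that connects the Ascend inference engine to the stable v0.4.2 release of the open-source vLLM framework.

## Adaptation Approach

vLLM is adapted to the Ascend environment as follows:

- The upper layer keeps the native vLLM logic, including request scheduling, batch construction, and launching multi-card services through the Ray distributed framework.
- The lower layer, which covers model inference and post-processing, goes through the unified `GeneratorTorch` interface provided by MindIE-LLM and plugs into the MindIE model repository for unified management, so that model inference is accelerated by the whole-graph acceleration library.

## Supported Features

Version 0.4.2 supports the following features:

- Floating-point model inference
- Quantized model inference
- LoRA model inference
- SplitFuse

## Environment Preparation

### Dependency Versions

- Hardware on which the Vllm-MindIE adaptation repository can run
  - Atlas 800I A2 (32 GB/64 GB device memory)
- Software required to run the Vllm-MindIE adaptation repository
  - Operating system
  - Driver (HDK)
  - CANN
  - Python
  - PTA
  - Open-source software dependencies
- Version compatibility
  - The Vllm-MindIE adaptation repository currently has to be deployed and run with CANN 8.0 or later, Python 3.10, and torch 2.1.0.

### Environment Configuration

For the detailed environment configuration procedure, see the [MindIE-LLM model repository](https://gitee.com/ascend/MindIE-LLM).

## Installation

After the basic Ascend inference environment is installed, run `install.sh` to install vLLM and the Ascend patch:

```sh
bash install.sh
```

## Usage

Startup demos for vLLM offline mode and online serving are provided for reference.

**Note: adjust the model path parameter in the run scripts offline_inference.sh and start_server.sh as needed to run inference with a specific model.**

- Offline mode: before use, set the model_path parameter in offline_inference.sh to the path of the model used for inference.

  ```sh
  cd examples
  bash offline_inference.sh
  ```

- Online serving: before use, set the model parameter in start_server.sh to the path of the model used for inference. Other commonly used parameters are:
  - -tp n: number of cards used for model inference
  - --port port_num: port used by the inference service
  - --max-num-seqs bs: maximum batch size supported by the inference service
  - To run inference on a specific subset of cards, add an environment variable. For example, to use the first four cards, add the following command before the python command in the script:

    ```sh
    export ASCEND_RT_VISIBLE_DEVICES=0,1,2,3
    ```

  After configuring the parameters above, run:

  ```sh
  cd examples
  bash start_server.sh
  ```

- Multi-LoRA: set the following parameters in multilora_inference.sh:
  - model-path: path of the base model used for inference
  - lora-modules: LoRA models used for inference, in the format {LoRA model name=LoRA model path}; several LoRA models can be specified (**note: this test case needs at least two LoRA models to run**)

  After configuring the parameters above, run:

  ```sh
  cd examples
  bash multilora_inference.sh
  ```

- Split Fuse: before use, set the model_path parameter in test_split_fuse.sh to the path of the model used for inference.

  ```sh
  cd examples
  bash test_split_fuse.sh
  ```

## Details

The `vllm_npu_0.4.2` patch package modifies six main modules: `attention`, `engine`, `usage`, `executor`, `model_executor`, and `worker`. These modules are hot-replaced to make vLLM compatible with the Ascend backend while staying consistent with the corresponding modules in the native vLLM framework. The changes in each module are described below.

### Attention Module

This module defines the `AscendAttentionBackend` class, which gathers the data the Ascend backend needs for attention computation.

### Engine Module

This module reimplements the `from_engine_args` function of `LLMEngine` and `AsyncLLMEngine` so that execution correctly enters the `executor` module of the patch repository. In addition, when the Ray module is initialized, the `num_gpus` parameter is set to `parallel_config.world_size`, which fixes the problem of starting a multi-card service.

### Usage Module

This module mainly fixes the error raised by the native vLLM `usage_lib` when it collects device information in an Ascend environment.

### Executor Module

This module defines the `AscendExecutor` and `RayAscendExecutor` classes, which hand off correctly to the `AscendWorker` class.

### Model Executor Module

This module connects to the MindIE-LLM inference and post-processing pipeline and contains two submodules: `layers` and `models`. The `layers` submodule handles post-processing, while the `models` submodule focuses on model inference.

The `models` submodule contains a class named `MindIELlmWrapper`. It instantiates the unified `GeneratorTorch` interface provided by MindIE-LLM, extracts the inference parameters that MindIE-LLM needs from the native vLLM data structures, and runs inference through the unified interface. The class also builds the dummy data used during warm-up.

The file `ascend_model_loader.py` is added as well. It provides the `get_model` method that `ModelRunner` uses to load the model.

### Worker Module

This module defines the `AscendWorker` and `AscendModelRunner` classes, in `ascend_worker.py` and `ascend_model_runner.py` respectively. They interact with `MindIELlmWrapper` to make sure that model loading, warm-up, inference, and post-processing run correctly. The `_allocate_kv_cache` function of the original `CacheEngine` is also modified so that the cache matches the parameter format the backend expects.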
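After all of the code above has been restored exactly according to the project's directory structure, go to the project root directory and run the following script to install the Ascend-adapted version of vLLM.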
bash install.sh