Reference Code

The directory structure of the vLLM 0.4.2 Ascend framework adaptation code is shown below:

├── cover
│   ├── requirements-ascend.txt
│   ├── setup.py
│   └── vllm
│       └── __init__.py
├── examples
│   ├── offline_inference.py
│   ├── offline_inference.sh
│   └── start_server.sh
├── install.sh
├── README.md
└── vllm_npu
    ├── README.md
    ├── requirements.txt
    ├── setup.py
    └── vllm_npu
        ├── attention
        │   ├── backends.py
        │   ├── __init__.py
        │   └── selector.py
        ├── config.py
        ├── core
        │   └── __init__.py
        ├── engine
        │   ├── arg_utils.py
        │   ├── ascend_engine.py
        │   ├── async_ascend_engine.py
        │   └── __init__.py
        ├── executor
        │   ├── ascend_executor.py
        │   ├── ascend_ray_executor.py
        │   ├── __init__.py
        │   └── ray_utils.py
        ├── __init__.py
        ├── model_executor
        │   ├── ascend_model_loader.py
        │   ├── __init__.py
        │   ├── layers
        │   │   ├── ascend_sampler.py
        │   │   └── __init__.py
        │   └── models
        │       ├── ascend
        │       │   ├── __init__.py
        │       │   └── mindie_llm_wrapper.py
        │       └── __init__.py
        ├── npu_adaptor.py
        ├── usage
        │   ├── __init__.py
        │   └── usage_lib.py
        ├── utils.py
        └── worker
            ├── ascend_model_runner.py
            ├── ascend_worker.py
            ├── cache_engine.py
            └── __init__.py

It consists mainly of the following four parts:

  1. The cover folder contains the modifications to the vLLM framework source code.
  2. The examples folder contains sample code for offline and online (serving) usage; a hypothetical offline-inference sketch follows this list.
  3. The vllm_npu folder contains the source code of the patch repository.
  4. install.sh is a one-click installation script: once all code files are restored, running it installs the Ascend-adapted vLLM framework in one step, automatically fetching the vLLM source, installing the native framework, and applying the adaptation patch.
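
For reference, here is a minimal offline-inference sketch in the spirit of examples/offline_inference.py. It is a hypothetical illustration rather than the repository's actual example script; the model path is a placeholder and running it requires an Ascend environment with the adapted vLLM installed.

    from vllm import LLM, SamplingParams  # the patched vllm package imports vllm_npu first

    prompts = ["Hello, my name is"]
    sampling_params = SamplingParams(temperature=0.8, top_p=0.95)

    # block_size=128 is the NPU-oriented option added by the patched EngineArgs.
    llm = LLM(model="/path/to/model", block_size=128, trust_remote_code=True)

    outputs = llm.generate(prompts, sampling_params)
    for output in outputs:
        print(output.outputs[0].text)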

Contents of each file in the code repository

  • cover/requirements-ascend.txt: Python dependencies required for running in the Ascend environment
    cmake >= 3.21
    ninja  # For faster builds.
    psutil
    sentencepiece  # Required for LLaMA tokenizer.
    numpy
    requests
    py-cpuinfo
    transformers >= 4.40.0  # Required for StarCoder2 & Llava, Llama 3.
    tokenizers >= 0.19.1  # Required for Llama 3.
    fastapi
    openai
    uvicorn[standard]
    pydantic >= 2.0  # Required for OpenAI server.
    prometheus_client >= 0.18.0
    prometheus-fastapi-instrumentator >= 7.0.0
    tiktoken == 0.6.0  # Required for DBRX tokenizer
    lm-format-enforcer == 0.10.1
    typing_extensions
    filelock >= 3.10.4 # filelock starts to support `mode` argument from 3.10.4
    ray == 2.9.3
    pynvml == 11.5.0
    outlines == 0.0.34
  • cover/setup.py: adapts the vLLM installation by adding the NPU backend and updating the version suffix, required dependencies, and related settings
    import importlib.util
    import io
    import logging
    import os
    import re
    import subprocess
    import sys
    from shutil import which
    from typing import Dict, List
    import torch
    from packaging.version import Version, parse
    from setuptools import Extension, find_packages, setup
    from setuptools.command.build_ext import build_ext
    from torch.utils.cpp_extension import CUDA_HOME, BuildExtension
    def load_module_from_path(module_name, path):
        spec = importlib.util.spec_from_file_location(module_name, path)
        module = importlib.util.module_from_spec(spec)
        sys.modules[module_name] = module
        spec.loader.exec_module(module)
        return module
    ROOT_DIR = os.path.dirname(__file__)
    logger = logging.getLogger(__name__)
    # cannot import envs directly because it depends on vllm,
    #  which is not installed yet
    envs = load_module_from_path('envs', os.path.join(ROOT_DIR, 'vllm', 'envs.py'))
    VLLM_TARGET_DEVICE = 'npu'
    # vLLM only supports Linux platform
    assert sys.platform.startswith(
        "linux"), "vLLM only supports Linux platform (including WSL)."
    MAIN_CUDA_VERSION = "12.1"
    def is_sccache_available() -> bool:
        return which("sccache") is not None
    def is_ccache_available() -> bool:
        return which("ccache") is not None
    def is_ninja_available() -> bool:
        return which("ninja") is not None
    def remove_prefix(text, prefix):
        if text.startswith(prefix):
            return text[len(prefix):]
        return text
    class CMakeExtension(Extension):
        def __init__(self, name: str, cmake_lists_dir: str = '.', **kwa) -> None:
            super().__init__(name, sources=[], **kwa)
            self.cmake_lists_dir = os.path.abspath(cmake_lists_dir)
    
    def _is_cuda() -> bool:
        return VLLM_TARGET_DEVICE == "cuda" \
                and torch.version.cuda is not None \
                and not _is_neuron()
    def _is_hip() -> bool:
        return (VLLM_TARGET_DEVICE == "cuda"
                or VLLM_TARGET_DEVICE == "rocm") and torch.version.hip is not None
    def _is_neuron() -> bool:
        torch_neuronx_installed = True
        try:
            subprocess.run(["neuron-ls"], capture_output=True, check=True)
        except (FileNotFoundError, PermissionError, subprocess.CalledProcessError):
            torch_neuronx_installed = False
        return torch_neuronx_installed or envs.VLLM_BUILD_WITH_NEURON
    def _is_cpu() -> bool:
        return VLLM_TARGET_DEVICE == "cpu"
    def _is_npu() -> bool:
        return VLLM_TARGET_DEVICE == "npu"
    def _install_punica() -> bool:
        return envs.VLLM_INSTALL_PUNICA_KERNELS
    def get_hipcc_rocm_version():
        # Run the hipcc --version command
        result = subprocess.run(['hipcc', '--version'],
                                stdout=subprocess.PIPE,
                                stderr=subprocess.STDOUT,
                                text=True)
        # Check if the command was executed successfully
        if result.returncode != 0:
            print("Error running 'hipcc --version'")
            return None
        # Extract the version using a regular expression
        match = re.search(r'HIP version: (\S+)', result.stdout)
        if match:
            # Return the version string
            return match.group(1)
        else:
            print("Could not find HIP version in the output")
            return None
    def get_neuronxcc_version():
        import sysconfig
        site_dir = sysconfig.get_paths()["purelib"]
        version_file = os.path.join(site_dir, "neuronxcc", "version",
                                    "__init__.py")
        # Check if the command was executed successfully
        with open(version_file, "rt") as fp:
            content = fp.read()
        # Extract the version using a regular expression
        match = re.search(r"__version__ = '(\S+)'", content)
        if match:
            # Return the version string
            return match.group(1)
        else:
            raise RuntimeError("Could not find HIP version in the output")
    def get_nvcc_cuda_version() -> Version:
        """Get the CUDA version from nvcc.
        Adapted from https://github.com/NVIDIA/apex/blob/8b7a1ff183741dd8f9b87e7bafd04cfde99cea28/setup.py
        """
        assert CUDA_HOME is not None, "CUDA_HOME is not set"
        nvcc_output = subprocess.check_output([CUDA_HOME + "/bin/nvcc", "-V"],
                                              universal_newlines=True)
        output = nvcc_output.split()
        release_idx = output.index("release") + 1
        nvcc_cuda_version = parse(output[release_idx].split(",")[0])
        return nvcc_cuda_version
    def get_path(*filepath) -> str:
        return os.path.join(ROOT_DIR, *filepath)
    def find_version(filepath: str) -> str:
        """Extract version information from the given filepath.
        Adapted from https://github.com/ray-project/ray/blob/0b190ee1160eeca9796bc091e07eaebf4c85b511/python/setup.py
        """
        with open(filepath) as fp:
            version_match = re.search(r"^__version__ = ['\"]([^'\"]*)['\"]",
                                      fp.read(), re.M)
            if version_match:
                return version_match.group(1)
            raise RuntimeError("Unable to find version string.")
    def get_vllm_version() -> str:
        version = find_version(get_path("vllm", "__init__.py"))
        # return version
        if _is_cuda():
            cuda_version = str(get_nvcc_cuda_version())
            if cuda_version != MAIN_CUDA_VERSION:
                cuda_version_str = cuda_version.replace(".", "")[:3]
                version += f"+cu{cuda_version_str}"
        elif _is_hip():
            # Get the HIP version
            hipcc_version = get_hipcc_rocm_version()
            if hipcc_version != MAIN_CUDA_VERSION:
                rocm_version_str = hipcc_version.replace(".", "")[:3]
                version += f"+rocm{rocm_version_str}"
        elif _is_neuron():
            # Get the Neuron version
            neuron_version = str(get_neuronxcc_version())
            if neuron_version != MAIN_CUDA_VERSION:
                neuron_version_str = neuron_version.replace(".", "")[:3]
                version += f"+neuron{neuron_version_str}"
        elif _is_npu():
            version += "+npu"
        elif _is_cpu():
            version += "+cpu"
        else:
            raise RuntimeError("Unknown runtime environment")
        return version
    def read_readme() -> str:
        """Read the README file if present."""
        p = get_path("README.md")
        if os.path.isfile(p):
            return io.open(get_path("README.md"), "r", encoding="utf-8").read()
        else:
            return ""
    def get_requirements() -> List[str]:
        """Get Python package dependencies from requirements.txt."""
        def _read_requirements(filename: str) -> List[str]:
            with open(get_path(filename)) as f:
                requirements = f.read().strip().split("\n")
            resolved_requirements = []
            for line in requirements:
                if line.startswith("-r "):
                    resolved_requirements += _read_requirements(line.split()[1])
                else:
                    resolved_requirements.append(line)
            return resolved_requirements
        if _is_cuda():
            requirements = _read_requirements("requirements-cuda.txt")
            cuda_major, cuda_minor = torch.version.cuda.split(".")
            modified_requirements = []
            for req in requirements:
                if "vllm-nccl-cu12" in req:
                    req = req.replace("vllm-nccl-cu12",
                                      f"vllm-nccl-cu{cuda_major}")
                elif ("vllm-flash-attn" in req
                      and not (cuda_major == "12" and cuda_minor == "1")):
                    # vllm-flash-attn is built only for CUDA 12.1.
                    # Skip for other versions.
                    continue
                modified_requirements.append(req)
            requirements = modified_requirements
        elif _is_hip():
            requirements = _read_requirements("requirements-rocm.txt")
        elif _is_neuron():
            requirements = _read_requirements("requirements-neuron.txt")
        elif _is_npu():
            requirements = _read_requirements("requirements-ascend.txt")
        elif _is_cpu():
            requirements = _read_requirements("requirements-cpu.txt")
        else:
            raise ValueError(
                "Unsupported platform, please use CUDA, ROCm, Neuron, NPU or CPU.")
        return requirements
    ext_modules = []
    if _is_cuda():
        ext_modules.append(CMakeExtension(name="vllm._moe_C"))
    """
    if not _is_neuron():
        ext_modules.append(CMakeExtension(name="vllm._C"))
        if _install_punica():
            ext_modules.append(CMakeExtension(name="vllm._punica_C"))
    """
    package_data = {
        "vllm": ["py.typed", "model_executor/layers/fused_moe/configs/*.json"]
    }
    if envs.VLLM_USE_PRECOMPILED:
        ext_modules = []
        package_data["vllm"].append("*.so")
    setup(
        name="vllm",
        version=get_vllm_version(),
        author="vLLM Team",
        license="Apache 2.0",
        description=("A high-throughput and memory-efficient inference and "
                     "serving engine for LLMs"),
        long_description=read_readme(),
        long_description_content_type="text/markdown",
        url="https://github.com/vllm-project/vllm",
        project_urls={
            "Homepage": "https://github.com/vllm-project/vllm",
            "Documentation": "https://vllm.readthedocs.io/en/latest/",
        },
        classifiers=[
            "Programming Language :: Python :: 3.8",
            "Programming Language :: Python :: 3.9",
            "Programming Language :: Python :: 3.10",
            "Programming Language :: Python :: 3.11",
            "License :: OSI Approved :: Apache Software License",
            "Topic :: Scientific/Engineering :: Artificial Intelligence",
        ],
        packages=find_packages(exclude=("benchmarks", "csrc", "docs", "examples",
                                        "tests*")),
        python_requires=">=3.8",
        install_requires=get_requirements(),
        ext_modules=ext_modules,
        extras_require={
            "tensorizer": ["tensorizer==2.9.0"],
        },
        cmdclass={"build_ext": BuildExtension} if not _is_neuron() else {},
        package_data=package_data,
    )
    
  • cover/vllm/__init__.py: imports vllm_npu in vLLM's package initializer so that the adaptation hooks take effect.
    """vLLM: a high-throughput and memory-efficient inference engine for LLMs"""
    import vllm_npu
    from vllm.engine.arg_utils import AsyncEngineArgs, EngineArgs
    from vllm.engine.async_llm_engine import AsyncLLMEngine
    from vllm.engine.llm_engine import LLMEngine
    from vllm.entrypoints.llm import LLM
    from vllm.executor.ray_utils import initialize_ray_cluster
    from vllm.model_executor.models import ModelRegistry
    from vllm.outputs import CompletionOutput, RequestOutput
    from vllm.sampling_params import SamplingParams
    __version__ = "0.4.2"
    __all__ = [
        "LLM",
        "ModelRegistry",
        "SamplingParams",
        "RequestOutput",
        "CompletionOutput",
        "LLMEngine",
        "EngineArgs",
        "AsyncLLMEngine",
        "AsyncEngineArgs",
        "initialize_ray_cluster",
    ]
    
  • vllm_npu/README.md: an empty file
  • vllm_npu/requirements.txt: Python dependencies required by the vllm_npu module
    numpy
    decorator
    attrs
    psutil
    absl-py
    cloudpickle
    scipy
    tornado
    transformers
    accelerate
    pandas
  • vllm_npu/setup.py: installation script for the vllm_npu module
    import io
    import os
    import re
    from typing import List
    import setuptools
    ROOT_DIR = os.path.dirname(__file__)
    def get_path(*filepath) -> str:
        return os.path.join(ROOT_DIR, *filepath)
    def find_version(filepath: str):
        """Extract version information from the given filepath.
        """
        with open(filepath) as fp:
            version_match = re.search(r"^__version__ = ['\"]([^'\"]*)['\"]",
                                      fp.read(), re.M)
            if version_match:
                return version_match.group(1)
            raise RuntimeError("Unable to find version string.")
    def read_readme() -> str:
        """Read the README file."""
        return io.open(get_path("README.md"), "r", encoding="utf-8").read()
    def get_requirements() -> List[str]:
        """Get Python package dependencies from requirements.txt."""
        with open(get_path("requirements.txt")) as f:
            requirements = f.read().strip().split("\n")
        return requirements
    setuptools.setup(
        name="vllm_npu",
        version=find_version(get_path("vllm_npu", "__init__.py")),
        author="Huawei",
        license="Apache 2.0",
        description=("A high-throughput and memory-efficient inference and "
                     "serving engine for LLMs"),
        long_description=read_readme(),
        long_description_content_type="text/markdown",
        url="",
        project_urls={
            "Homepage": "",
            "Documentation": "",
        },
        classifiers=[
            "Programming Language :: Python :: 3.8",
            "Programming Language :: Python :: 3.9",
            "Programming Language :: Python :: 3.10",
            "Programming Language :: Python :: 3.11",
            "Topic :: Scientific/Engineering :: Artificial Intelligence",
        ],
        packages=setuptools.find_packages(exclude=("benchmarks", "examples", "tests")),
        python_requires=">=3.8",
        install_requires=get_requirements(),
    )
    
  • vllm_npu/vllm_npu/attention/__init__.py: during initialization of the attention module, hot-replaces (monkey-patches) vLLM's get_attn_backend function
    from vllm_npu.attention.selector import get_attn_backend
    import vllm.attention.selector as selector
    import vllm.worker.model_runner as mr
    import vllm.worker.cache_engine as ce
    selector.get_attn_backend = get_attn_backend
    mr.get_attn_backend = get_attn_backend
    ce.get_attn_backend = get_attn_backend
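
    Because vllm.worker.model_runner and vllm.worker.cache_engine imported get_attn_backend by name, the replacement is rebound in those modules as well as in vllm.attention.selector. A minimal, hypothetical check (not part of the repository) that the replacement is in effect:

        import torch

        import vllm_npu.attention  # importing this package runs the replacement above
        import vllm.attention.selector as selector

        backend = selector.get_attn_backend(torch.float16)
        print(backend.get_name())  # expected: "ascend-attn-backend"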
    
  • vllm_npu/vllm_npu/attention/backends.py: the attention backend implementation used in the Ascend environment.
    # Part of codes in this file was copied from project [vLLM Team][vllm]
    from dataclasses import dataclass
    from typing import List, Optional, Tuple
    from vllm.attention.backends.abstract import (AttentionBackend,
                                                  AttentionMetadataPerStage)
    import torch
    def get_kv_cache_shape(
            num_blocks: int,
            block_size: int,
            num_kv_heads: int,
            head_size: int,
        ) -> Tuple[int, ...]:
            return (2, num_blocks, block_size, num_kv_heads, head_size)
    class AscendAttentionBackend(AttentionBackend):
        @staticmethod
        def get_name() -> str:
            return "ascend-attn-backend"
        @staticmethod
        def get_impl_cls():
            return None
        @staticmethod
        def make_metadata(*args, **kwargs) -> "AttentionMetadata":
            return AttentionMetadata(*args, **kwargs)
        @staticmethod
        def get_kv_cache_shape(
            num_blocks: int,
            block_size: int,
            num_kv_heads: int,
            head_size: int,
        ) -> Tuple[int, ...]:
            return get_kv_cache_shape(num_blocks, block_size, num_kv_heads, head_size)
        @staticmethod
        def swap_blocks(
            src_kv_cache: torch.Tensor,
            dst_kv_cache: torch.Tensor,
            src_to_dst: torch.Tensor,
        ) -> None:
            pass
        @staticmethod
        def copy_blocks(
            kv_caches: List[torch.Tensor],
            src_to_dists: torch.Tensor,
        ) -> None:
            pass
    @dataclass
    class AttentionMetadata(AttentionMetadataPerStage):
        """Metadata for AscendAttentionBackend.
        """
        # Currently, input sequences can only contain all prompts
        # or all decoding. True if all sequences are prompts.
        is_prompt: bool
        # (batch_size,). The sequence length per sequence. Sequence length means
        # the computed tokens + new tokens None if it is a decoding.
        seq_lens: Optional[List[int]]
        # seq_lens stored as a tensor.
        seq_lens_tensor: Optional[torch.Tensor]
        # Maximum query length in the batch.
        max_query_len: Optional[int]
        # Maximum sequence length in the batch.
        max_seq_len: Optional[int]
        # (batch_size + 1,). The cumulative subquery lengths of the sequences in
        # the batch, used to index into subquery. E.g., if the subquery length
        # is [4, 6], it is [0, 4, 10].
        subquery_start_loc: Optional[torch.Tensor]
        # (batch_size + 1,). The cumulative sequence lengths of the sequences in
        # the batch, used to index into sequence. E.g., if the sequence length is
        # [4, 6], it is [0, 4, 10].
        seq_start_loc: Optional[torch.Tensor]
        # (batch_size,) A tensor of context lengths (tokens that are computed
        # so far).
        context_lens_tensor: Optional[torch.Tensor]
        block_tables: Optional[torch.Tensor]
        # Whether or not if cuda graph is enabled.
        use_cuda_graph: bool
    
  • vllm_npu/vllm_npu/attention/selector.py: defines a get_attn_backend function that returns the Ascend backend, used to replace the corresponding native vLLM function
    # Part of codes in this file was copied from project [vLLM Team][vllm]
    from functools import lru_cache
    from typing import Type
    import torch
    from vllm.attention.backends.abstract import AttentionBackend
    from vllm.logger import init_logger
    logger = init_logger(__name__)
    @lru_cache(maxsize=None)
    def get_attn_backend(dtype: torch.dtype) -> Type[AttentionBackend]:
        logger.info("Using Ascend backend.")
        from vllm_npu.attention.backends import AscendAttentionBackend
        return AscendAttentionBackend
    
  • vllm_npu/vllm_npu/core/__init__.py: an empty file.
  • vllm_npu/vllm_npu/engine/__init__.py: during initialization of the engine module, replaces the from_engine_args factory methods used to instantiate the LLMEngine and AsyncLLMEngine classes, and replaces the EngineArgs class in vLLM's arg_utils module
    from vllm.engine.llm_engine import LLMEngine
    from vllm.engine.async_llm_engine import AsyncLLMEngine
    import vllm.engine.arg_utils as v_arg_utils
    from vllm_npu.engine.ascend_engine import from_engine_args
    from vllm_npu.engine.async_ascend_engine import from_engine_args_async
    from .arg_utils import EngineArgs
    LLMEngine.from_engine_args = from_engine_args
    AsyncLLMEngine.from_engine_args = from_engine_args_async
    v_arg_utils.EngineArgs = EngineArgs
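
    With these assignments in place, the public entry points pick up the Ascend executors automatically. A hypothetical illustration (not part of the repository; the model path is a placeholder and an Ascend environment is required to actually run it):

        import vllm_npu.engine  # importing this package applies the replacements above
        from vllm.engine.arg_utils import EngineArgs  # now the patched EngineArgs
        from vllm.engine.llm_engine import LLMEngine

        args = EngineArgs(model="/path/to/model", device="npu", block_size=128)
        engine = LLMEngine.from_engine_args(args)  # selects AscendExecutor for "npu"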
    
  • vllm_npu/vllm_npu/engine/ascend_engine.py: rewrites the from_engine_args factory method of the LLMEngine class so that the Executor class for the Ascend environment is loaded
    from vllm.engine.arg_utils import EngineArgs
    from vllm.usage.usage_lib import UsageContext
    from vllm_npu.executor.ray_utils import initialize_ray_cluster
    @classmethod
    def from_engine_args(
        cls,
        engine_args: EngineArgs,
        usage_context: UsageContext = UsageContext.ENGINE_CONTEXT,
    ) -> "LLMEngine":
        """Creates an LLM engine from the engine arguments."""
        # Create the engine configs.
        engine_config = engine_args.create_engine_config()
        # Initialize the cluster and specify the executor class.
        if engine_config.device_config.device_type == "neuron":
            from vllm.executor.neuron_executor import NeuronExecutor
            executor_class = NeuronExecutor
        elif engine_config.device_config.device_type == "cpu":
            from vllm.executor.cpu_executor import CPUExecutor
            executor_class = CPUExecutor
        elif engine_config.device_config.device_type == "npu":
            if engine_config.parallel_config.worker_use_ray:
                initialize_ray_cluster(engine_config.parallel_config)
                from vllm_npu.executor.ascend_ray_executor import RayAscendExecutor
                executor_class = RayAscendExecutor
            else:
                from vllm_npu.executor.ascend_executor import AscendExecutor
                executor_class = AscendExecutor
        elif engine_config.parallel_config.worker_use_ray:
            initialize_ray_cluster(engine_config.parallel_config)
            from vllm.executor.ray_gpu_executor import RayGPUExecutor
            executor_class = RayGPUExecutor
        else:
            if engine_config.parallel_config.world_size != 1:
                raise ValueError("Ray is required if parallel_config.world_size > 1.")
            from vllm.executor.gpu_executor import GPUExecutor
            executor_class = GPUExecutor
        # Create the LLM engine.
        engine = cls(
            **engine_config.to_dict(),
            executor_class=executor_class,
            log_stats=not engine_args.disable_log_stats,
            usage_context=usage_context,
        )
        return engine
    
  • vllm_npu/vllm_npu/engine/async_ascend_engine.py: rewrites the from_engine_args factory method of the AsyncLLMEngine class so that the Executor class for the Ascend environment is loaded
    from vllm.engine.arg_utils import AsyncEngineArgs
    from vllm.usage.usage_lib import UsageContext
    from vllm_npu.executor.ray_utils import initialize_ray_cluster
    @classmethod
    def from_engine_args_async(
        cls,
        engine_args: AsyncEngineArgs,
        start_engine_loop: bool = True,
        usage_context: UsageContext = UsageContext.ENGINE_CONTEXT,
    ) -> "AsyncLLMEngine":
        """Creates an async LLM engine from the engine arguments."""
        # Create the engine configs.
        engine_config = engine_args.create_engine_config()
        if engine_config.device_config.device_type == "neuron":
            from vllm.executor.neuron_executor import NeuronExecutorAsync
            executor_class = NeuronExecutorAsync
        elif engine_config.device_config.device_type == "cpu":
            if engine_config.parallel_config.worker_use_ray:
                raise RuntimeError("Ray is not supported with the CPU backend.")
            from vllm.executor.cpu_executor import CPUExecutorAsync
            executor_class = CPUExecutorAsync
        elif engine_config.device_config.device_type == "npu":
            if engine_config.parallel_config.worker_use_ray:
                initialize_ray_cluster(engine_config.parallel_config)
                from vllm_npu.executor.ascend_ray_executor import RayAscendExecutorAsync
                executor_class = RayAscendExecutorAsync
            else:
                from vllm_npu.executor.ascend_executor import AscendExecutorAsync
                executor_class = AscendExecutorAsync
        elif engine_config.parallel_config.worker_use_ray:
            initialize_ray_cluster(engine_config.parallel_config)
            from vllm.executor.ray_gpu_executor import RayGPUExecutorAsync
            executor_class = RayGPUExecutorAsync
        else:
            if engine_config.parallel_config.world_size != 1:
                raise RuntimeError("Ray is required if parallel_config.world_size > 1.")
            from vllm.executor.gpu_executor import GPUExecutorAsync
            executor_class = GPUExecutorAsync
        # Create the async LLM engine.
        engine = cls(
            engine_config.parallel_config.worker_use_ray,
            engine_args.engine_use_ray,
            **engine_config.to_dict(),
            executor_class=executor_class,
            log_requests=not engine_args.disable_log_requests,
            log_stats=not engine_args.disable_log_stats,
            max_log_len=engine_args.max_log_len,
            start_engine_loop=start_engine_loop,
            usage_context=usage_context,
        )
        return engine
    
  • vllm_npu/vllm_npu/engine/arg_utils.py: rewrites the EngineArgs class from vLLM's arg_utils module, adding 128 as an allowed value for the block_size argument
    import argparse
    import dataclasses
    from dataclasses import dataclass
    from typing import List, Optional, Union
    from vllm.config import (
        CacheConfig,
        DecodingConfig,
        EngineConfig,
        LoadConfig,
        LoRAConfig,
        ModelConfig,
        ParallelConfig,
        SchedulerConfig,
        SpeculativeConfig,
        TokenizerPoolConfig,
        VisionLanguageConfig,
    )
    from vllm.model_executor.layers.quantization import QUANTIZATION_METHODS
    from vllm.utils import str_to_int_tuple
    from vllm_npu.config import DeviceConfig
    DEFAULT_TYPE = "auto"
    def nullable_str(val: str):
        if not val or val == "None":
            return None
        return val
    @dataclass
    class EngineArgs:
        """Arguments for vLLM engine."""
        model: str
        served_model_name: Optional[Union[List[str]]] = None
        tokenizer: Optional[str] = None
        skip_tokenizer_init: bool = False
        tokenizer_mode: str = DEFAULT_TYPE
        trust_remote_code: bool = False
        download_dir: Optional[str] = None
        load_format: str = DEFAULT_TYPE
        dtype: str = DEFAULT_TYPE
        kv_cache_dtype: str = DEFAULT_TYPE
        quantization_param_path: Optional[str] = None
        seed: int = 0
        max_model_len: Optional[int] = None
        worker_use_ray: bool = False
        pipeline_parallel_size: int = 1
        tensor_parallel_size: int = 1
        max_parallel_loading_workers: Optional[int] = None
        block_size: int = 128
        enable_prefix_caching: bool = False
        use_v2_block_manager: bool = False
        swap_space: int = 4  # GiB
        gpu_memory_utilization: float = 0.90
        max_num_batched_tokens: Optional[int] = None
        max_num_seqs: int = 256
        max_logprobs: int = 5  # OpenAI default value
        disable_log_stats: bool = False
        revision: Optional[str] = None
        code_revision: Optional[str] = None
        tokenizer_revision: Optional[str] = None
        quantization: Optional[str] = None
        enforce_eager: bool = True
        max_context_len_to_capture: Optional[int] = None
        max_seq_len_to_capture: int = 8192
        disable_custom_all_reduce: bool = False
        tokenizer_pool_size: int = 0
        tokenizer_pool_type: str = "ray"
        tokenizer_pool_extra_config: Optional[dict] = None
        enable_lora: bool = False
        max_loras: int = 1
        max_lora_rank: int = 16
        fully_sharded_loras: bool = False
        lora_extra_vocab_size: int = 256
        lora_dtype = DEFAULT_TYPE
        max_cpu_loras: Optional[int] = None
        device: str = DEFAULT_TYPE
        ray_workers_use_nsight: bool = False
        num_gpu_blocks_override: Optional[int] = None
        num_lookahead_slots: int = 0
        model_loader_extra_config: Optional[dict] = None
        # Related to Vision-language models such as llava
        image_input_type: Optional[str] = None
        image_token_id: Optional[int] = None
        image_input_shape: Optional[str] = None
        image_feature_size: Optional[int] = None
        scheduler_delay_factor: float = 0.0
        enable_chunked_prefill: bool = False
        guided_decoding_backend: str = "outlines"
        # Speculative decoding configuration.
        speculative_model: Optional[str] = None
        num_speculative_tokens: Optional[int] = None
        speculative_max_model_len: Optional[int] = None
        ngram_prompt_lookup_max: Optional[int] = None
        ngram_prompt_lookup_min: Optional[int] = None
        def __post_init__(self):
            if self.tokenizer is None:
                self.tokenizer = self.model
        @staticmethod
        def add_cli_args(parser: argparse.ArgumentParser) -> argparse.ArgumentParser:
            """Shared CLI arguments for vLLM engine."""
            # Model arguments
            parser.add_argument(
                "--model",
                type=str,
                default="facebook/opt-125m",
                help="Name or path of the huggingface model to use.",
            )
            parser.add_argument(
                "--tokenizer",
                type=nullable_str,
                default=EngineArgs.tokenizer,
                help="Name or path of the huggingface tokenizer to use.",
            )
            parser.add_argument(
                "--skip-tokenizer-init",
                action="store_true",
                help="Skip initialization of tokenizer and detokenizer",
            )
            parser.add_argument(
                "--revision",
                type=nullable_str,
                default=None,
                help="The specific model version to use. It can be a branch "
                "name, a tag name, or a commit id. If unspecified, will use "
                "the default version.",
            )
            parser.add_argument(
                "--code-revision",
                type=nullable_str,
                default=None,
                help="The specific revision to use for the model code on "
                "Hugging Face Hub. It can be a branch name, a tag name, or a "
                "commit id. If unspecified, will use the default version.",
            )
            parser.add_argument(
                "--tokenizer-revision",
                type=nullable_str,
                default=None,
                help="The specific tokenizer version to use. It can be a branch "
                "name, a tag name, or a commit id. If unspecified, will use "
                "the default version.",
            )
            parser.add_argument(
                "--tokenizer-mode",
                type=str,
                default=EngineArgs.tokenizer_mode,
                choices=[DEFAULT_TYPE, "slow"],
                help='The tokenizer mode.\n\n* "auto" will use the '
                'fast tokenizer if available.\n* "slow" will '
                "always use the slow tokenizer.",
            )
            parser.add_argument(
                "--trust-remote-code",
                action="store_true",
                help="Trust remote code from huggingface.",
            )
            parser.add_argument(
                "--download-dir",
                type=nullable_str,
                default=EngineArgs.download_dir,
                help="Directory to download and load the weights, "
                "default to the default cache dir of "
                "huggingface.",
            )
            parser.add_argument(
                "--load-format",
                type=str,
                default=EngineArgs.load_format,
                choices=[
                    DEFAULT_TYPE,
                    "pt",
                    "safetensors",
                    "npcache",
                    "dummy",
                    "tensorizer",
                ],
                help="The format of the model weights to load.\n\n"
                '* "auto" will try to load the weights in the safetensors format '
                "and fall back to the pytorch bin format if safetensors format "
                "is not available.\n"
                '* "pt" will load the weights in the pytorch bin format.\n'
                '* "safetensors" will load the weights in the safetensors format.\n'
                '* "npcache" will load the weights in pytorch format and store '
                "a numpy cache to speed up the loading.\n"
                '* "dummy" will initialize the weights with random values, '
                "which is mainly for profiling.\n"
                '* "tensorizer" will load the weights using tensorizer from '
                "CoreWeave which assumes tensorizer_uri is set to the location of "
                "the serialized weights.",
            )
            parser.add_argument(
                "--dtype",
                type=str,
                default=EngineArgs.dtype,
                choices=[DEFAULT_TYPE, "half", "float16", "bfloat16", "float", "float32"],
                help="Data type for model weights and activations.\n\n"
                '* "auto" will use FP16 precision for FP32 and FP16 models, and '
                "BF16 precision for BF16 models.\n"
                '* "half" for FP16. Recommended for AWQ quantization.\n'
                '* "float16" is the same as "half".\n'
                '* "bfloat16" for a balance between precision and range.\n'
                '* "float" is shorthand for FP32 precision.\n'
                '* "float32" for FP32 precision.',
            )
            parser.add_argument(
                "--kv-cache-dtype",
                type=str,
                choices=[DEFAULT_TYPE, "fp8"],
                default=EngineArgs.kv_cache_dtype,
                help='Data type for kv cache storage. If "auto", will use model '
                "data type. FP8_E5M2 (without scaling) is only supported on cuda "
                "version greater than 11.8. On ROCm (AMD GPU), FP8_E4M3 is instead "
                "supported for common inference criteria.",
            )
            parser.add_argument(
                "--quantization-param-path",
                type=nullable_str,
                default=None,
                help="Path to the JSON file containing the KV cache "
                "scaling factors. This should generally be supplied, when "
                "KV cache dtype is FP8. Otherwise, KV cache scaling factors "
                "default to 1.0, which may cause accuracy issues. "
                "FP8_E5M2 (without scaling) is only supported on cuda version"
                "greater than 11.8. On ROCm (AMD GPU), FP8_E4M3 is instead "
                "supported for common inference criteria.",
            )
            parser.add_argument(
                "--max-model-len",
                type=int,
                default=EngineArgs.max_model_len,
                help="Model context length. If unspecified, will "
                "be automatically derived from the model config.",
            )
            parser.add_argument(
                "--guided-decoding-backend",
                type=str,
                default="outlines",
                choices=["outlines", "lm-format-enforcer"],
                help="Which engine will be used for guided decoding"
                " (JSON schema / regex etc) by default. Currently support "
                "https://github.com/outlines-dev/outlines and "
                "https://github.com/noamgat/lm-format-enforcer."
                " Can be overridden per request via guided_decoding_backend"
                " parameter.",
            )
            # Parallel arguments
            parser.add_argument(
                "--worker-use-ray",
                action="store_true",
                help="Use Ray for distributed serving, will be "
                "automatically set when using more than 1 GPU.",
            )
            parser.add_argument(
                "--pipeline-parallel-size",
                "-pp",
                type=int,
                default=EngineArgs.pipeline_parallel_size,
                help="Number of pipeline stages.",
            )
            parser.add_argument(
                "--tensor-parallel-size",
                "-tp",
                type=int,
                default=EngineArgs.tensor_parallel_size,
                help="Number of tensor parallel replicas.",
            )
            parser.add_argument(
                "--max-parallel-loading-workers",
                type=int,
                default=EngineArgs.max_parallel_loading_workers,
                help="Load model sequentially in multiple batches, "
                "to avoid RAM OOM when using tensor "
                "parallel and large models.",
            )
            parser.add_argument(
                "--ray-workers-use-nsight",
                action="store_true",
                help="If specified, use nsight to profile Ray workers.",
            )
            # KV cache arguments
            parser.add_argument(
                "--block-size",
                type=int,
                default=EngineArgs.block_size,
                choices=[8, 16, 32, 128],
                help="Token block size for contiguous chunks of " "tokens.",
            )
            parser.add_argument(
                "--enable-prefix-caching",
                action="store_true",
                help="Enables automatic prefix caching.",
            )
            parser.add_argument(
                "--use-v2-block-manager",
                action="store_true",
                help="Use BlockSpaceMangerV2.",
            )
            parser.add_argument(
                "--num-lookahead-slots",
                type=int,
                default=EngineArgs.num_lookahead_slots,
                help="Experimental scheduling config necessary for "
                "speculative decoding. This will be replaced by "
                "speculative config in the future; it is present "
                "to enable correctness tests until then.",
            )
            parser.add_argument(
                "--seed",
                type=int,
                default=EngineArgs.seed,
                help="Random seed for operations.",
            )
            parser.add_argument(
                "--swap-space",
                type=int,
                default=EngineArgs.swap_space,
                help="CPU swap space size (GiB) per GPU.",
            )
            parser.add_argument(
                "--gpu-memory-utilization",
                type=float,
                default=EngineArgs.gpu_memory_utilization,
                help="The fraction of GPU memory to be used for the model "
                "executor, which can range from 0 to 1. For example, a value of "
                "0.5 would imply 50%% GPU memory utilization. If unspecified, "
                "will use the default value of 0.9.",
            )
            parser.add_argument(
                "--num-gpu-blocks-override",
                type=int,
                default=None,
                help="If specified, ignore GPU profiling result and use this number"
                "of GPU blocks. Used for testing preemption.",
            )
            parser.add_argument(
                "--max-num-batched-tokens",
                type=int,
                default=EngineArgs.max_num_batched_tokens,
                help="Maximum number of batched tokens per " "iteration.",
            )
            parser.add_argument(
                "--max-num-seqs",
                type=int,
                default=EngineArgs.max_num_seqs,
                help="Maximum number of sequences per iteration.",
            )
            parser.add_argument(
                "--max-logprobs",
                type=int,
                default=EngineArgs.max_logprobs,
                help=(
                    "Max number of log probs to return logprobs is specified in"
                    " SamplingParams."
                ),
            )
            parser.add_argument(
                "--disable-log-stats",
                action="store_true",
                help="Disable logging statistics.",
            )
            # Quantization settings.
            parser.add_argument(
                "--quantization",
                "-q",
                type=nullable_str,
                choices=[*QUANTIZATION_METHODS, None],
                default=EngineArgs.quantization,
                help="Method used to quantize the weights. If "
                "None, we first check the `quantization_config` "
                "attribute in the model config file. If that is "
                "None, we assume the model weights are not "
                "quantized and use `dtype` to determine the data "
                "type of the weights.",
            )
            parser.add_argument(
                "--enforce-eager",
                action="store_true",
                help="Always use eager-mode PyTorch. If False, "
                "will use eager mode and CUDA graph in hybrid "
                "for maximal performance and flexibility.",
            )
            parser.add_argument(
                "--max-context-len-to-capture",
                type=int,
                default=EngineArgs.max_context_len_to_capture,
                help="Maximum context length covered by CUDA "
                "graphs. When a sequence has context length "
                "larger than this, we fall back to eager mode. "
                "(DEPRECATED. Use --max-seq_len-to-capture instead"
                ")",
            )
            parser.add_argument(
                "--max-seq_len-to-capture",
                type=int,
                default=EngineArgs.max_seq_len_to_capture,
                help="Maximum sequence length covered by CUDA "
                "graphs. When a sequence has context length "
                "larger than this, we fall back to eager mode.",
            )
            parser.add_argument(
                "--disable-custom-all-reduce",
                action="store_true",
                default=EngineArgs.disable_custom_all_reduce,
                help="See ParallelConfig.",
            )
            parser.add_argument(
                "--tokenizer-pool-size",
                type=int,
                default=EngineArgs.tokenizer_pool_size,
                help="Size of tokenizer pool to use for "
                "asynchronous tokenization. If 0, will "
                "use synchronous tokenization.",
            )
            parser.add_argument(
                "--tokenizer-pool-type",
                type=str,
                default=EngineArgs.tokenizer_pool_type,
                help="Type of tokenizer pool to use for "
                "asynchronous tokenization. Ignored "
                "if tokenizer_pool_size is 0.",
            )
            parser.add_argument(
                "--tokenizer-pool-extra-config",
                type=nullable_str,
                default=EngineArgs.tokenizer_pool_extra_config,
                help="Extra config for tokenizer pool. "
                "This should be a JSON string that will be "
                "parsed into a dictionary. Ignored if "
                "tokenizer_pool_size is 0.",
            )
            # LoRA related configs
            parser.add_argument(
                "--enable-lora",
                action="store_true",
                help="If True, enable handling of LoRA adapters.",
            )
            parser.add_argument(
                "--max-loras",
                type=int,
                default=EngineArgs.max_loras,
                help="Max number of LoRAs in a single batch.",
            )
            parser.add_argument(
                "--max-lora-rank",
                type=int,
                default=EngineArgs.max_lora_rank,
                help="Max LoRA rank.",
            )
            parser.add_argument(
                "--lora-extra-vocab-size",
                type=int,
                default=EngineArgs.lora_extra_vocab_size,
                help=(
                    "Maximum size of extra vocabulary that can be "
                    "present in a LoRA adapter (added to the base "
                    "model vocabulary)."
                ),
            )
            parser.add_argument(
                "--lora-dtype",
                type=str,
                default=EngineArgs.lora_dtype,
                choices=[DEFAULT_TYPE, "float16", "bfloat16", "float32"],
                help=("Data type for LoRA. If auto, will default to " "base model dtype."),
            )
            parser.add_argument(
                "--max-cpu-loras",
                type=int,
                default=EngineArgs.max_cpu_loras,
                help=(
                    "Maximum number of LoRAs to store in CPU memory. "
                    "Must be >= than max_num_seqs. "
                    "Defaults to max_num_seqs."
                ),
            )
            parser.add_argument(
                "--fully-sharded-loras",
                action="store_true",
                help=(
                    "By default, only half of the LoRA computation is "
                    "sharded with tensor parallelism. "
                    "Enabling this will use the fully sharded layers. "
                    "At high sequence length, max rank or "
                    "tensor parallel size, this is likely faster."
                ),
            )
            parser.add_argument(
                "--device",
                type=str,
                default=EngineArgs.device,
                choices=["auto", "cuda", "neuron", "cpu", "npu"],
                help="Device type for vLLM execution.",
            )
            # Related to Vision-language models such as llava
            parser.add_argument(
                "--image-input-type",
                type=nullable_str,
                default=None,
                choices=[t.name.lower() for t in VisionLanguageConfig.ImageInputType],
                help=(
                    "The image input type passed into vLLM. "
                    'Should be one of "pixel_values" or "image_features".'
                ),
            )
            parser.add_argument(
                "--image-token-id",
                type=int,
                default=None,
                help=("Input id for image token."),
            )
            parser.add_argument(
                "--image-input-shape",
                type=nullable_str,
                default=None,
                help=(
                    "The biggest image input shape (worst for memory footprint) "
                    "given an input type. Only used for vLLM's profile_run."
                ),
            )
            parser.add_argument(
                "--image-feature-size",
                type=int,
                default=None,
                help=("The image feature size along the context dimension."),
            )
            parser.add_argument(
                "--scheduler-delay-factor",
                type=float,
                default=EngineArgs.scheduler_delay_factor,
                help="Apply a delay (of delay factor multiplied by previous"
                "prompt latency) before scheduling next prompt.",
            )
            parser.add_argument(
                "--enable-chunked-prefill",
                action="store_true",
                help="If set, the prefill requests can be chunked based on the "
                "max_num_batched_tokens.",
            )
            parser.add_argument(
                "--speculative-model",
                type=nullable_str,
                default=EngineArgs.speculative_model,
                help="The name of the draft model to be used in speculative decoding.",
            )
            parser.add_argument(
                "--num-speculative-tokens",
                type=int,
                default=EngineArgs.num_speculative_tokens,
                help="The number of speculative tokens to sample from "
                "the draft model in speculative decoding.",
            )
            parser.add_argument(
                "--speculative-max-model-len",
                type=int,
                default=EngineArgs.speculative_max_model_len,
                help="The maximum sequence length supported by the "
                "draft model. Sequences over this length will skip "
                "speculation.",
            )
            parser.add_argument(
                "--ngram-prompt-lookup-max",
                type=int,
                default=EngineArgs.ngram_prompt_lookup_max,
                help="Max size of window for ngram prompt lookup in speculative "
                "decoding.",
            )
            parser.add_argument(
                "--ngram-prompt-lookup-min",
                type=int,
                default=EngineArgs.ngram_prompt_lookup_min,
                help="Min size of window for ngram prompt lookup in speculative "
                "decoding.",
            )
            parser.add_argument(
                "--model-loader-extra-config",
                type=nullable_str,
                default=EngineArgs.model_loader_extra_config,
                help="Extra config for model loader. "
                "This will be passed to the model loader "
                "corresponding to the chosen load_format. "
                "This should be a JSON string that will be "
                "parsed into a dictionary.",
            )
            parser.add_argument(
                "--served-model-name",
                nargs="+",
                type=str,
                default=None,
                help="The model name(s) used in the API. If multiple "
                "names are provided, the server will respond to any "
                "of the provided names. The model name in the model "
                "field of a response will be the first name in this "
                "list. If not specified, the model name will be the "
                "same as the `--model` argument. Noted that this name(s)"
                "will also be used in `model_name` tag content of "
                "prometheus metrics, if multiple names provided, metrics"
                "tag will take the first one.",
            )
            return parser
        @classmethod
        def from_cli_args(cls, args: argparse.Namespace) -> "EngineArgs":
            # Get the list of attributes of this dataclass.
            attrs = [attr.name for attr in dataclasses.fields(cls)]
            # Set the attributes from the parsed arguments.
            engine_args = cls(**{attr: getattr(args, attr) for attr in attrs})
            return engine_args
        def create_engine_config(
            self,
        ) -> EngineConfig:
            device_config = DeviceConfig(self.device)
            model_config = ModelConfig(
                self.model,
                self.tokenizer,
                self.tokenizer_mode,
                self.trust_remote_code,
                self.dtype,
                self.seed,
                self.revision,
                self.code_revision,
                self.tokenizer_revision,
                self.max_model_len,
                self.quantization,
                self.quantization_param_path,
                self.enforce_eager,
                self.max_context_len_to_capture,
                self.max_seq_len_to_capture,
                self.max_logprobs,
                self.skip_tokenizer_init,
                self.served_model_name,
            )
            cache_config = CacheConfig(
                self.block_size,
                self.gpu_memory_utilization,
                self.swap_space,
                self.kv_cache_dtype,
                self.num_gpu_blocks_override,
                model_config.get_sliding_window(),
                self.enable_prefix_caching,
            )
            parallel_config = ParallelConfig(
                self.pipeline_parallel_size,
                self.tensor_parallel_size,
                self.worker_use_ray,
                self.max_parallel_loading_workers,
                self.disable_custom_all_reduce,
                TokenizerPoolConfig.create_config(
                    self.tokenizer_pool_size,
                    self.tokenizer_pool_type,
                    self.tokenizer_pool_extra_config,
                ),
                self.ray_workers_use_nsight,
            )
            speculative_config = SpeculativeConfig.maybe_create_spec_config(
                target_model_config=model_config,
                target_parallel_config=parallel_config,
                target_dtype=self.dtype,
                speculative_model=self.speculative_model,
                num_speculative_tokens=self.num_speculative_tokens,
                speculative_max_model_len=self.speculative_max_model_len,
                enable_chunked_prefill=self.enable_chunked_prefill,
                use_v2_block_manager=self.use_v2_block_manager,
                ngram_prompt_lookup_max=self.ngram_prompt_lookup_max,
                ngram_prompt_lookup_min=self.ngram_prompt_lookup_min,
            )
            scheduler_config = SchedulerConfig(
                self.max_num_batched_tokens,
                self.max_num_seqs,
                model_config.max_model_len,
                self.use_v2_block_manager,
                num_lookahead_slots=(
                    self.num_lookahead_slots
                    if speculative_config is None
                    else speculative_config.num_lookahead_slots
                ),
                delay_factor=self.scheduler_delay_factor,
                enable_chunked_prefill=self.enable_chunked_prefill,
            )
            lora_config = (
                LoRAConfig(
                    max_lora_rank=self.max_lora_rank,
                    max_loras=self.max_loras,
                    fully_sharded_loras=self.fully_sharded_loras,
                    lora_extra_vocab_size=self.lora_extra_vocab_size,
                    lora_dtype=self.lora_dtype,
                    max_cpu_loras=(
                        self.max_cpu_loras
                        if self.max_cpu_loras and self.max_cpu_loras > 0
                        else None
                    ),
                )
                if self.enable_lora
                else None
            )
            load_config = LoadConfig(
                load_format=self.load_format,
                download_dir=self.download_dir,
                model_loader_extra_config=self.model_loader_extra_config,
            )
            if self.image_input_type:
                if (
                    not self.image_token_id
                    or not self.image_input_shape
                    or not self.image_feature_size
                ):
                    raise ValueError(
                        "Specify `image_token_id`, `image_input_shape` and "
                        "`image_feature_size` together with `image_input_type`."
                    )
                vision_language_config = VisionLanguageConfig(
                    image_input_type=VisionLanguageConfig.get_image_input_enum_type(
                        self.image_input_type
                    ),
                    image_token_id=self.image_token_id,
                    image_input_shape=str_to_int_tuple(self.image_input_shape),
                    image_feature_size=self.image_feature_size,
                )
            else:
                vision_language_config = None
            decoding_config = DecodingConfig(
                guided_decoding_backend=self.guided_decoding_backend
            )
            return EngineConfig(
                model_config=model_config,
                cache_config=cache_config,
                parallel_config=parallel_config,
                scheduler_config=scheduler_config,
                device_config=device_config,
                lora_config=lora_config,
                vision_language_config=vision_language_config,
                speculative_config=speculative_config,
                load_config=load_config,
                decoding_config=decoding_config,
            )
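  For reference, the argument flow defined above is typically driven as follows. This is a minimal sketch assuming the patched EngineArgs keeps vllm's standard add_cli_args/from_cli_args interface; the model name is only a placeholder:
      import argparse
      from vllm.engine.arg_utils import EngineArgs

      parser = argparse.ArgumentParser()
      parser = EngineArgs.add_cli_args(parser)             # registers the arguments shown above
      args = parser.parse_args(["--model", "facebook/opt-125m"])
      engine_args = EngineArgs.from_cli_args(args)         # dataclass built from the parsed args
      engine_config = engine_args.create_engine_config()   # bundles model/cache/parallel/... configs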
  • vllm_npu/vllm_npu/executor/__init__.py: replaces the Ray cluster initialization function in the executor package's init module (a short illustration of the import-time patching effect follows the code below)
    from vllm_npu.executor.ray_utils import initialize_ray_cluster
    from vllm.executor import ray_utils
    ray_utils.initialize_ray_cluster = initialize_ray_cluster
    
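  These module-level assignments take effect as an import side effect. The sketch below is illustrative only and assumes the top-level vllm_npu package imports this subpackage, which is the usual pattern for such patch packages:
      import vllm_npu                      # importing the package runs the assignments above
      from vllm.executor import ray_utils

      # vllm's symbol now points at the NPU-aware implementation.
      print(ray_utils.initialize_ray_cluster.__module__)   # expected: vllm_npu.executor.ray_utils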
  • vllm_npu/vllm_npu/executor/ascend_executor.py: implements the AscendExecutor and AscendExecutorAsync classes required for single-card inference on Ascend (a brief offline-inference sketch follows the code below)
    # Part of codes in this file was copied from project [vLLM Team][vllm]
    from typing import List, Set, Tuple
    from vllm.executor.executor_base import ExecutorAsyncBase, ExecutorBase
    from vllm.logger import init_logger
    from vllm.lora.request import LoRARequest
    from vllm.sequence import ExecuteModelRequest, SamplerOutput
    from vllm.utils import (get_distributed_init_method, get_ip, get_open_port,
                            make_async)
    from vllm_npu.worker.ascend_worker import AscendWorker
    logger = init_logger(__name__)
    class AscendExecutor(ExecutorBase):
        def _init_executor(self) -> None:
            assert (not self.speculative_config
                ), "Speculative decoding is not yet supported for Ascend backend."
            # Instantiate the worker and load the model to the device.
            self._init_worker()
        def _init_worker(self):
            distributed_init_method = get_distributed_init_method(
                    get_ip(), get_open_port())
            self.driver_worker = AscendWorker(
                self.model_config,
                self.parallel_config,
                self.scheduler_config,
                self.device_config,
                self.cache_config,
                self.load_config,
                local_rank=0,
                rank=0,
                distributed_init_method=distributed_init_method,
                lora_config=self.lora_config,
                is_driver_worker=True,
            )
            self.driver_worker.init_device()
            self.driver_worker.load_model()
        def determine_num_available_blocks(self) -> Tuple[int, int]:
            """Determine the number of available KV blocks by invoking the
            underlying worker.
            """
            return self.driver_worker.determine_num_available_blocks()
        def initialize_cache(self, num_gpu_blocks: int,
                             num_cpu_blocks: int) -> None:
            """Initialize the KV cache by invoking the underlying worker.
            """
            self.driver_worker.initialize_cache(num_gpu_blocks, num_cpu_blocks)
        def execute_model(
                self,
                execute_model_req: ExecuteModelRequest) -> List[SamplerOutput]:
            output = self.driver_worker.execute_model(execute_model_req)
            return output
        def add_lora(self, lora_request: LoRARequest) -> bool:
            return self.driver_worker.add_lora(lora_request)
        def remove_lora(self, lora_id: int) -> bool:
            return self.driver_worker.remove_lora(lora_id)
        def list_loras(self) -> Set[int]:
            return self.driver_worker.list_loras()
        def check_health(self) -> None:
            # AscendExecutor will always be healthy as long as
            # it's running.
            return
    class AscendExecutorAsync(AscendExecutor, ExecutorAsyncBase):
        async def execute_model_async(
            self,
            execute_model_req: ExecuteModelRequest,
        ) -> List[SamplerOutput]:
            output = await make_async(
                self.driver_worker.execute_model
            )(execute_model_req=execute_model_req,)
            return output
        # async def check_health_async(self) -> None:
        #     # AscendExecutor will always be healthy as long as
        #     # it's running.
        #     return
    
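  For context, once the patches are applied, single-card offline inference on Ascend is expected to be routed through AscendExecutor. The following is only a hedged sketch with a placeholder model path; the actual sample lives in examples/offline_inference.py and is not reproduced here:
      from vllm import LLM, SamplingParams

      llm = LLM(model="/path/to/model", tensor_parallel_size=1)   # single card
      outputs = llm.generate(["Hello, my name is"],
                             SamplingParams(temperature=0.0, max_tokens=16))
      print(outputs[0].outputs[0].text)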
  • vllm_npu/vllm_npu/executor/ascend_ray_executor.py: implements the RayAscendExecutor and RayAscendExecutorAsync classes required for Ray-based multi-card inference on Ascend
    import asyncio
    import os
    import pickle
    from collections import defaultdict
    from itertools import islice, repeat
    from typing import TYPE_CHECKING, Any, Dict, List, Optional, Tuple
    import vllm.envs as envs
    from vllm.executor.distributed_gpu_executor import (  # yapf: disable
        DistributedGPUExecutor, DistributedGPUExecutorAsync)
    from vllm.executor.ray_utils import RayWorkerWrapper, ray
    from vllm.logger import init_logger
    from vllm.sequence import ExecuteModelRequest, SamplerOutput
    from vllm.utils import (get_distributed_init_method, get_ip, get_open_port,
                            get_vllm_instance_id, make_async)
    if ray is not None:
        from ray.util.scheduling_strategies import PlacementGroupSchedulingStrategy
    if TYPE_CHECKING:
        from ray.util.placement_group import PlacementGroup
    logger = init_logger(__name__)
    USE_RAY_COMPILED_DAG = envs.VLLM_USE_RAY_COMPILED_DAG
    class RayAscendExecutor(DistributedGPUExecutor):
        def _init_executor(self) -> None:
            assert (not self.speculative_config
                    ), "Speculative decoding not yet supported for RayNPU backend."
            assert self.parallel_config.worker_use_ray
            placement_group = self.parallel_config.placement_group
            # Disable Ray usage stats collection.
            ray_usage = os.environ.get("RAY_USAGE_STATS_ENABLED", "0")
            if ray_usage != "1":
                os.environ["RAY_USAGE_STATS_ENABLED"] = "0"
            # Create the parallel GPU workers.
            self._init_workers_ray(placement_group)
            self.forward_dag = None
            if USE_RAY_COMPILED_DAG:
                self.forward_dag = self._compiled_ray_dag()
        def _init_workers_ray(self, placement_group: "PlacementGroup",
                              **ray_remote_kwargs):
            if self.parallel_config.tensor_parallel_size == 1:
                # For single GPU case, we use a ray worker with constrained memory.
                num_gpus = self.cache_config.gpu_memory_utilization
            else:
                # Otherwise, the ray workers are allocated with a full GPU.
                num_gpus = 1
            # The driver dummy worker does not actually use any resources.
            # It holds the resource for the driver worker.
            self.driver_dummy_worker: Optional[RayWorkerWrapper] = None
            # The remaining workers are the actual ray actors.
            self.workers: List[RayWorkerWrapper] = []
            if self.parallel_config.ray_workers_use_nsight:
                ray_remote_kwargs = self._configure_ray_workers_use_nsight(
                    ray_remote_kwargs)
            # Create the workers.
            driver_ip = get_ip()
            for bundle_id, bundle in enumerate(placement_group.bundle_specs):
                if not bundle.get("GPU", 0):
                    continue
                scheduling_strategy = PlacementGroupSchedulingStrategy(
                    placement_group=placement_group,
                    placement_group_capture_child_tasks=True,
                    placement_group_bundle_index=bundle_id,
                )
                worker = ray.remote(
                    num_cpus=0,
                    num_gpus=num_gpus,
                    scheduling_strategy=scheduling_strategy,
                    **ray_remote_kwargs,
                )(RayWorkerWrapper).remote(
                    worker_module_name="vllm_npu.worker.ascend_worker",
                    worker_class_name="AscendWorker",
                    trust_remote_code=self.model_config.trust_remote_code,
                )
                worker_ip = ray.get(worker.get_node_ip.remote())
                if worker_ip == driver_ip and self.driver_dummy_worker is None:
                    # If the worker is on the same node as the driver, we use it
                    # as the resource holder for the driver process.
                    self.driver_dummy_worker = worker
                    self.driver_worker = RayWorkerWrapper(
                        worker_module_name="vllm_npu.worker.ascend_worker",
                        worker_class_name="AscendWorker",
                        trust_remote_code=self.model_config.trust_remote_code,
                    )
                else:
                    # Else, added to the list of workers.
                    self.workers.append(worker)
            if self.driver_dummy_worker is None:
                raise ValueError(
                    "Ray does not allocate any GPUs on the driver node. Consider "
                    "adjusting the Ray placement group or running the driver on a "
                    "GPU node.")
            # Get the set of GPU IDs used on each node.
            worker_node_and_gpu_ids = self._run_workers("get_node_and_gpu_ids",
                                                        use_dummy_driver=True)
            node_workers = defaultdict(list)
            node_gpus = defaultdict(list)
            for i, (node_id, gpu_ids) in enumerate(worker_node_and_gpu_ids):
                node_workers[node_id].append(i)
                node_gpus[node_id].extend(gpu_ids)
            for node_id, gpu_ids in node_gpus.items():
                node_gpus[node_id] = sorted(gpu_ids)
            VLLM_INSTANCE_ID = get_vllm_instance_id()
            # Set environment variables for the driver and workers.
            all_args_to_update_environment_variables = [({
                "CUDA_VISIBLE_DEVICES":
                ",".join(map(str, node_gpus[node_id])),
                "VLLM_INSTANCE_ID":
                VLLM_INSTANCE_ID,
                "VLLM_TRACE_FUNCTION":
                str(envs.VLLM_TRACE_FUNCTION),
            }, ) for (node_id, _) in worker_node_and_gpu_ids]
            self._run_workers("update_environment_variables",
                              all_args=all_args_to_update_environment_variables)
            distributed_init_method = get_distributed_init_method(
                driver_ip, get_open_port())
            # Initialize the actual workers inside worker wrapper.
            init_worker_all_kwargs = [
                self._get_worker_kwargs(
                    local_rank=node_workers[node_id].index(rank),
                    rank=rank,
                    distributed_init_method=distributed_init_method,
                ) for rank, (node_id, _) in enumerate(worker_node_and_gpu_ids)
            ]
            self._run_workers("init_worker", all_kwargs=init_worker_all_kwargs)
            self._run_workers("init_device")
            self._run_workers("load_model",
                              max_concurrent_workers=self.parallel_config.
                              max_parallel_loading_workers)
        def execute_model(
                self,
                execute_model_req: ExecuteModelRequest) -> List[SamplerOutput]:
            all_outputs = self._run_workers(
                "execute_model",
                driver_kwargs={"execute_model_req": execute_model_req},
                use_ray_compiled_dag=USE_RAY_COMPILED_DAG)
            # Only the driver worker returns the sampling results.
            return all_outputs[0]
        def _run_workers(
            self,
            method: str,
            *args,
            driver_args: Optional[Tuple[Any, ...]] = None,
            driver_kwargs: Optional[Dict[str, Any]] = None,
            all_args: Optional[List[Tuple[Any, ...]]] = None,
            all_kwargs: Optional[List[Dict[str, Any]]] = None,
            use_dummy_driver: bool = False,
            max_concurrent_workers: Optional[int] = None,
            use_ray_compiled_dag: bool = False,
            **kwargs,
        ) -> Any:
            """Runs the given method on all workers. Can be used in the following
            ways:
            - args/kwargs: All workers share the same args/kwargs
            - args/kwargs and driver_args/driver_kwargs: Driver worker has
              different args
            - all_args/all_kwargs: args/kwargs for each worker are specified
              individually
            """
            if max_concurrent_workers:
                raise NotImplementedError(
                    "max_concurrent_workers is not supported yet.")
            if driver_args is None:
                driver_args = args if all_args is None else all_args[0]
            if driver_kwargs is None:
                driver_kwargs = kwargs if all_kwargs is None else all_kwargs[0]
            count = len(self.workers)
            all_worker_args = repeat(args, count) if all_args is None \
                else islice(all_args, 1, None)
            all_worker_kwargs = repeat(kwargs, count) if all_kwargs is None \
                else islice(all_kwargs, 1, None)
            if use_ray_compiled_dag:
                assert self.forward_dag is not None
                output_channels = self.forward_dag.execute(1)
            else:
                # Start the ray workers first.
                ray_worker_outputs = [
                    worker.execute_method.remote(method, *worker_args,
                                                 **worker_kwargs)
                    for (worker, worker_args, worker_kwargs
                         ) in zip(self.workers, all_worker_args, all_worker_kwargs)
                ]
            # Start the driver worker after all the ray workers.
            if not use_dummy_driver:
                driver_worker_output = self.driver_worker.execute_method(
                    method, *driver_args, **driver_kwargs)
            else:
                assert self.driver_dummy_worker is not None
                driver_worker_output = ray.get(
                    self.driver_dummy_worker.execute_method.remote(
                        method, *driver_args, **driver_kwargs))
            # Get the results of the ray workers.
            if self.workers:
                if use_ray_compiled_dag:
                    try:
                        ray_worker_outputs = [
                            pickle.loads(chan.begin_read())
                            for chan in output_channels
                        ]
                    finally:
                        # Has to call end_read in order to reuse the DAG.
                        for chan in output_channels:
                            chan.end_read()
                else:
                    ray_worker_outputs = ray.get(ray_worker_outputs)
            return [driver_worker_output] + ray_worker_outputs
        def _compiled_ray_dag(self):
            import pkg_resources
            required_version = "2.9"
            current_version = pkg_resources.get_distribution("ray").version
            if current_version < required_version:
                raise ValueError(f"Ray version {required_version} or greater is "
                                 f"required, but found {current_version}")
            from ray.dag import InputNode, MultiOutputNode
            assert self.parallel_config.worker_use_ray
            with InputNode() as input_data:
                forward_dag = MultiOutputNode([
                    worker.execute_model_compiled_dag_remote.
                    bind(  # type: ignore[attr-defined]
                        input_data) for worker in self.workers
                ])
            return forward_dag.experimental_compile()
        def check_health(self) -> None:
            """Raises an error if engine is unhealthy."""
            self._check_if_any_actor_is_dead()
        def _check_if_any_actor_is_dead(self):
            if not self.workers:
                return
            dead_actors = []
            for actor in self.workers:
                actor_state = ray.state.actors(actor._ray_actor_id.hex())  # pylint: disable=protected-access
                if actor_state["State"] == "DEAD":
                    dead_actors.append(actor)
            if dead_actors:
                raise RuntimeError("At least one Worker is dead. "
                                   f"Dead Workers: {dead_actors}. ")
    class RayAscendExecutorAsync(RayAscendExecutor, DistributedGPUExecutorAsync):
        def __init__(self, *args, **kwargs):
            super().__init__(*args, **kwargs)
            self.driver_executor = make_async(self.driver_worker.execute_method)
        async def _run_workers_async(
            self,
            method: str,
            *args,
            driver_args: Optional[Tuple[Any, ...]] = None,
            driver_kwargs: Optional[Dict[str, Any]] = None,
            **kwargs,
        ) -> Any:
            """Runs the given method on all workers."""
            coros = []
            if driver_args is None:
                driver_args = args
            if driver_kwargs is None:
                driver_kwargs = kwargs
            coros.append(
                self.driver_executor(method, *driver_args, **driver_kwargs))
            # Run the ray workers asynchronously.
            for worker in self.workers:
                coros.append(worker.execute_method.remote(method, *args, **kwargs))
            all_outputs = await asyncio.gather(*coros)
            return all_outputs
    
  • vllm_npu/vllm_npu/executor/ray_utils.py: adapts the Ray distributed-environment initialization so that the Ray cluster can be brought up successfully on Ascend (a small Ray initialization sketch follows the code below)
    from typing import Optional, Tuple, TYPE_CHECKING
    from vllm.config import ParallelConfig
    from vllm.utils import is_hip
    from vllm.logger import init_logger
    logger = init_logger(__name__)
    try:
        import ray
    except ImportError as e:
        logger.warning(f"Failed to import Ray with {e!r}. "
                       "For distributed inference, please install Ray with "
                       "`pip install ray`.")
        ray = None
    if TYPE_CHECKING:
        from ray.util.placement_group import PlacementGroup
    def initialize_ray_cluster(
        parallel_config: ParallelConfig,
        ray_address: Optional[str] = None,
    ):
        """Initialize the distributed cluster with Ray.
        it will connect to the Ray cluster and create a placement group
        for the workers, which includes the specification of the resources
        for each distributed worker.
        Args:
            parallel_config: The configurations for parallel execution.
            ray_address: The address of the Ray cluster. If None, uses
                the default Ray cluster address.
        """
        if ray is None:
            raise ImportError(
                "Ray is not installed. Please install Ray to use multi-node "
                "serving.")
        # Connect to a ray cluster.
        if is_hip():
            ray.init(address=ray_address,
                     ignore_reinit_error=True,
                     num_gpus=parallel_config.world_size)
        else:
            """start adapt"""
        # Without num_gpus, ray.init() tries to auto-detect the number of
        # GPUs; in an Ascend environment this detection may fail, so the
        # value needs to be set manually.
            ray.init(address=ray_address, ignore_reinit_error=True, 
                     num_gpus=parallel_config.world_size)
            """end adapt"""
        if parallel_config.placement_group:
            # Placement group is already set.
            return
        # Create placement group for worker processes
        current_placement_group = ray.util.get_current_placement_group()
        if current_placement_group:
            # We are in a placement group
            bundles = current_placement_group.bundle_specs
            # Verify that we can use the placement group.
            gpu_bundles = 0
            for bundle in bundles:
                bundle_gpus = bundle.get("GPU", 0)
                if bundle_gpus > 1:
                    raise ValueError(
                        "Placement group bundle cannot have more than 1 GPU.")
                if bundle_gpus:
                    gpu_bundles += 1
            if parallel_config.world_size > gpu_bundles:
                raise ValueError(
                    "The number of required GPUs exceeds the total number of "
                    "available GPUs in the placement group.")
        else:
            num_gpus_in_cluster = ray.cluster_resources().get("GPU", 0)
            if parallel_config.world_size > num_gpus_in_cluster:
                raise ValueError(
                    "The number of required GPUs exceeds the total number of "
                    "available GPUs in the cluster.")
            # Create a new placement group
            placement_group_specs = ([{"GPU": 1}] * parallel_config.world_size)
            current_placement_group = ray.util.placement_group(
                placement_group_specs)
            # Wait until PG is ready - this will block until all
            # requested resources are available, and will timeout
            # if they cannot be provisioned.
            ray.get(current_placement_group.ready(), timeout=1800)
        # Set the placement group in the parallel config
        parallel_config.placement_group = current_placement_group
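  As a small illustration of the manual resource declaration above (all values hypothetical): because NPUs may not be auto-detected, the adapted code declares them explicitly as Ray "GPU" resources and then reserves one per worker via a placement group:
      import ray

      ray.init(ignore_reinit_error=True, num_gpus=4)     # declare 4 NPUs as "GPU" resources
      pg = ray.util.placement_group([{"GPU": 1}] * 4)    # one bundle per distributed worker
      ray.get(pg.ready(), timeout=1800)                  # block until the bundles are reserved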
  • vllm_npu/vllm_npu/model_executor/__init__.py: replaces the get_architecture_class_name function of vllm's native model loader from within model_executor
    import vllm.model_executor.model_loader as vllm_model_loader
    import vllm_npu.model_executor.ascend_model_loader as ascend_model_loader
    vllm_model_loader.get_architecture_class_name = ascend_model_loader.get_architecture_class_name
  • vllm_npu/vllm_npu/model_executor/ascend_model_loader.py: rewrites vllm's native get_architecture_class_name function to handle quantized models, and rewrites the native get_model function to provide the entry point into the MindIE full-graph model (a short illustration of the architecture-name override follows the code below)
    # Part of code in this file was copied from project [vLLM Team][vllm] for adapting usage
    import contextlib
    import torch
    import torch.nn as nn
    from vllm.config import DeviceConfig, ModelConfig, LoadConfig
    from vllm.model_executor.model_loader.weight_utils import initialize_dummy_weights
    from vllm_npu.model_executor.models.ascend.mindie_llm_wrapper import MindIELlmWrapper
    def get_architecture_class_name(model_config: ModelConfig) -> str:
        architectures = getattr(model_config.hf_config, "architectures", [])
        if (model_config.quantization is not None
                and model_config.quantization != "fp8"
                and "MixtralForCausalLM" in architectures):
            architectures = ["QuantMixtralForCausalLM"]
        return architectures[0]
        
    @contextlib.contextmanager
    def _set_default_torch_dtype(dtype: torch.dtype):
        """Sets the default torch dtype to the given dtype."""
        old_dtype = torch.get_default_dtype()
        torch.set_default_dtype(dtype)
        yield
        torch.set_default_dtype(old_dtype)
    def get_model(model_config: ModelConfig, device_config: DeviceConfig,
                  load_config: LoadConfig, 
                  mindie_model_config, **kwargs) -> nn.Module:
        lora_config = kwargs.get("lora_config", None)
        model_class = MindIELlmWrapper
        # Get the (maybe quantized) linear method.
        linear_method = None
        with _set_default_torch_dtype(model_config.dtype):
            # Create a model instance.
            # The weights will be initialized as empty tensors.
            with torch.device(device_config.device):
                if hasattr(model_class, "supported_lora_modules"):
                    model = model_class(mindie_model_config, linear_method,
                                        lora_config)
                elif lora_config:
                    raise ValueError(
                        f"Model {model_class.__name__} does not support LoRA, "
                        "but LoRA is enabled. Support for this model may "
                        "be added in the future. If this is important to you, "
                        "please open an issue on github.")
                else:
                    model = model_class(mindie_model_config, linear_method)
            if load_config.load_format == "dummy":
                initialize_dummy_weights(model)
            else:
                # Load the weights from the cached or downloaded files.
                model.load_weights(model_config.model, load_config.download_dir,
                                   load_config.load_format, model_config.revision)
            model = model.npu()
        return model.eval()
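  A quick illustration of the quantization-aware branch in get_architecture_class_name. A hypothetical stand-in object is used instead of a real ModelConfig, providing only the two attributes the function reads:
      from types import SimpleNamespace
      from vllm_npu.model_executor.ascend_model_loader import get_architecture_class_name

      quant_cfg = SimpleNamespace(
          hf_config=SimpleNamespace(architectures=["MixtralForCausalLM"]),
          quantization="w8a8",   # any non-"fp8" quantization triggers the override
      )
      print(get_architecture_class_name(quant_cfg))   # QuantMixtralForCausalLM

      plain_cfg = SimpleNamespace(
          hf_config=SimpleNamespace(architectures=["LlamaForCausalLM"]),
          quantization=None,
      )
      print(get_architecture_class_name(plain_cfg))   # LlamaForCausalLM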
  • vllm_npu/vllm_npu/model_executor/layers/__init__.py: empty file.
  • vllm_npu/vllm_npu/model_executor/layers/ascend_sampler.py: implements the Ascend backend's sampling (post-processing) class, bridging vllm's data structures to MindIE post-processing (a small padding illustration follows the code below)
    # Copyright (c) Huawei Technologies Co., Ltd. 2024-2025. All rights reserved.
    # Part of codes in this file was copied from project [vLLM Team][vllm]
    import random
    from typing import Dict, List, Optional, Tuple
    import torch
    import torch.nn as nn
    import numpy as np
    from vllm.sampling_params import SamplingType
    from vllm.sequence import SamplerOutput
    from vllm.model_executor.sampling_metadata import SamplingMetadata, SequenceGroupToSample
    from vllm.model_executor.layers.sampler import _get_logprobs, _build_sampler_output
    from mindie_llm.text_generator.utils.sampling_metadata import SamplingData, SamplingParam
    _SAMPLING_EPS = 1e-5
    
    SampleResultType = List[Tuple[List[int], List[int]]]
    
    
    def _to_tensor(data, dtype=None):
        if dtype:
            return torch.tensor(data, dtype=dtype, device=torch.device("npu"))
        else:
            return torch.tensor(data, device=torch.device("npu"))
    
    
    
    class AscendSampler(nn.Module):
        def __init__(self, mindie_model):
            super().__init__()
            self.mindie_model = mindie_model
            self.include_gpu_probs_tensor = False
    
        def forward(
            self,
            logits: torch.Tensor,
            sampling_metadata: SamplingMetadata,
        ) -> Optional[SamplerOutput]:
            _, vocab_size = logits.shape
            mindie_sampling_data, mindie_sampling_param = self.construct_data(sampling_metadata, vocab_size)
            probs = torch.softmax(logits, dim=-1, dtype=torch.float)
            logprobs = torch.log_softmax(logits, dim=-1, dtype=torch.float)
    
            sampling_mask = [seq_group.do_sample for seq_group in sampling_metadata.seq_groups]
            filtered_logits = logits[sampling_mask]
    
            if filtered_logits.size(0) > 0:
                next_tokens, _= self.mindie_model.sample(
                    filtered_logits, 
                    sampling_data=mindie_sampling_data, 
                    sampling_param=mindie_sampling_param,
                )
            else:
                next_tokens = None
    
            sample_results, maybe_sampled_tokens_tensor = recover_data(
                sampling_metadata=sampling_metadata, 
                sampled_tokens=next_tokens, 
                logprobs=logprobs, 
                include_gpu_probs_tensor=self.include_gpu_probs_tensor,
            )
            if self.include_gpu_probs_tensor:
                if maybe_sampled_tokens_tensor is None:
                    raise RuntimeError("maybe_sampled_tokens_tensor is None")
                on_device_tensors = (probs, logprobs, maybe_sampled_tokens_tensor)
            else:
                on_device_tensors = None
    
            # Get the logprobs query results.
            prompt_logprobs, sample_logprobs = _get_logprobs(
                logprobs, sampling_metadata, sample_results)
            return _build_sampler_output(sample_results,
                                         sampling_metadata,
                                         prompt_logprobs,
                                         sample_logprobs,
                                         on_device_tensors=on_device_tensors)
    
        def construct_data(
            self,
            sampling_metadata: SamplingMetadata,
            vocab_size: int,
        ) -> Tuple[SamplingData, SamplingParam]:
            all_input_tokens: List[List[int]] = []
            prompt_tokens: List[List[int]] = []
            output_tokens: List[List[int]] = []
            top_ks: List[int] = []
            temperatures: List[float] = []
            top_ps: List[float] = []
            min_ps: List[float] = []
            presence_penalties: List[float] = []
            frequency_penalties: List[float] = []
            repetition_penalties: List[float] = []
            sampling_seeds: List[int] = []
            sample_indices: List[int] = []
            do_samples: List[bool] = []  # To Do
            do_penalties = False
            do_top_p_top_k = False
            do_min_p = False
            greedy_flag = False
    
            if sampling_metadata.seq_groups is None:
                raise RuntimeError("sampling_metadata.seq_group is None, no data received.")
            for seq_group in sampling_metadata.seq_groups:
                do_samples.append(seq_group.do_sample)
                seq_ids = seq_group.seq_ids
                sampling_params = seq_group.sampling_params
                temperature = sampling_params.temperature
                p = sampling_params.presence_penalty
                f = sampling_params.frequency_penalty
                r = sampling_params.repetition_penalty
                top_p = sampling_params.top_p
                min_p = sampling_params.min_p
                is_greedy = sampling_params.sampling_type == SamplingType.GREEDY
                seed = sampling_params.seed
                if seed is None:
                    if is_greedy:
                        seed = 0
                    else:
                        lo, hi = torch.iinfo(torch.long).min, torch.iinfo(torch.long).max
                        seed = random.randint(lo, hi)
                if is_greedy:
                    greedy_flag = True
    
                # k should not be greater than the vocab size.
                top_k = min(sampling_params.top_k, vocab_size)
                top_k = vocab_size if top_k == -1 else top_k
                if temperature < _SAMPLING_EPS:
                    temperature = 1.0
                if not do_top_p_top_k and (top_p < 1.0 - _SAMPLING_EPS
                                           or top_k != vocab_size):
                    do_top_p_top_k = True
                if not do_min_p and min_p > _SAMPLING_EPS:
                    do_min_p = True
                if not do_penalties:
                    if abs(p) >= _SAMPLING_EPS:
                        do_penalties = True
                    elif abs(f) >= _SAMPLING_EPS:
                        do_penalties = True
                    elif abs(r - 1.0) >= _SAMPLING_EPS:
                        do_penalties = True
    
                is_prompt = seq_group.is_prompt
                if (seq_group.is_prompt
                        and sampling_params.prompt_logprobs is not None):
                    # For tokens in the prompt that we only need to get
                    # their logprobs
                    query_len = seq_group.query_len
                    if query_len is None:
                        raise RuntimeError("query_len is None")
                    prefill_len = len(seq_group.prompt_logprob_indices)
                    temperatures += [temperature] * prefill_len
                    sampling_seeds += [seed] * prefill_len
                    top_ps += [top_p] * prefill_len
                    top_ks += [top_k] * prefill_len
                    min_ps += [min_p] * prefill_len
                    presence_penalties += [0] * prefill_len
                    frequency_penalties += [0] * prefill_len
                    repetition_penalties += [1] * prefill_len
                    prompt_tokens.extend([] for _ in range(prefill_len))
                    output_tokens.extend([] for _ in range(prefill_len))
                    all_input_tokens.extend([] for _ in range(prefill_len))
    
                if seq_group.do_sample:
                    sample_lens = len(seq_group.sample_indices)
                    if sample_lens != len(seq_ids):
                        raise ValueError("sample_lens != len(seq_ids)")
                    for seq_id in seq_ids:
                        seq_data = seq_group.seq_data[seq_id]
                        prompt_tokens.append(seq_data.prompt_token_ids)
                        output_tokens.append(seq_data.output_token_ids)
                        all_input_tokens.append(seq_data.prompt_token_ids + seq_data.output_token_ids)
                    temperatures += [temperature] * len(seq_ids)
                    sampling_seeds += [seed] * len(seq_ids)
                    top_ps += [top_p] * len(seq_ids)
                    top_ks += [top_k] * len(seq_ids)
                    min_ps += [min_p] * len(seq_ids)
                    presence_penalties += [p] * len(seq_ids)
                    frequency_penalties += [f] * len(seq_ids)
                    repetition_penalties += [r] * len(seq_ids)
    
            repetition_penalties = np.array(repetition_penalties, dtype=np.float32)
            frequency_penalties = np.array(frequency_penalties, dtype=np.float32)
            presence_penalties = np.array(presence_penalties, dtype=np.float32)
            temperatures = np.array(temperatures, dtype=np.float32)
            top_ks = np.array(top_ks, dtype=np.int32)
            top_ps = np.array(top_ps, dtype=np.float32)
            sampling_seeds = np.array(sampling_seeds)
            do_samples = np.array(do_samples)
    
            max_tokens_len = max([len(tokens) for tokens in all_input_tokens], default=0)
            padded_all_input_tokens = [
                tokens + [vocab_size] * (max_tokens_len - len(tokens))
                for tokens in all_input_tokens
            ]
            padded_all_input_tokens = np.array(padded_all_input_tokens, dtype=np.int32)
            output_max_len = max([len(tokens) for tokens in output_tokens], default=0)
            padded_output_tokens = [
                tokens + [vocab_size] * (output_max_len - len(tokens))
                for tokens in output_tokens
            ]
            padded_output_tokens = np.array(padded_output_tokens, dtype=np.int32)
    
            all_input_ids_tensor = _to_tensor(
                padded_all_input_tokens, 
                torch.int32
            ) if padded_all_input_tokens is not None else None
            output_ids_tensor = _to_tensor(
                padded_output_tokens, 
                torch.int32
            ) if padded_output_tokens is not None else None
            mindie_sampling_data = SamplingData(
                all_input_ids=all_input_ids_tensor, 
                output_ids=output_ids_tensor
            )
    
            if greedy_flag:
                mindie_sampling_param = None
            else:
                mindie_sampling_param = SamplingParam.from_numpy(
                    repetition_penalty=repetition_penalties,
                    frequency_penalty=frequency_penalties,
                    presence_penalty=presence_penalties,
                    temperature=temperatures,
                    top_k=top_ks,
                    top_p=top_ps,
                    seed=sampling_seeds,
                    do_sample=do_samples,
                    to_tensor=_to_tensor,
                )
            return (mindie_sampling_data, mindie_sampling_param)
    
    
    def recover_data(
        sampling_metadata: SamplingMetadata,
        sampled_tokens: np.ndarray,
        logprobs: torch.Tensor,
        include_gpu_probs_tensor: bool,
    ) -> Tuple[SampleResultType, Optional[torch.Tensor]]:
        categorized_seq_group_ids: Dict[SamplingType,
                                        List[int]] = {t: []
                                                      for t in SamplingType}
        categorized_sample_indices = sampling_metadata.categorized_sample_indices
        for i, seq_group in enumerate(sampling_metadata.seq_groups):
            sampling_params = seq_group.sampling_params
            sampling_type = sampling_params.sampling_type
            categorized_seq_group_ids[sampling_type].append(i)
    
        sample_results_dict: Dict[int, Tuple[List[int], List[int]]] = {}
        sample_metadata = {}
    
        # Create output tensor for sampled token ids.
        if include_gpu_probs_tensor:
            sampled_token_ids_tensor = torch.empty(logprobs.shape[0],
                                                   1,
                                                   dtype=torch.long,
                                                   device=logprobs.device)
        else:
            sampled_token_ids_tensor = None
    
        for sampling_type in SamplingType:
            sample_indices = categorized_sample_indices[sampling_type][:, 0]
            num_tokens = len(sample_indices)
            if num_tokens == 0:
                continue
    
            seq_group_id = categorized_seq_group_ids[sampling_type]
            seq_groups = [sampling_metadata.seq_groups[i] for i in seq_group_id]
            sample_metadata[sampling_type] = (seq_group_id, seq_groups)
    
        for sampling_type in SamplingType:
            if sampling_type not in sample_metadata:
                continue
            (seq_group_id, seq_groups) = sample_metadata[sampling_type]
            if sampling_type in (SamplingType.GREEDY, SamplingType.RANDOM, SamplingType.RANDOM_SEED):
                sample_results = normal_wrap(seq_groups, sampled_tokens)
            elif sampling_type == SamplingType.BEAM:
                sample_results = beam_wrap(seq_groups, sampled_tokens)
            sample_results_dict.update(zip(seq_group_id, sample_results))
    
        sample_results = [
            sample_results_dict.get(i, ([], []))
            for i in range(len(sampling_metadata.seq_groups))
        ]
        return sample_results, sampled_token_ids_tensor
    
    
    def normal_wrap(
        selected_seq_groups: List[SequenceGroupToSample],
        samples: np.ndarray,
    ):
        samples = samples.tolist()
        sample_idx = 0
        results: SampleResultType = []
        for seq_group in selected_seq_groups:
            if not seq_group.do_sample:
                results.append(([], []))
                continue
    
            seq_ids = seq_group.seq_ids
            num_parent_seqs = len(seq_ids)
            parent_ids = list(range(num_parent_seqs))
            next_token_ids = [samples[sample_idx]]
            results.append((next_token_ids, parent_ids))
            sample_idx += num_parent_seqs
        return results
    
    
    def beam_wrap(
        selected_seq_groups: List[SequenceGroupToSample],
        samples: np.ndarray,
    ):
        raise ValueError(f"Unsupported sampling type: beam search")
    
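  A small numeric illustration of the padding convention used in construct_data above: ragged token lists are right-padded with the vocab size as the pad id before being turned into NPU tensors (values are made up):
      vocab_size = 10
      all_input_tokens = [[1, 2, 3], [4, 5]]

      max_tokens_len = max((len(t) for t in all_input_tokens), default=0)
      padded = [t + [vocab_size] * (max_tokens_len - len(t)) for t in all_input_tokens]
      print(padded)   # [[1, 2, 3], [4, 5, 10]]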
  • vllm_npu/vllm_npu/model_executor/models/__init__.py: empty file.
  • vllm_npu/vllm_npu/model_executor/models/ascend/__init__.py: empty file.
  • vllm_npu/vllm_npu/model_executor/models/ascend/mindie_llm_wrapper.py: implements the wrapper class that connects to the MindIE full-graph model, unpacking vllm's data structures into the inputs the MindIE graph needs and passing them straight through
    # Copyright (c) Huawei Technologies Co., Ltd. 2024-2025. All rights reserved.
    import math
    from typing import List, Optional
    
    import torch
    from mindie_llm.text_generator.adapter.generator_torch import GeneratorTorch
    from torch import nn
    import torch.nn.functional as F
    from vllm.attention import AttentionMetadata
    from vllm.lora.request import LoRARequest
    from vllm.model_executor import SamplingMetadata
    from vllm.sequence import SamplerOutput
    from vllm_npu.model_executor.layers.ascend_sampler import AscendSampler
    from vllm_npu.worker.cache_engine import KVCache
    
    
    class MindIELlmWrapper(nn.Module):
    
        def __init__(self, mindie_model_config, linear_method=None, lora_config=None):
            super(MindIELlmWrapper, self).__init__()
    
            self.mindie_model_config = mindie_model_config
            self.rank = mindie_model_config["rank"]
            self.local_rank = mindie_model_config["local_rank"]
            self.npu_id = self.local_rank
            self.world_size = mindie_model_config["world_size"]
            self.kv_cache_dtype = mindie_model_config["kv_cache_dtype"]
            self.mindie_model = None
            self.sampler = None
    
        def forward(
            self,
            input_ids: torch.Tensor,
            positions: torch.Tensor,
            kv_caches: List[KVCache],
            attn_metadata: AttentionMetadata,
            lora_requests: List[LoRARequest],
        ) -> torch.Tensor:
            is_prompt = attn_metadata.prefill_metadata is not None
    
            if kv_caches[0][0] is None:
                kv_caches, block_tables, slots = self.create_dummy_kv_cache(attn_metadata, input_ids)
            else:
                block_tables = self.create_block_tables(attn_metadata)
                slots = attn_metadata.slot_mapping
            if attn_metadata.prefill_metadata is None:
                input_lengths = attn_metadata.decode_metadata.seq_lens_tensor
                max_seq_len = attn_metadata.decode_metadata.max_seq_len
                query_lens = [1] * len(attn_metadata.decode_metadata.seq_lens_tensor)
                lm_head_indices = None
            else:
                input_lengths = attn_metadata.prefill_metadata.seq_lens_tensor
                max_seq_len = attn_metadata.prefill_metadata.max_seq_len
                subquery_start_loc = attn_metadata.prefill_metadata.subquery_start_loc
                query_lens_tensor = subquery_start_loc[1:] - subquery_start_loc[:-1]
                if attn_metadata.decode_metadata is not None:
                    input_lengths = torch.cat((input_lengths, attn_metadata.decode_metadata.seq_lens_tensor), dim=0)
                    max_seq_len = max(max_seq_len, attn_metadata.decode_metadata.max_seq_len)
                    query_lens_tensor = F.pad(query_lens_tensor, (0, attn_metadata.num_decode_tokens), "constant", 1)
                query_lens = query_lens_tensor.tolist()
                lm_head_indices = query_lens_tensor.cumsum(dim=-1) - 1
    
            adapter_ids = [lora_request.lora_name if lora_request else None for lora_request in lora_requests]
    
            logits = self.mindie_model.forward_tensor(
                input_ids,
                positions,
                is_prompt,
                kv_caches,
                block_tables,
                slots,
                input_lengths,
                max_seq_len,
                lm_head_indices,
                adapter_ids=adapter_ids,
                q_lens=query_lens,
            )
    
            return logits
    
        def compute_logits(self, hidden_states: torch.Tensor, sampling_metadata: SamplingMetadata) -> torch.Tensor:
            return hidden_states
    
        def sample(
            self,
            logits: torch.Tensor,
            sampling_metadata: SamplingMetadata,
        ) -> Optional[SamplerOutput]:
            # hidden_states is logits
            next_tokens = self.sampler(logits, sampling_metadata)
            return next_tokens
    
        def load_weights(
            self,
            model_name_or_path: str,
            cache_dir: Optional[str] = None,
            load_format: str = "auto",
            revision: Optional[str] = None,
        ):
            if load_format not in ["auto", "safetensors", "pt"]:
                raise ValueError("load-format support [safetensors, pt]")
    
            self.weight_dtype = torch.get_default_dtype()
            torch.set_default_dtype(torch.float32)
    
            self.mindie_model = GeneratorTorch(self.mindie_model_config)
            self.sampler = AscendSampler(self.mindie_model)
    
            torch.set_default_dtype(self.weight_dtype)
    
        # When warming up, create a dummy kv cache, block_tables and slot_mapping.
        def create_dummy_kv_cache(self, attn_metadata, input_ids):
            dummy_block_size = 128
            max_s = max(attn_metadata.prefill_metadata.seq_lens_tensor)
            max_need_block = math.ceil(max_s / dummy_block_size)
            batch_size = len(attn_metadata.prefill_metadata.seq_lens_tensor)
            dummy_block_num = max_need_block * batch_size
    
            model_runner = self.mindie_model.model_wrapper.model_runner
            kv_cache = [
                (
                    torch.empty(
                        (dummy_block_num, dummy_block_size, model_runner.num_kv_heads, model_runner.head_size),
                        dtype=self.kv_cache_dtype,
                        device="npu",
                    ),
                    torch.empty(
                        (dummy_block_num, dummy_block_size, model_runner.num_kv_heads, model_runner.head_size),
                        dtype=self.kv_cache_dtype,
                        device="npu",
                    ),
                )
                for _ in range(model_runner.num_layers)
            ]
    
            block_tables = torch.zeros(batch_size, max_need_block, dtype=int, device="npu")
            slot = [i for i in range(dummy_block_size)]
            slots = []
            warm_up_len = len(input_ids)
            while warm_up_len > 0:
                if warm_up_len > dummy_block_size:
                    slots.extend(slot)
                    warm_up_len -= dummy_block_size
                else:
                    slots.extend(slot[:warm_up_len])
                    warm_up_len = 0
            slots = torch.tensor(slots, dtype=torch.long, device="npu")
            return kv_cache, block_tables, slots
    
        def create_block_tables(self, attn_metadata):
            if attn_metadata.prefill_metadata is None:
                return attn_metadata.decode_metadata.block_tables
            prefill_block_tables = attn_metadata.prefill_metadata.block_tables
            if prefill_block_tables.numel() == 0:
                return torch.tensor([0], dtype=torch.int32, device="npu")
            if attn_metadata.decode_metadata is None:
                return prefill_block_tables
    
            decode_block_tables = attn_metadata.decode_metadata.block_tables
            pad_size = prefill_block_tables.size(1) - decode_block_tables.size(1)
            if pad_size > 0:
                decode_block_tables = F.pad(decode_block_tables, (0, pad_size), "constant", 0)
            elif pad_size < 0:
                prefill_block_tables = F.pad(prefill_block_tables, (0, -pad_size), "constant", 0)
            return torch.cat((prefill_block_tables, decode_block_tables), dim=0)
  • vllm_npu/vllm_npu/usage/__init__.py: hot-replaces vllm's native _report_usage_once function to fix an error raised when reporting usage information (a generic sketch of this patching pattern follows the code below)
    import types
    from vllm.engine.llm_engine import usage_message
    import vllm_npu.usage.usage_lib as vllm_npu_usage_lib
    usage_message._report_usage_once = types.MethodType(vllm_npu_usage_lib._report_usage_once, usage_message)
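  A generic sketch of the types.MethodType hot-replacement pattern used here (plain Python, not tied to vllm): the new function is bound to one existing instance, so only that instance is affected:
      import types

      class Reporter:
          def report(self):
              return "original"

      def patched_report(self):
          return "patched"

      reporter = Reporter()
      reporter.report = types.MethodType(patched_report, reporter)  # rebind on this instance only
      print(reporter.report())     # patched
      print(Reporter().report())   # original (other instances are untouched)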
  • vllm_npu/vllm_npu/usage/usage_lib.py: re-implements vllm's native _report_usage_once function to fix an error raised when reporting usage information
    # Part of code in this file was copied from project [vLLM Team][vllm] for adapting usage
    import platform
    from typing import Any, Dict
    import cpuinfo
    import psutil
    import torch
    import vllm.envs as envs
    from vllm.usage.usage_lib import UsageContext, _detect_cloud_provider, _get_current_timestamp_ns
    def _report_usage_once(self, model_architecture: str,
                               usage_context: UsageContext,
                               extra_kvs: Dict[str, Any]) -> None:
        # Platform information
        if torch.npu.is_available():
            device_property = torch.npu.get_device_properties()
            self.gpu_count = torch.npu.device_count()
            self.gpu_type = device_property.name
            self.gpu_memory_per_device = device_property.total_memory
        self.provider = _detect_cloud_provider()
        self.architecture = platform.machine()
        self.platform = platform.platform()
        self.total_memory = psutil.virtual_memory().total
        info = cpuinfo.get_cpu_info()
        self.num_cpu = info.get("count", None)
        self.cpu_type = info.get("brand_raw", "")
        self.cpu_family_model_stepping = ",".join([
            str(info.get("family", "")),
            str(info.get("model", "")),
            str(info.get("stepping", ""))
        ])
        # vLLM information
        import vllm  # delayed import to prevent circular import
        self.context = usage_context.value
        self.vllm_version = vllm.__version__
        self.model_architecture = model_architecture
        # Metadata
        self.log_time = _get_current_timestamp_ns()
        self.source = envs.VLLM_USAGE_SOURCE
        data = vars(self)
        if data["_report_usage_once"] is not None:
            del data["_report_usage_once"]
        if extra_kvs:
            data.update(extra_kvs)
        self._write_to_file(data)
        self._send_to_server(data)
  • vllm_npu/vllm_npu/worker/__init__.py: replaces the _allocate_kv_cache function of vllm's native CacheEngine to implement a custom kv_cache allocation method (a generic sketch of this patching pattern follows the code below)
    from vllm_npu.worker.cache_engine import _allocate_kv_cache
    from vllm.worker import cache_engine
    cache_engine.CacheEngine._allocate_kv_cache = _allocate_kv_cache
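  Unlike the instance-level rebinding used for the usage reporter above, this patch swaps the method on the CacheEngine class itself, so every instance created afterwards picks up the NPU allocation path. A generic sketch of the pattern with hypothetical names:
      class CacheEngineDemo:
          def _allocate_kv_cache(self):
              return "cuda kv cache"

      def _allocate_kv_cache_npu(self):
          return "npu kv cache"

      CacheEngineDemo._allocate_kv_cache = _allocate_kv_cache_npu   # class-level patch
      print(CacheEngineDemo()._allocate_kv_cache())                 # npu kv cache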
  • vllm_npu/vllm_npu/worker/ascend_model_runner.py: implements the AscendModelRunner class used by the Ascend backend, with the prefix-cache feature already adapted.
    # Copyright (c) Huawei Technologies Co., Ltd. 2024-2025. All rights reserved.
    # Part of codes in this file was copied from project [vLLM Team][vllm]
    from typing import List, NamedTuple, Optional, Tuple
    
    import torch
    from vllm.attention import AttentionMetadata, AttentionMetadataPerStage
    from vllm.attention.backends.flashinfer import FlashInferBackend
    from vllm.config import (
        DeviceConfig,
        LoadConfig,
        LoRAConfig,
        ModelConfig,
        ParallelConfig,
        SchedulerConfig,
        VisionLanguageConfig,
    )
    from vllm.distributed import broadcast_tensor_dict
    from vllm.logger import init_logger
    from vllm.lora.layers import LoRAMapping
    from vllm.lora.request import LoRARequest
    from vllm.model_executor import SamplingMetadata
    from vllm.sampling_params import SamplingParams
    from vllm.sequence import SamplerOutput, SequenceGroupMetadata
    from vllm.utils import get_kv_cache_torch_dtype, is_hip, make_tensor_with_pad
    from vllm.worker.model_runner import (
        _PAD_SLOT_ID,
        BatchType,
        ModelRunner,
        PrepareDecodeMetadata,
        PreparePromptMetadata,
        _prepare_fake_inputs,
    )
    from vllm_npu.model_executor.ascend_model_loader import get_model
    
    logger = init_logger(__name__)
    LORA_WARMUP_RANK = 8
    
    
    class PreparePromptMetadata(NamedTuple):
        input_tokens: List[int]
        input_positions: List[int]
        attn_metadata: Optional[AttentionMetadataPerStage]
        seq_lens: List[int]
        query_lens: List[int]
        lora_index_mapping: List[int]
        lora_prompt_mapping: List[int]
        lora_requests: List[LoRARequest]
        multi_modal_input: Optional[torch.Tensor]
        slot_mapping: List[int]
    
        @classmethod
        def empty(cls):
            return PreparePromptMetadata(
                input_tokens=[],
                input_positions=[],
                attn_metadata=None,
                seq_lens=[],
                query_lens=[],
                lora_index_mapping=[],
                lora_prompt_mapping=[],
                lora_requests=[],
                multi_modal_input=None,
                slot_mapping=[],
            )
    
    
    class PrepareDecodeMetadata(NamedTuple):
        input_tokens: List[int]
        input_positions: List[int]
        attn_metadata: Optional[AttentionMetadata]
        lora_index_mapping: List[int]
        lora_prompt_mapping: List[int]
        lora_requests: List[LoRARequest]
        slot_mapping: List[int]
    
        @classmethod
        def empty(cls):
            return PrepareDecodeMetadata(
                input_tokens=[],
                input_positions=[],
                attn_metadata=None,
                lora_index_mapping=[],
                lora_prompt_mapping=[],
                lora_requests=[],
                slot_mapping=[],
            )
    
    
    class AscendModelRunner(ModelRunner):
        def __init__(
            self,
            model_config: ModelConfig,
            parallel_config: ParallelConfig,
            scheduler_config: SchedulerConfig,
            device_config: DeviceConfig,
            load_config: LoadConfig,
            lora_config: Optional[LoRAConfig],
            mindie_model_config,
            kv_cache_dtype: Optional[str] = "auto",
            is_driver_worker: bool = False,
            vision_language_config: Optional[VisionLanguageConfig] = None,
        ):
            super(AscendModelRunner, self).__init__(
                model_config,
                parallel_config,
                scheduler_config,
                device_config,
                load_config,
                lora_config,
                kv_cache_dtype,
                is_driver_worker,
                vision_language_config,
            )
            self.kv_cache_torch_dtype = get_kv_cache_torch_dtype(kv_cache_dtype, model_config.dtype)
            self.mindie_model_config = mindie_model_config
            self.mindie_model_config["kv_cache_dtype"] = self.kv_cache_torch_dtype
    
        def load_model(self) -> None:
            self.model = get_model(
                model_config=self.model_config,
                device_config=self.device_config,
                load_config=self.load_config,
                mindie_model_config=self.mindie_model_config,
            )
    
            if self.kv_cache_dtype == "fp8" and is_hip():
                # Currently scaled KV cache is only enabled on ROCm
                if self.model_config.quantization_param_path is not None:
                    if callable(getattr(self.model, "load_kv_cache_scales", None)):
                        self.model.load_kv_cache_scales(self.model_config.quantization_param_path)
                    else:
                        raise RuntimeError(
                            "Using FP8 KV cache and scaling factors provided but "
                            "model %s does not support loading scaling factors.",
                            self.model.__class__,
                        )
                else:
                    logger.warning(
                        "Using FP8 KV cache but no scaling factors "
                        "provided. Defaulting to scaling factors of 1.0. "
                        "This may lead to less accurate results!"
                    )
            elif self.model_config.quantization_param_path is not None:
                logger.warning(
                    "KV cache scaling factors provided, "
                    "but the KV cache data type is not FP8. "
                    "KV cache scaling factors will not be used."
                )
    
        @torch.inference_mode()
        def profile_run(self) -> None:
            # Enable top-k sampling to reflect the accurate memory usage.
            sampling_params = SamplingParams(top_p=0.99, top_k=self.vocab_size - 1)
            max_num_batched_tokens = self.scheduler_config.max_num_batched_tokens
            max_num_seqs = self.scheduler_config.max_num_seqs
    
            dummy_lora_requests_per_seq = []
    
            seqs: List[SequenceGroupMetadata] = []
    
            if self.vision_language_config:
                max_num_seqs = min(
                    max_num_seqs, int(max_num_batched_tokens / self.vision_language_config.image_feature_size)
                )
            for group_id in range(max_num_seqs):
                seq_len = max_num_batched_tokens // max_num_seqs + (group_id < max_num_batched_tokens % max_num_seqs)
                seq_data, fake_multi_modal_input = _prepare_fake_inputs(seq_len, self.vision_language_config)
                seq = SequenceGroupMetadata(
                    request_id=str(group_id),
                    is_prompt=True,
                    seq_data={group_id: seq_data},
                    sampling_params=sampling_params,
                    block_tables=None,
                    lora_request=dummy_lora_requests_per_seq[group_id] if dummy_lora_requests_per_seq else None,
                    multi_modal_data=fake_multi_modal_input,
                )
                seqs.append(seq)
    
            # Run the model with the dummy inputs.
            num_layers = self.model_config.get_num_layers(self.parallel_config)
            kv_caches = [(None, None)] * num_layers
            self.execute_model(seqs, kv_caches)
            torch.npu.synchronize()
            return
    
        def _prepare_prompt(
            self,
            seq_group_metadata_list: List[SequenceGroupMetadata],
        ) -> PreparePromptMetadata:
            input_tokens: List[int] = []
            input_positions: List[int] = []
            slot_mapping: List[int] = []
            lora_index_mapping: List[int] = []
            lora_prompt_mapping: List[int] = []
            lora_requests: List[LoRARequest] = []
    
            seq_lens: List[int] = []
            context_lens: List[int] = []
            query_lens: List[int] = []
            prefix_block_tables: List[List[int]] = []
            multi_modal_input_list: List[torch.Tensor] = []
    
            if len(seq_group_metadata_list) == 0:
                return PreparePromptMetadata.empty()
    
            for seq_group_metadata in seq_group_metadata_list:
                if not seq_group_metadata.is_prompt:
                    raise ValueError("Expected prompt sequence group metadata.")
                seq_ids = list(seq_group_metadata.seq_data.keys())
                if len(seq_ids) != 1:
                    raise ValueError("Expected only one sequence ID.")
                seq_id = seq_ids[0]
    
                computed_block_nums = seq_group_metadata.computed_block_nums
                if (
                    self.scheduler_config is not None
                    and self.scheduler_config.chunked_prefill_enabled
                    and not (computed_block_nums is None or computed_block_nums == [])
                ):
                    raise RuntimeError("chunked prefill cannot be used with prefix caching " "now.")
    
                token_chunk_size = seq_group_metadata.token_chunk_size
                seq_data = seq_group_metadata.seq_data[seq_id]
                context_len = seq_data.get_num_computed_tokens()
                # We should use get_len here because in case of preemption
                # it contains output tokens.
                seq_len = min(seq_data.get_len(), context_len + token_chunk_size)
                prompt_tokens = seq_data.get_token_ids()[context_len:seq_len]
                seq_lens.append(seq_len)
    
                # NOTE: This only works for oooooooxxx style attention.
                if seq_group_metadata.block_tables is not None:
                    block_table = seq_group_metadata.block_tables[seq_id]
                else:
                    block_table = []
                if block_table:
                    prefix_block_tables.append(block_table)
                else:
                    prefix_block_tables.append([])
                if computed_block_nums is not None and len(computed_block_nums) > 0 and self.sliding_window is None:
                    # Prefix is not supported with sliding_window
                    context_len = len(computed_block_nums) * self.block_size
                    prompt_tokens = prompt_tokens[context_len:]
    
                # actual prompt lens
                context_lens.append(context_len)
                query_lens.append(seq_len - context_len)
    
                input_tokens.extend(prompt_tokens)
                # NOTE(woosuk): Here we assume that the first token in the prompt
                # is always the first token in the sequence.
                input_positions.extend(list(range(context_len, seq_len)))
                lora_id = seq_group_metadata.lora_int_id
    
                lora_requests.append(seq_group_metadata.lora_request)
    
                lora_index_mapping += [lora_id] * (seq_len - context_len)
                lora_prompt_mapping.extend(
                    [lora_id] * (seq_len - context_len if seq_group_metadata.sampling_params.prompt_logprobs else 1)
                )
    
                if seq_group_metadata.multi_modal_data:
                    multi_modal_input_list.append(seq_group_metadata.multi_modal_data.data)
    
                if seq_group_metadata.block_tables is None:
                    # During memory profiling, the block tables are not initialized
                    # yet. In this case, we just use a dummy slot mapping.
                    slot_mapping.extend([_PAD_SLOT_ID] * seq_len)
                    continue
    
                # Compute the slot mapping.
                block_table = seq_group_metadata.block_tables[seq_id]
    
                # Mask the [0, start_idx) tokens of the prompt with _PAD_SLOT_ID,
                # where start_idx is max(0, seq_len - sliding_window).
                # For example, if the prompt len is 10, sliding window is 8, and
                # block size is 4, the first two tokens are masked and the slot
                # mapping will be [-1, -1, 2, 3, 4, 5, 6, 7, 0, 1].
                start_idx = 0
                if self.sliding_window is not None:
                    if context_len != 0:
                        raise ValueError("Prefix caching is currently not supported with sliding window attention")
                    start_idx = max(0, seq_len - self.sliding_window)
    
                for i in range(context_len, seq_len):
                    if i < start_idx:
                        slot_mapping.append(_PAD_SLOT_ID)
                        continue
    
                    block_number = block_table[i // self.block_size]
                    block_offset = i % self.block_size
                    slot = block_number * self.block_size + block_offset
                    slot_mapping.append(slot)
    
            max_query_len = max(query_lens)
            max_seq_len = max(seq_lens)
            if max_query_len <= 0:
                raise ValueError("max_query_len must be greater than 0")
    
            context_lens_tensor = torch.tensor(context_lens, dtype=torch.int, device=self.device)
    
            if multi_modal_input_list:
                if not self.vision_language_config:
                    raise ValueError("Multi-modal inputs are only supported by vision language models.")
                multi_modal_input = torch.cat(multi_modal_input_list, dim=0).to(self.device)
            else:
                multi_modal_input = None
    
            # Prepare prefix block tables
            max_prompt_block_table_len = max(len(t) for t in prefix_block_tables)
            block_tables = make_tensor_with_pad(
                prefix_block_tables,
                max_len=max_prompt_block_table_len,
                pad=0,
                dtype=torch.int,
                device=self.device,
            )
    
            # Query length can be shorter than key (i.e., prompt) when prefill
            # is chunked or prefix cached.
            query_lens_tensor = torch.tensor(query_lens, dtype=torch.long, device=self.device)
            subquery_start_loc = torch.zeros(query_lens_tensor.shape[0] + 1, dtype=torch.int32, device=self.device)
    
            seq_lens_tensor = torch.tensor(seq_lens, dtype=torch.int, device=self.device)
            seq_start_loc = torch.zeros(seq_lens_tensor.shape[0] + 1, dtype=torch.int32, device=self.device)
    
            torch.cumsum(query_lens_tensor, dim=0, dtype=subquery_start_loc.dtype, out=subquery_start_loc[1:])
    
            torch.cumsum(seq_lens_tensor, dim=0, dtype=seq_start_loc.dtype, out=seq_start_loc[1:])
    
            if self.attn_backend is FlashInferBackend:
                attn_metadata = self.attn_backend.make_metadata(
                    is_prompt=True,
                    use_cuda_graph=False,
                    seq_start_loc=seq_start_loc,
                    max_seq_len=max_seq_len,
                    block_tables=block_tables,
                )
            else:
                attn_metadata = self.attn_backend.make_metadata(
                    is_prompt=True,
                    seq_lens=seq_lens,
                    seq_lens_tensor=seq_lens_tensor,
                    max_query_len=max_query_len,
                    max_seq_len=max_seq_len,
                    subquery_start_loc=subquery_start_loc,
                    seq_start_loc=seq_start_loc,
                    context_lens_tensor=context_lens_tensor,
                    block_tables=block_tables,
                    use_cuda_graph=False,
                )
    
            return PreparePromptMetadata(
                input_tokens=input_tokens,
                input_positions=input_positions,
                attn_metadata=attn_metadata,
                seq_lens=seq_lens,
                query_lens=query_lens,
                lora_index_mapping=lora_index_mapping,
                lora_prompt_mapping=lora_prompt_mapping,
                lora_requests=lora_requests,
                multi_modal_input=multi_modal_input,
                slot_mapping=slot_mapping,
            )
    
        def _prepare_decode(
            self,
            seq_group_metadata_list: List[SequenceGroupMetadata],
        ) -> PrepareDecodeMetadata:
            input_tokens: List[int] = []
            input_positions: List[int] = []
            slot_mapping: List[int] = []
            seq_lens: List[int] = []
            block_tables: List[List[int]] = []
            lora_index_mapping: List[int] = []
            lora_prompt_mapping: List[int] = []
            lora_requests: List[LoRARequest] = []
    
    
            if len(seq_group_metadata_list) == 0:
                return PrepareDecodeMetadata.empty()
    
            for seq_group_metadata in seq_group_metadata_list:
                if seq_group_metadata.is_prompt:
                    raise ValueError("seq_group_metadata should not be a prompt")
                if seq_group_metadata.token_chunk_size != 1:
                    raise ValueError("token_chunk_size should be 1")
    
                seq_ids = list(seq_group_metadata.seq_data.keys())
                lora_id = seq_group_metadata.lora_int_id
    
                lora_requests.append(seq_group_metadata.lora_request)
    
                for seq_id in seq_ids:
                    seq_data = seq_group_metadata.seq_data[seq_id]
                    generation_token = seq_data.get_last_token_id()
                    input_tokens.append(generation_token)
    
                    seq_len = seq_data.get_len()
                    position = seq_len - 1
                    input_positions.append(position)
    
                    seq_len = seq_len if self.sliding_window is None else min(seq_len, self.sliding_window)
                    seq_lens.append(seq_len)
    
                    block_table = seq_group_metadata.block_tables[seq_id]
                    block_number = block_table[position // self.block_size]
                    block_offset = position % self.block_size
                    slot = block_number * self.block_size + block_offset
                    slot_mapping.append(slot)
                    lora_index_mapping.append(lora_id)
                    lora_prompt_mapping.append(lora_id)
    
                    if self.sliding_window is not None:
                        sliding_window_blocks = self.sliding_window // self.block_size
                        block_table = block_table[-sliding_window_blocks:]
                    block_tables.append(block_table)
    
            max_seq_len = max(seq_lens)
            seq_lens_tensor = torch.tensor(seq_lens, dtype=torch.int, device=self.device)
    
            max_block_table_len = max(len(block_table) for block_table in block_tables)
            block_tables = make_tensor_with_pad(
                block_tables,
                max_len=max_block_table_len,
                pad=0,
                dtype=torch.int,
                device=self.device,
            )
    
            attn_metadata = self.attn_backend.make_metadata(
                is_prompt=False,
                seq_lens=None,
                seq_lens_tensor=seq_lens_tensor,
                max_query_len=None,
                max_seq_len=max_seq_len,
                subquery_start_loc=None,
                seq_start_loc=None,
                context_lens_tensor=None,
                block_tables=block_tables,
                use_cuda_graph=False,
            )
            return PrepareDecodeMetadata(
                input_tokens=input_tokens,
                input_positions=input_positions,
                attn_metadata=attn_metadata,
                lora_index_mapping=lora_index_mapping,
                lora_prompt_mapping=lora_prompt_mapping,
                lora_requests=lora_requests,
                slot_mapping=slot_mapping,
            )
    
        def prepare_input_tensors(
            self,
            seq_group_metadata_list: List[SequenceGroupMetadata],
        ) -> Tuple[
            torch.Tensor, torch.Tensor, AttentionMetadata, SamplingMetadata, List[LoRARequest], LoRAMapping, torch.Tensor
        ]:
            if self.is_driver_worker:
                prefill_reqs = []
                decode_reqs = []
                for seq_group_meta in seq_group_metadata_list:
                    if seq_group_meta.is_prompt:
                        prefill_reqs.append(seq_group_meta)
                    else:
                        decode_reqs.append(seq_group_meta)
    
                # Prepare input tensors.
                (
                    input_tokens,
                    input_positions,
                    prefill_attn_metadata,
                    seq_lens,
                    query_lens,
                    lora_index_mapping,
                    lora_prompt_mapping,
                    lora_requests,
                    multi_modal_input,
                    slot_mapping,
                ) = self._prepare_prompt(prefill_reqs)
                (
                    decode_input_tokens,
                    decode_input_positions,
                    decode_attn_metadata,
                    decode_lora_index_mapping,
                    decode_lora_prompt_mapping,
                    decode_lora_requests,
                    decode_slot_mapping,
                ) = self._prepare_decode(decode_reqs)
                sampling_metadata = SamplingMetadata.prepare(
                    seq_group_metadata_list, seq_lens, query_lens, self.device, self.pin_memory
                )
    
                if not self.scheduler_config.chunked_prefill_enabled and prefill_reqs and decode_reqs:
                    raise ValueError("Cannot have both prefill and decode requests when chunked_prefill_enabled is False")
    
                num_prefills = len(seq_lens)
                num_prefill_tokens = len(input_tokens)
                num_decode_tokens = len(decode_input_tokens)
    
                # Coalesce tensors. Note that attn_metadata is currently not
                # coalesced for simplicity.
                input_tokens.extend(decode_input_tokens)
                input_positions.extend(decode_input_positions)
                slot_mapping.extend(decode_slot_mapping)
                lora_index_mapping.extend(decode_lora_index_mapping)
                lora_prompt_mapping.extend(decode_lora_prompt_mapping)
                lora_requests.extend(decode_lora_requests)
    
                input_tokens = torch.tensor(input_tokens, dtype=torch.long, device=self.device)
                input_positions = torch.tensor(input_positions, dtype=torch.long, device=self.device)
                slot_mapping = torch.tensor(slot_mapping, dtype=torch.long, device=self.device)
    
                if self.lora_config:
                    lora_mapping = LoRAMapping(
                        lora_index_mapping,
                        lora_prompt_mapping,
                    )
                else:
                    lora_mapping = None
    
                # Broadcast the metadata.
                # If batch contains both prefill and decode, it sends 2 broadcasts.
                # If it only contains 1 type, it triggers a single broadcast.
                if prefill_attn_metadata is not None and decode_attn_metadata is not None:
                    batch_type = BatchType.MIXED
                elif prefill_attn_metadata is not None:
                    batch_type = BatchType.PREFILL
                else:
                    batch_type = BatchType.DECODE
    
                metadata_dict = {
                    "input_tokens": input_tokens,
                    "input_positions": input_positions,
                    "selected_token_indices": sampling_metadata.selected_token_indices,
                    "lora_requests": lora_requests,
                    "lora_mapping": lora_mapping,
                    "multi_modal_input": multi_modal_input,
                    "num_prefill_tokens": num_prefill_tokens,
                    "num_decode_tokens": num_decode_tokens,
                    "slot_mapping": slot_mapping,
                    "num_prefills": num_prefills,
                    "batch_type": batch_type,
                }
                if prefill_attn_metadata is not None:
                    metadata_dict.update(prefill_attn_metadata.asdict_zerocopy())
                else:
                    if decode_attn_metadata is None:
                        raise ValueError("decode_attn_metadata is None")
                    metadata_dict.update(decode_attn_metadata.asdict_zerocopy())
                broadcast_tensor_dict(metadata_dict, src=0)
    
                # Broadcast decode attn metadata for mixed batch type.
                # The additional broadcast costs 300us overhead on 4 A10 GPUs.
                # We can potentially reduce the overhead by coelescing tensors.
                if batch_type == BatchType.MIXED:
                    if decode_attn_metadata is None:
                        raise ValueError("decode_attn_metadata is None")
                    metadata_dict = decode_attn_metadata.asdict_zerocopy()
                    broadcast_tensor_dict(metadata_dict, src=0)
            else:
                metadata_dict = broadcast_tensor_dict(src=0)
                input_tokens = metadata_dict.pop("input_tokens")
                input_positions = metadata_dict.pop("input_positions")
                slot_mapping = metadata_dict.pop("slot_mapping")
                num_prefills = metadata_dict.pop("num_prefills")
                selected_token_indices = metadata_dict.pop("selected_token_indices")
                lora_mapping = metadata_dict.pop("lora_mapping")
                lora_requests = metadata_dict.pop("lora_requests")
                multi_modal_input = metadata_dict.pop("multi_modal_input")
                num_prefill_tokens = metadata_dict.pop("num_prefill_tokens")
                num_decode_tokens = metadata_dict.pop("num_decode_tokens")
                batch_type = metadata_dict.pop("batch_type")
    
                # Create an attention metadata.
                prefill_attn_metadata = None
                decode_attn_metadata = None
                if batch_type == BatchType.PREFILL or batch_type == BatchType.MIXED:
                    prefill_attn_metadata = self.attn_backend.make_metadata(**metadata_dict)
                else:
                    decode_attn_metadata = self.attn_backend.make_metadata(**metadata_dict)
                sampling_metadata = SamplingMetadata(
                    seq_groups=None,
                    selected_token_indices=selected_token_indices,
                    categorized_sample_indices=None,
                    num_prompts=0,
                )
    
                # if it is a mixed batch, decode attn_metadata is broadcasted
                # separately.
                if batch_type == BatchType.MIXED:
                    metadata_dict = broadcast_tensor_dict(src=0)
                    decode_attn_metadata = self.attn_backend.make_metadata(**metadata_dict)
    
            attn_metadata = AttentionMetadata(
                num_prefills=num_prefills,
                slot_mapping=slot_mapping,
                num_prefill_tokens=num_prefill_tokens,
                num_decode_tokens=num_decode_tokens,
                prefill_metadata=prefill_attn_metadata,
                decode_metadata=decode_attn_metadata,
                kv_cache_dtype=self.kv_cache_torch_dtype,
            )
    
            return (
                input_tokens,
                input_positions,
                attn_metadata,
                sampling_metadata,
                lora_requests,
                lora_mapping,
                multi_modal_input,
            )
    
        @torch.inference_mode()
        def execute_model(
            self,
            seq_group_metadata_list: List[SequenceGroupMetadata],
            kv_caches: List[Tuple[torch.Tensor, torch.Tensor]],
        ) -> Optional[SamplerOutput]:
            (input_tokens, input_positions, attn_metadata, sampling_metadata, lora_requests, _, _) = (
                self.prepare_input_tensors(seq_group_metadata_list)
            )
    
            # Currently cuda graph is only supported by the decode phase.
            model_executable = self.model
            execute_model_kwargs = {
                "input_ids": input_tokens,
                "positions": input_positions,
                "kv_caches": kv_caches,
                "attn_metadata": attn_metadata,
                "lora_requests": lora_requests,
            }
            hidden_states = model_executable(**execute_model_kwargs)
    
            # Only perform sampling in the driver worker.
            if not self.is_driver_worker:
                return None
    
            # Sample the next token.
            output = self.model.sample(
                logits=hidden_states,
                sampling_metadata=sampling_metadata,
            )
    
            return output
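    As a standalone illustration of the slot-mapping arithmetic used in _prepare_prompt and _prepare_decode above (block_number * block_size + block_offset), a small sketch with made-up block-table values:
    # Made-up example values; real block tables come from the scheduler.
    block_size = 4
    block_table = [7, 2, 5]        # physical block numbers assigned to this sequence
    context_len, seq_len = 0, 10   # tokens 0..9 need slots
    slot_mapping = []
    for i in range(context_len, seq_len):
        block_number = block_table[i // block_size]
        block_offset = i % block_size
        slot_mapping.append(block_number * block_size + block_offset)
    print(slot_mapping)  # [28, 29, 30, 31, 8, 9, 10, 11, 20, 21]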
  • vllm_npu/vllm_npu/worker/ascend_worker.py: implements the AscendWorker class used by the Ascend backend.
    """A Ascend worker class."""
    import gc
    from typing import Dict, List, Tuple, Set, Optional, Any
    import torch
    import torch.distributed
    from vllm.config import (CacheConfig, DeviceConfig, ModelConfig, LoadConfig,
                             ParallelConfig, SchedulerConfig, LoRAConfig, VisionLanguageConfig)
    from vllm.model_executor import set_random_seed
    from vllm.distributed import (broadcast_tensor_dict,
                                  ensure_model_parallel_initialized,
                                  init_distributed_environment)
    from vllm.sequence import SamplerOutput, ExecuteModelRequest
    from vllm.worker.cache_engine import CacheEngine
    from vllm.lora.request import LoRARequest
    from vllm.worker.worker_base import WorkerBase
    from vllm.worker.worker import raise_if_cache_size_invalid
    from vllm_npu.worker.ascend_model_runner import AscendModelRunner
    class AscendWorker(WorkerBase):
        """A worker class that executes the model on a group of Ascend NPUs.
        """
        def __init__(
            self,
            model_config: ModelConfig,
            parallel_config: ParallelConfig,
            scheduler_config: SchedulerConfig,
            device_config: DeviceConfig,
            cache_config: CacheConfig,
            load_config: LoadConfig,
            local_rank: int,
            rank: int,
            distributed_init_method: str,
            lora_config: Optional[LoRAConfig] = None,
            vision_language_config: Optional[VisionLanguageConfig] = None,
            is_driver_worker: bool = False,
        ) -> None:
            self.model_config = model_config
            self.parallel_config = parallel_config
            self.scheduler_config = scheduler_config
            self.device_config = device_config
            self.cache_config = cache_config
            self.local_rank = local_rank
            self.rank = rank
            self.distributed_init_method = distributed_init_method
            self.lora_config = lora_config
            self.load_config = load_config
            self.is_driver_worker = is_driver_worker
            if self.is_driver_worker:
                assert self.rank == 0, "The driver worker must have rank 0."
            if self.model_config.trust_remote_code:
                # note: lazy import to avoid importing torch before initializing
                from vllm.utils import init_cached_hf_modules
                init_cached_hf_modules()
            self.vision_language_config = vision_language_config
            mindie_model_config = {
                'backend_type': 'atb',
                'model_id': model_config.model,
                'rank': rank,
                'local_rank': local_rank,
                'world_size': parallel_config.world_size,
                'npu_device_id': local_rank,
                "trust_remote_code": model_config.trust_remote_code,
                'inference_mode': 2 if scheduler_config.chunked_prefill_enabled 
                    or cache_config.enable_prefix_caching else 0,
            }
            self.model_runner = AscendModelRunner(
                model_config,
                parallel_config,
                scheduler_config,
                device_config,
                load_config=load_config,
                lora_config=self.lora_config,
                kv_cache_dtype=self.cache_config.cache_dtype,
                is_driver_worker=is_driver_worker,
                mindie_model_config=mindie_model_config)
            # Uninitialized cache engine. Will be initialized by
            # self.initialize_cache().
            self.cache_engine: CacheEngine
            self.gpu_cache: List[torch.Tensor]
        def init_device(self) -> None:
            self.device = torch.device(f"npu:{self.local_rank}")
            torch.npu.set_device(self.device)
            # Initialize the distributed environment.
            init_worker_distributed_environment(self.parallel_config, self.rank,
                                                self.distributed_init_method,
                                                self.local_rank)
            # Initialize the model.
            set_random_seed(self.model_config.seed)
        def load_model(self):
            self.model_runner.load_model()
        @torch.inference_mode()
        def determine_num_available_blocks(self) -> Tuple[int, int]:
            """Profiles the peak memory usage of the model and returns the maximum
            number of NPU and CPU cache blocks that can be allocated.
            """
            # Profile the memory usage of the model and get the maximum number of
            # cache blocks that can be allocated with the remaining free memory.
            torch.npu.empty_cache()
            torch.npu.reset_peak_memory_stats()
            # Execute a forward pass with dummy inputs to profile the memory usage
            # of the model.
            self.model_runner.profile_run()
            block_size = self.cache_config.block_size
            dummy_block_size = 128
            dummy_num_blocks = dummy_block_size // block_size
            # Calculate the number of blocks that can be allocated with the
            # profiled peak memory.
            torch.npu.synchronize()
            peak_memory = torch.npu.max_memory_allocated()
            total_gpu_memory = torch.npu.get_device_properties(self.rank).total_memory
            cache_block_size = CacheEngine.get_cache_block_size(
                self.cache_config, self.model_config, self.parallel_config)
            num_gpu_blocks = int(
                (total_gpu_memory * self.cache_config.gpu_memory_utilization - peak_memory) //
                cache_block_size) + dummy_num_blocks
            num_cpu_blocks = int(self.cache_config.swap_space_bytes // cache_block_size)
            num_gpu_blocks = max(num_gpu_blocks, 0)
            num_cpu_blocks = max(num_cpu_blocks, 0)
            if self.model_runner.lora_manager:
                self.model_runner.remove_all_loras()
            gc.collect()
            torch.npu.empty_cache()
            return num_gpu_blocks, num_cpu_blocks
        def initialize_cache(self, num_gpu_blocks: int,
                             num_cpu_blocks: int) -> None:
            raise_if_cache_size_invalid(num_gpu_blocks,
                                        self.cache_config.block_size,
                                        self.model_config.max_model_len)
            self.cache_config.num_gpu_blocks = num_gpu_blocks
            self.cache_config.num_cpu_blocks = num_cpu_blocks
            
            self._init_cache_engine()
            self._warm_up_model()
        def _init_cache_engine(self):
            assert self.cache_config.num_gpu_blocks is not None
            self.cache_engine = CacheEngine(self.cache_config, self.model_config,
                                            self.parallel_config)
            self.gpu_cache = self.cache_engine.gpu_cache
            self.model_runner.set_block_size(self.cache_engine.block_size)
        def _warm_up_model(self) -> None:
            pass
        def cache_swap(
            self,
            blocks_to_swap_in: Dict[int, int],
            blocks_to_swap_out: Dict[int, int],
            blocks_to_copy: Dict[int, List[int]],
        ) -> None:
            if blocks_to_swap_in:
                self.cache_engine.swap_in(blocks_to_swap_in)
            if blocks_to_swap_out:
                self.cache_engine.swap_out(blocks_to_swap_out)
            if blocks_to_copy:
                self.cache_engine.copy(blocks_to_copy)
        @torch.inference_mode()
        def execute_model(
            self,
            execute_model_req: Optional[ExecuteModelRequest] = None
        ) -> List[SamplerOutput]:
            if execute_model_req is None:
                seq_group_metadata_list = None
            else:
                seq_group_metadata_list = execute_model_req.seq_group_metadata_list
            if self.is_driver_worker:
                assert seq_group_metadata_list is not None
                assert execute_model_req is not None
                num_seq_groups = len(seq_group_metadata_list)
                blocks_to_swap_in = execute_model_req.blocks_to_swap_in
                blocks_to_swap_out = execute_model_req.blocks_to_swap_out
                blocks_to_copy = execute_model_req.blocks_to_copy
                data: Dict[str, Any] = {
                    "num_seq_groups": num_seq_groups,
                    "blocks_to_swap_in": blocks_to_swap_in,
                    "blocks_to_swap_out": blocks_to_swap_out,
                    "blocks_to_copy": blocks_to_copy,
                }
                broadcast_tensor_dict(data, src=0)
            else:
                data = broadcast_tensor_dict(src=0)
                num_seq_groups = data["num_seq_groups"]
                blocks_to_swap_in = data["blocks_to_swap_in"]
                blocks_to_swap_out = data["blocks_to_swap_out"]
                blocks_to_copy = data["blocks_to_copy"]
            self.cache_swap(blocks_to_swap_in, blocks_to_swap_out, blocks_to_copy)
            # If there is no input, we don't need to execute the model.
            if num_seq_groups == 0:
                return []
            output = self.model_runner.execute_model(seq_group_metadata_list,
                                                     self.gpu_cache)
            # Worker only supports single-step execution. Wrap the output in a list
            # to conform to interface.
            return [output]
        def add_lora(self, lora_request: LoRARequest) -> bool:
            return self.model_runner.add_lora(lora_request)
        def remove_lora(self, lora_id: int) -> bool:
            return self.model_runner.remove_lora(lora_id)
        def list_loras(self) -> Set[int]:
            return self.model_runner.list_loras()
        def get_cache_block_size_bytes(self) -> int:
            """Get the size of the KV cache block size in bytes.
            """
            return CacheEngine.get_cache_block_size(self.cache_config,
                                                    self.model_config,
                                                    self.parallel_config)
    def init_worker_distributed_environment(
        parallel_config: ParallelConfig,
        rank: int,
        distributed_init_method: Optional[str] = None,
        local_rank: int = -1,
    ) -> None:
        """Initialize the distributed environment."""
        init_distributed_environment(parallel_config.world_size, rank,
                                     distributed_init_method, local_rank)
        ensure_model_parallel_initialized(parallel_config.tensor_parallel_size,
                                          parallel_config.pipeline_parallel_size)
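    A small numeric sketch of the block-count arithmetic in determine_num_available_blocks above; every memory figure and the cache block size below are hypothetical:
    # Hypothetical numbers, for illustration only.
    total_gpu_memory = 64 * 1024**3      # 64 GB of device memory
    gpu_memory_utilization = 0.9
    peak_memory = 40 * 1024**3           # as measured after profile_run()
    cache_block_size = 2 * 1024**2       # bytes per KV-cache block (model dependent)
    block_size, dummy_block_size = 128, 128
    dummy_num_blocks = dummy_block_size // block_size  # 1
    num_gpu_blocks = int(
        (total_gpu_memory * gpu_memory_utilization - peak_memory) // cache_block_size
    ) + dummy_num_blocks
    print(num_gpu_blocks)  # 9012 with these example numbers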
  • vllm_npu/vllm_npu/worker/cache_engine.py: reimplements the _allocate_kv_cache function so that the KV cache is created as per-layer tuples.
    from typing import Tuple, List
    import torch
    KVCache = Tuple[torch.Tensor, torch.Tensor]
    def _allocate_kv_cache(
        self,
        num_blocks: int,
        device: str,
    ) -> List[KVCache]:
        """Allocates KV cache on the specified device."""
        kv_cache: List[KVCache] = []
        key_block_shape = (self.block_size, self.num_heads, self.head_size)
        value_block_shape = (self.block_size, self.num_heads, self.head_size)
        for _ in range(self.num_layers):
            key_blocks = torch.empty(
                size=(num_blocks, *key_block_shape),
                dtype=self.dtype,
                device=device,
            )
            value_blocks = torch.empty(
                size=(num_blocks, *value_block_shape),
                dtype=self.dtype,
                device=device,
            )
            kv_cache.append((key_blocks, value_blocks))
        return kv_cache
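    For intuition, the allocator above returns one (key_blocks, value_blocks) tuple per layer, each tensor shaped (num_blocks, block_size, num_heads, head_size); a CPU-only sketch with made-up sizes:
    import torch
    # Made-up sizes; the real values come from CacheConfig and ModelConfig.
    num_layers, num_blocks, block_size, num_heads, head_size = 2, 16, 128, 8, 128
    kv_cache = [
        (
            torch.empty(num_blocks, block_size, num_heads, head_size, dtype=torch.float16),
            torch.empty(num_blocks, block_size, num_heads, head_size, dtype=torch.float16),
        )
        for _ in range(num_layers)
    ]
    print(len(kv_cache), kv_cache[0][0].shape)  # 2 torch.Size([16, 128, 8, 128])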
  • vllm_npu/vllm_npu/config.py: rewrites the DeviceConfig class to support the npu device type, and rewrites the _get_and_verify_max_len function to resolve issues when loading some models' configs.
    import warnings
    from typing import Optional
    import torch
    from transformers import PretrainedConfig
    from vllm.logger import init_logger
    logger = init_logger(__name__)
    class DeviceConfig:
        def __init__(self, device: str = "auto") -> None:
            if device == "auto":
                # Automated device type detection
                if getattr(torch.version, "cann", None) is not None:
                    self.device_type = "npu"
                else:
                    warnings.warn(
                        "Failed to detect cann in your environment. "
                        "Please check whether you have installed cann correctly. "
                        "Now the device type for processing input is set to cpu."
                    )
                    self.device_type = "cpu"
            else:
                # Device type is assigned explicitly
                self.device_type = device
            self.device = torch.device(self.device_type)
    def _get_and_verify_max_len(
        hf_config: PretrainedConfig,
        max_model_len: Optional[int],
    ) -> int:
        """Get and verify the model's maximum length."""
        derived_max_model_len = float("inf")
        possible_keys = [
            # OPT
            "max_position_embeddings",
            # GPT-2
            "n_positions",
            # MPT
            "max_seq_len",
            # ChatGLM2
            "seq_length",
            # Command-R
            "model_max_length",
            # Others
            "max_sequence_length",
            "max_seq_length",
            "seq_len",
        ]
        for key in possible_keys:
            max_len_key = getattr(hf_config, key, None)
            if max_len_key is not None:
                derived_max_model_len = min(derived_max_model_len, max_len_key)
        if derived_max_model_len == float("inf"):
            if max_model_len is not None:
                # If max_model_len is specified, we use it.
                return max_model_len
            default_max_len = 2048
            logger.warning(
                "The model's config.json does not contain any of the following "
                "keys to determine the original maximum length of the model: "
                f"{possible_keys}. Assuming the model's maximum length is "
                f"{default_max_len}."
            )
            derived_max_model_len = default_max_len
        rope_scaling = getattr(hf_config, "rope_scaling", None)
        if rope_scaling is not None:
            if "type" in rope_scaling:
                rope_type = rope_scaling["type"]
            elif "rope_type" in rope_scaling:
                rope_type = rope_scaling["rope_type"]
            else:
                raise ValueError("rope_scaling must have a 'type' or 'rope_type' key.")
            # The correct one should be "longrope", kept "su" here
            # to be backward compatible
            if rope_type not in ("su", "longrope", "llama3"):
                if "factor" not in rope_scaling:
                    raise ValueError("rope_scaling must have a 'factor' key.")
                scaling_factor = rope_scaling["factor"]
                if rope_type == "yarn":
                    derived_max_model_len = rope_scaling["original_max_position_embeddings"]
                derived_max_model_len *= scaling_factor
        if max_model_len is None:
            max_model_len = derived_max_model_len
        elif max_model_len > derived_max_model_len:
            raise ValueError(
                f"User-specified max_model_len ({max_model_len}) is greater than "
                f"the derived max_model_len ({max_len_key}={derived_max_model_len}"
                " in model's config.json). This may lead to incorrect model "
                "outputs or CUDA errors. Make sure the value is correct and "
                "within the model context size."
            )
        return int(max_model_len)
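    A worked example of the rope_scaling branch above, using a hypothetical YaRN-scaled config (max_position_embeddings=8192, original_max_position_embeddings=4096, factor=4):
    # Hypothetical values, mirroring the rope_scaling logic above.
    rope_scaling = {"type": "yarn", "factor": 4.0, "original_max_position_embeddings": 4096}
    derived_max_model_len = 8192.0  # e.g. taken from max_position_embeddings
    rope_type = rope_scaling.get("type") or rope_scaling.get("rope_type")
    if rope_type not in ("su", "longrope", "llama3"):
        scaling_factor = rope_scaling["factor"]
        if rope_type == "yarn":
            derived_max_model_len = rope_scaling["original_max_position_embeddings"]
        derived_max_model_len *= scaling_factor
    print(int(derived_max_model_len))  # 16384: the largest max_model_len a user may request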
  • vllm_npu/vllm_npu/utils.py: adds a helper that checks whether the Ascend backend is in use, and rewrites the get_ip function to fix a hang when running on multiple cards.
    # Part of codes in this file was copied from project [vLLM Team][vllm]
    import socket
    import warnings
    from functools import lru_cache
    import vllm.envs as envs
    @lru_cache(maxsize=None)
    def is_ascend() -> bool:
        try:
            import torch_npu
        except ImportError:
            torch_npu = None
        return torch_npu is not None
    def get_ip() -> str:
        host_ip = envs.VLLM_HOST_IP
        if host_ip:
            return host_ip
        # IP is not set, try to get it from the network interface
        # try ipv4
        s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        try:
            s.connect(("localhost", 80))  # Doesn't need to be reachable
            socket_name = s.getsockname()[0]
            s.close()
            return socket_name
        except Exception:
            warnings.warn("Encounted with connection errors. Using 0.0.0.0 by default.")
            s.close()
        # try ipv6
        try:
            s = socket.socket(socket.AF_INET6, socket.SOCK_DGRAM)
            # Connect to a local address; external reachability is not needed,
            # which avoids hangs in offline multi-card environments.
            s.connect(("localhost", 80))  # Doesn't need to be reachable
            socket_name = s.getsockname()[0]
            s.close()
            return socket_name
        except Exception:
            warnings.warn("Encounted with connection errors. Using 0.0.0.0 by default.")
            s.close()
        s.close()
        warnings.warn(
            "Failed to get the IP address, using 0.0.0.0 by default."
            "The value can be set by the environment variable"
            " VLLM_HOST_IP or HOST_IP.",
            stacklevel=2)
        return "0.0.0.0"
  • vllm_npu/vllm_npu/npu_adaptor.py: provides monkey patches that mask imports in native vLLM that would otherwise raise errors
    import importlib
    import sys
    def replace_modules(old_module_name, new_module_name):
        if old_module_name in sys.modules:
            del sys.modules[old_module_name]
        sys.modules[old_module_name] = importlib.import_module(new_module_name)
    _default_ops = (
        'xformers',
        'xformers.ops',
        'vllm._C',
        'xformers.ops.fmha.attn_bias',
        'vllm.model_executor.layers.ops.sample'
    )
    for _ops in _default_ops:
        replace_modules(_ops, 'vllm_npu')
    # dummy class to avoid import error.
    class ops:
        pass
    class cuda_utils:
        pass
    class cache_ops:
        pass
    class BlockDiagonalCausalMask:
        pass
    class LowerTriangularMaskWithTensorBias:
        pass
    def context_attention_fwd():
        pass
    def get_num_triton_sampler_splits():
        pass
    def sample():
        pass
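    The effect of replace_modules, sketched below: after vllm_npu is imported on an Ascend host, the masked module names resolve to the vllm_npu package instead of the original dependencies:
    import sys
    import vllm_npu  # applies the module replacements above
    import xformers  # no real xformers installation is needed; resolved from sys.modules
    print(xformers is sys.modules["vllm_npu"])  # True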
  • vllm_npu/vllm_npu/__init__.py: hot-swaps the shared modules that do not belong to a specific submodule, and applies the monkey patches.
    import torch
    import torch_npu
    from torch_npu.contrib import transfer_to_npu
    from vllm_npu.npu_adaptor import (BlockDiagonalCausalMask,
                                      LowerTriangularMaskWithTensorBias,
                                      cache_ops, cuda_utils, ops,
                                      context_attention_fwd,
                                      get_num_triton_sampler_splits, sample)
    import vllm_npu.core
    import vllm_npu.engine
    import vllm_npu.worker
    import vllm_npu.model_executor
    import vllm_npu.executor
    import vllm_npu.attention
    import vllm_npu.usage
    from vllm_npu.utils import get_ip
    from vllm_npu.config import DeviceConfig, _get_and_verify_max_len
    import vllm.utils as utils
    import vllm.executor.ray_utils as ray_utils
    import vllm.config as vconfig
    import vllm.engine.arg_utils as varg_utils
    utils.get_ip = get_ip
    ray_utils.get_ip = get_ip
    vconfig.DeviceConfig = DeviceConfig
    vconfig._get_and_verify_max_len = _get_and_verify_max_len
    varg_utils.DeviceConfig = DeviceConfig
    __version__ = "0.4.2"
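    A quick sanity check of the hot swaps above, assuming CANN and torch_npu are installed:
    import vllm_npu  # applies all patches on import
    import vllm.config as vconfig
    # DeviceConfig should now be the NPU-aware version defined in vllm_npu.config.
    print(vconfig.DeviceConfig("auto").device_type)  # expected: "npu"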
  • examples/start_server.sh: example script for running vLLM in online (server) mode.
    #!/bin/bash
    export VLLM_NO_USAGE_STATS=1 # close vllm usage messages to avoid errors
    python -m vllm.entrypoints.openai.api_server --model=facebook/opt-125m --trust-remote-code --enforce-eager --worker-use-ray
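    Once the server is up, it can be queried through the OpenAI-compatible API; a minimal client sketch (default port 8000, model name as started above):
    import requests
    resp = requests.post(
        "http://localhost:8000/v1/completions",
        json={
            "model": "facebook/opt-125m",
            "prompt": "The capital of France is",
            "max_tokens": 32,
            "temperature": 0,
        },
        timeout=60,
    )
    print(resp.json()["choices"][0]["text"])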
  • examples/offline_inference.py: example code for running vLLM in offline mode.
    from vllm import LLM, SamplingParams
    import json
    import argparse
    parser = argparse.ArgumentParser()
    parser.add_argument('--model_path', type=str, default="facebook/opt-125m")
    # input prompts for test
    prompts = [
        "Hello, my name is",
        "The president of the United States is",
        "The capital of France is",
        "The future of AI is",
    ]
    sampling_params = SamplingParams(max_tokens=512, temperature=0)
    args = parser.parse_args()
    model_path = args.model_path
    llm = LLM(model=model_path,
              block_size=128,
              max_model_len=4096, # max length of prompt
              tensor_parallel_size=8, # number of NPUs to be used
              max_num_seqs=256, # max batch number
              enforce_eager=True, # disable CUDA graph mode
              trust_remote_code=True, # If the model is a custom model not yet available in the HuggingFace transformers library
              worker_use_ray=True,
    )
    outputs = llm.generate(prompts, sampling_params)
    for i, output in enumerate(outputs):
        prompt = output.prompt
        generated_text = output.outputs[0].text
        print(f"req_num: {i}\nPrompt: {prompt!r}\nGenerated text: {generated_text!r}")
  • examples/offline_inference.sh: example script for running vLLM in offline mode.
    #!/bin/bash
    export VLLM_NO_USAGE_STATS=1 # close vllm usage messages to avoid errors
    python3 offline_inference.py --model_path facebook/opt-125m
  • install.sh: one-click installation script.
    #!/bin/bash
    if [ -d "./vllm" ]; then
        echo "./vllm directory has already exist!"
        exit 1
    fi
    git clone -b v0.4.2 https://github.com/vllm-project/vllm.git vllm
    yes | cp -r cover/* vllm/
    cd vllm
    pip install -r requirements-ascend.txt
    python3 setup.py install
    cd ../vllm_npu
    pip install -r requirements.txt
    python3 setup.py install
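    A quick post-install check, run on the Ascend host after install.sh completes:
    import vllm
    import vllm_npu
    # Both the patched vLLM and the patch package should report version 0.4.2.
    print(vllm.__version__, vllm_npu.__version__)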
  • README.md:
    # Vllm-MindIE
    ## Introduction
    A patch that connects the Ascend inference engine to the stable v0.4.2 release of the open-source vLLM framework.
    ## Adaptation approach
    The vLLM framework is adapted to the Ascend environment as follows:
    - The upper layer keeps vLLM's native logic, including request scheduling, batch construction, and launching multi-card services through the Ray distributed framework;
    - The lower layer performs model inference and post-processing through the unified `GeneratorTorch` interface provided by MindIE-LLM, connecting to the MindIE model repository for unified management and using the whole-graph acceleration library to speed up inference.
    ## Supported features
    The following features are supported in the 0.4.2 version:
    - Floating-point model inference
    - Quantized model inference
    - LoRA model inference
    - SplitFuse
    ## Environment preparation
    ### Dependency versions
    - Hardware on which the Vllm-MindIE adaptation repository can run
      - Atlas 800I A2 (32 GB/64 GB device memory)
    - Companion software required to run the Vllm-MindIE adaptation repository
      - Operating system
      - Driver (HDK)
      - CANN
      - Python
      - PTA
      - Open-source software dependencies
    - Version compatibility
      - The current Vllm-MindIE adaptation repository must be deployed and run with CANN 8.0 or later, Python 3.10, and torch 2.1.0
    ### Environment setup
    For the detailed environment setup procedure, refer to the [MindIE-LLM model repository](https://gitee.com/ascend/MindIE-LLM)
    ## Installation
    After the basic Ascend inference environment is installed, run `install.sh` to install vLLM together with the Ascend patch:
    ```sh
    bash install.sh
    ```
    ## Usage
    Startup demos for vLLM offline mode and the online service are provided here for reference.
    **Note: adjust the model path parameters in offline_inference.sh and start_server.sh to run inference with a specific model.**
    - Offline mode: before use, set the model_path parameter in offline_inference.sh to the path of the model used for inference
        ```sh
        cd examples
        bash offline_inference.sh
        ```
    - Online service: before use, set the model parameter in start_server.sh to the path of the model used for inference; other commonly used parameters are:
        - -tp n: number of cards used for model inference
        - --port port_num: port used by the inference service
        - --max-num-seqs bs: maximum batch size supported by the inference service
        - To run inference on specific cards, add an environment variable; for example, to use the first four cards, add the following before the python command in the script:
        ```sh
        export ASCEND_RT_VISIBLE_DEVICES=0,1,2,3
        ```
        After configuring the parameters above, run:
        ```sh
        cd examples
        bash start_server.sh
        ```
    - Multi-LoRA: set the following parameters in multilora_inference.sh:
        - model-path: path of the base model used for inference
        - lora-modules: LoRA models used for inference, in the form {LoRA-model-name=LoRA-model-path}; multiple LoRA models may be specified (**note: this test case requires at least two LoRA models to run**)
        After configuring the parameters above, run:
        ```sh
        cd examples
        bash multilora_inference.sh
        ```
    - Split Fuse: before use, set the model_path parameter in test_split_fuse.sh to the path of the model used for inference
        ```sh
        cd examples
        bash test_split_fuse.sh
        ```
    ## Details
    The `vllm_npu_0.4.2` patch package modifies six main modules: `attention`, `engine`, `usage`, `executor`, `model_executor`, and `worker`. These modules are hot-swapped to ensure compatibility with the Ascend backend while staying consistent with the corresponding modules of the native vLLM framework. The changes per module are as follows:
    ### Attention module
    Defines the `AscendAttentionBackend` class, which assembles the data the Ascend backend needs for attention computation.
    ### Engine module
    Introduces the key changes needed to route execution into the patch repository's `executor` module, achieved by reimplementing the `from_engine_args` functions of `LLMEngine` and `AsyncLLMEngine`. In addition, when the Ray module is initialized, the `num_gpus` parameter is set to `parallel_config.world_size`, which fixes multi-card service startup.
    ### Usage module
    Resolves the errors that vLLM's native `usage_lib` raises when collecting device information in the Ascend environment.
    ### Executor module
    Defines the `AscendExecutor` and `RayAscendExecutor` classes, which connect to the downstream `AscendWorker` class.
    ### Model Executor module
    Handles the integration with the MindIE-LLM inference and post-processing pipeline and contains two submodules: `layers` and `models`. The `layers` submodule is responsible for post-processing, while the `models` submodule focuses on model inference.
    In the `models` submodule, a class named `MindIELlmWrapper` is created. It instantiates `GeneratorTorch`, the unified interface provided by MindIE-LLM, extracts the inference parameters MindIE-LLM needs from vLLM's native data structures, and performs inference through that interface. It is also responsible for constructing dummy data during warm-up.
    The `ascend_model_loader.py` file is also added; it provides the `get_model` method used by `ModelRunner` to load the model.
    ### Worker module
    Defines the `AscendWorker` and `AscendModelRunner` classes, in `ascend_worker.py` and `ascend_model_runner.py` respectively. They interact with `MindIELlmWrapper` to ensure that model loading, warm-up, inference, and post-processing run correctly. The `_allocate_kv_cache` function of the original framework's `CacheEngine` is also modified so that the parameter format matches what the backend expects.

    After restoring all of the code above according to the project's directory structure, change to the project root directory and run the following command to install the Ascend-adapted version of vLLM.

    bash install.sh