- cover/requirements-ascend.txt: Python dependencies required to run in the Ascend environment.
cmake >= 3.21
ninja # For faster builds.
psutil
sentencepiece # Required for LLaMA tokenizer.
numpy
requests
py-cpuinfo
transformers >= 4.40.0 # Required for StarCoder2 & Llava, Llama 3.
tokenizers >= 0.19.1 # Required for Llama 3.
fastapi
openai
uvicorn[standard]
pydantic >= 2.0 # Required for OpenAI server.
prometheus_client >= 0.18.0
prometheus-fastapi-instrumentator >= 7.0.0
tiktoken == 0.6.0 # Required for DBRX tokenizer
lm-format-enforcer == 0.10.1
typing_extensions
filelock >= 3.10.4 # filelock starts to support `mode` argument from 3.10.4
ray == 2.9.3
pynvml == 11.5.0
outlines == 0.0.34
- cover/setup.py: Adapts the vLLM installation by adding an NPU backend and updating the version name, dependency modules, and related settings.
import importlib.util
import io
import logging
import os
import re
import subprocess
import sys
from shutil import which
from typing import Dict, List
import torch
from packaging.version import Version, parse
from setuptools import Extension, find_packages, setup
from setuptools.command.build_ext import build_ext
from torch.utils.cpp_extension import CUDA_HOME, BuildExtension
def load_module_from_path(module_name, path):
spec = importlib.util.spec_from_file_location(module_name, path)
module = importlib.util.module_from_spec(spec)
sys.modules[module_name] = module
spec.loader.exec_module(module)
return module
ROOT_DIR = os.path.dirname(__file__)
logger = logging.getLogger(__name__)
# cannot import envs directly because it depends on vllm,
# which is not installed yet
envs = load_module_from_path('envs', os.path.join(ROOT_DIR, 'vllm', 'envs.py'))
VLLM_TARGET_DEVICE = 'npu'
# vLLM only supports Linux platform
assert sys.platform.startswith(
"linux"), "vLLM only supports Linux platform (including WSL)."
MAIN_CUDA_VERSION = "12.1"
def is_sccache_available() -> bool:
return which("sccache") is not None
def is_ccache_available() -> bool:
return which("ccache") is not None
def is_ninja_available() -> bool:
return which("ninja") is not None
def remove_prefix(text, prefix):
if text.startswith(prefix):
return text[len(prefix):]
return text
class CMakeExtension(Extension):
def __init__(self, name: str, cmake_lists_dir: str = '.', **kwa) -> None:
super().__init__(name, sources=[], **kwa)
self.cmake_lists_dir = os.path.abspath(cmake_lists_dir)
def _is_cuda() -> bool:
return VLLM_TARGET_DEVICE == "cuda" \
and torch.version.cuda is not None \
and not _is_neuron()
def _is_hip() -> bool:
return (VLLM_TARGET_DEVICE == "cuda"
or VLLM_TARGET_DEVICE == "rocm") and torch.version.hip is not None
def _is_neuron() -> bool:
torch_neuronx_installed = True
try:
subprocess.run(["neuron-ls"], capture_output=True, check=True)
except (FileNotFoundError, PermissionError, subprocess.CalledProcessError):
torch_neuronx_installed = False
return torch_neuronx_installed or envs.VLLM_BUILD_WITH_NEURON
def _is_cpu() -> bool:
return VLLM_TARGET_DEVICE == "cpu"
def _is_npu() -> bool:
return VLLM_TARGET_DEVICE == "npu"
def _install_punica() -> bool:
return envs.VLLM_INSTALL_PUNICA_KERNELS
def get_hipcc_rocm_version():
# Run the hipcc --version command
result = subprocess.run(['hipcc', '--version'],
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
text=True)
# Check if the command was executed successfully
if result.returncode != 0:
print("Error running 'hipcc --version'")
return None
# Extract the version using a regular expression
match = re.search(r'HIP version: (\S+)', result.stdout)
if match:
# Return the version string
return match.group(1)
else:
print("Could not find HIP version in the output")
return None
def get_neuronxcc_version():
import sysconfig
site_dir = sysconfig.get_paths()["purelib"]
version_file = os.path.join(site_dir, "neuronxcc", "version",
"__init__.py")
# Read the version file shipped with the neuronxcc package
with open(version_file, "rt") as fp:
content = fp.read()
# Extract the version using a regular expression
match = re.search(r"__version__ = '(\S+)'", content)
if match:
# Return the version string
return match.group(1)
else:
raise RuntimeError("Could not find HIP version in the output")
def get_nvcc_cuda_version() -> Version:
"""Get the CUDA version from nvcc.
Adapted from https://github.com/NVIDIA/apex/blob/8b7a1ff183741dd8f9b87e7bafd04cfde99cea28/setup.py
"""
assert CUDA_HOME is not None, "CUDA_HOME is not set"
nvcc_output = subprocess.check_output([CUDA_HOME + "/bin/nvcc", "-V"],
universal_newlines=True)
output = nvcc_output.split()
release_idx = output.index("release") + 1
nvcc_cuda_version = parse(output[release_idx].split(",")[0])
return nvcc_cuda_version
def get_path(*filepath) -> str:
return os.path.join(ROOT_DIR, *filepath)
def find_version(filepath: str) -> str:
"""Extract version information from the given filepath.
Adapted from https://github.com/ray-project/ray/blob/0b190ee1160eeca9796bc091e07eaebf4c85b511/python/setup.py
"""
with open(filepath) as fp:
version_match = re.search(r"^__version__ = ['\"]([^'\"]*)['\"]",
fp.read(), re.M)
if version_match:
return version_match.group(1)
raise RuntimeError("Unable to find version string.")
def get_vllm_version() -> str:
version = find_version(get_path("vllm", "__init__.py"))
# return version
if _is_cuda():
cuda_version = str(get_nvcc_cuda_version())
if cuda_version != MAIN_CUDA_VERSION:
cuda_version_str = cuda_version.replace(".", "")[:3]
version += f"+cu{cuda_version_str}"
elif _is_hip():
# Get the HIP version
hipcc_version = get_hipcc_rocm_version()
if hipcc_version != MAIN_CUDA_VERSION:
rocm_version_str = hipcc_version.replace(".", "")[:3]
version += f"+rocm{rocm_version_str}"
elif _is_neuron():
# Get the Neuron version
neuron_version = str(get_neuronxcc_version())
if neuron_version != MAIN_CUDA_VERSION:
neuron_version_str = neuron_version.replace(".", "")[:3]
version += f"+neuron{neuron_version_str}"
elif _is_npu():
version += "+npu"
elif _is_cpu():
version += "+cpu"
else:
raise RuntimeError("Unknown runtime environment")
return version
def read_readme() -> str:
"""Read the README file if present."""
p = get_path("README.md")
if os.path.isfile(p):
return io.open(get_path("README.md"), "r", encoding="utf-8").read()
else:
return ""
def get_requirements() -> List[str]:
"""Get Python package dependencies from requirements.txt."""
def _read_requirements(filename: str) -> List[str]:
with open(get_path(filename)) as f:
requirements = f.read().strip().split("\n")
resolved_requirements = []
for line in requirements:
if line.startswith("-r "):
resolved_requirements += _read_requirements(line.split()[1])
else:
resolved_requirements.append(line)
return resolved_requirements
if _is_cuda():
requirements = _read_requirements("requirements-cuda.txt")
cuda_major, cuda_minor = torch.version.cuda.split(".")
modified_requirements = []
for req in requirements:
if "vllm-nccl-cu12" in req:
req = req.replace("vllm-nccl-cu12",
f"vllm-nccl-cu{cuda_major}")
elif ("vllm-flash-attn" in req
and not (cuda_major == "12" and cuda_minor == "1")):
# vllm-flash-attn is built only for CUDA 12.1.
# Skip for other versions.
continue
modified_requirements.append(req)
requirements = modified_requirements
elif _is_hip():
requirements = _read_requirements("requirements-rocm.txt")
elif _is_neuron():
requirements = _read_requirements("requirements-neuron.txt")
elif _is_npu():
requirements = _read_requirements("requirements-ascend.txt")
elif _is_cpu():
requirements = _read_requirements("requirements-cpu.txt")
else:
raise ValueError(
"Unsupported platform, please use CUDA, ROCm, Neuron, NPU or CPU.")
return requirements
ext_modules = []
if _is_cuda():
ext_modules.append(CMakeExtension(name="vllm._moe_C"))
"""
if not _is_neuron():
ext_modules.append(CMakeExtension(name="vllm._C"))
if _install_punica():
ext_modules.append(CMakeExtension(name="vllm._punica_C"))
"""
package_data = {
"vllm": ["py.typed", "model_executor/layers/fused_moe/configs/*.json"]
}
if envs.VLLM_USE_PRECOMPILED:
ext_modules = []
package_data["vllm"].append("*.so")
setup(
name="vllm",
version=get_vllm_version(),
author="vLLM Team",
license="Apache 2.0",
description=("A high-throughput and memory-efficient inference and "
"serving engine for LLMs"),
long_description=read_readme(),
long_description_content_type="text/markdown",
url="https://github.com/vllm-project/vllm",
project_urls={
"Homepage": "https://github.com/vllm-project/vllm",
"Documentation": "https://vllm.readthedocs.io/en/latest/",
},
classifiers=[
"Programming Language :: Python :: 3.8",
"Programming Language :: Python :: 3.9",
"Programming Language :: Python :: 3.10",
"Programming Language :: Python :: 3.11",
"License :: OSI Approved :: Apache Software License",
"Topic :: Scientific/Engineering :: Artificial Intelligence",
],
packages=find_packages(exclude=("benchmarks", "csrc", "docs", "examples",
"tests*")),
python_requires=">=3.8",
install_requires=get_requirements(),
ext_modules=ext_modules,
extras_require={
"tensorizer": ["tensorizer==2.9.0"],
},
cmdclass={"build_ext": BuildExtension} if not _is_neuron() else {},
package_data=package_data,
)
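For reference, a minimal standalone sketch (not part of the patched setup.py above) of what hard-coding VLLM_TARGET_DEVICE to "npu" does: the build is tagged with a "+npu" version suffix and pulls its dependencies from requirements-ascend.txt.
VLLM_TARGET_DEVICE = "npu"  # hard-coded in the patched setup.py above

def pick_requirements_file() -> str:
    # Mirrors get_requirements(): each target device maps to its requirements file.
    mapping = {
        "cuda": "requirements-cuda.txt",
        "rocm": "requirements-rocm.txt",
        "neuron": "requirements-neuron.txt",
        "npu": "requirements-ascend.txt",
        "cpu": "requirements-cpu.txt",
    }
    return mapping[VLLM_TARGET_DEVICE]

def tag_version(base: str = "0.4.2") -> str:
    # Mirrors get_vllm_version(): NPU builds get a "+npu" local version suffix.
    return base + "+npu" if VLLM_TARGET_DEVICE == "npu" else base

print(pick_requirements_file())  # requirements-ascend.txt
print(tag_version())             # 0.4.2+npu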
- cover/vllm/__init__.py: Imports vllm_npu in vLLM's package init file so that the adaptation hooks take effect.
23 | """vLLM: a high-throughput and memory-efficient inference engine for LLMs"""
import vllm_npu
from vllm.engine.arg_utils import AsyncEngineArgs, EngineArgs
from vllm.engine.async_llm_engine import AsyncLLMEngine
from vllm.engine.llm_engine import LLMEngine
from vllm.entrypoints.llm import LLM
from vllm.executor.ray_utils import initialize_ray_cluster
from vllm.model_executor.models import ModelRegistry
from vllm.outputs import CompletionOutput, RequestOutput
from vllm.sampling_params import SamplingParams
__version__ = "0.4.2"
__all__ = [
"LLM",
"ModelRegistry",
"SamplingParams",
"RequestOutput",
"CompletionOutput",
"LLMEngine",
"EngineArgs",
"AsyncLLMEngine",
"AsyncEngineArgs",
"initialize_ray_cluster",
]
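Because `import vllm_npu` sits at the top of vllm/__init__.py, the monkey patches are installed before any other vLLM module is used, so application code stays unchanged. A hedged usage sketch, assuming an Ascend host with a locally downloaded model (the path is a placeholder):
from vllm import LLM, SamplingParams  # importing vllm pulls in vllm_npu first

# "/path/to/local/model" is a placeholder; use a model supported by the
# MindIE wrapper on the Ascend device.
llm = LLM(model="/path/to/local/model")
outputs = llm.generate(["Hello, Ascend!"], SamplingParams(max_tokens=16))
print(outputs[0].outputs[0].text)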
- vllm_npu/README.md: An empty file.
- vllm_npu/requirements.txt: Python dependencies required by the vllm_npu package.
numpy
decorator
attrs
psutil
absl-py
cloudpickle
scipy
tornado
transformers
accelerate
pandas
- vllm_npu/setup.py: Installation script for the vllm_npu package.
import io
import os
import re
from typing import List
import setuptools
ROOT_DIR = os.path.dirname(__file__)
def get_path(*filepath) -> str:
return os.path.join(ROOT_DIR, *filepath)
def find_version(filepath: str):
"""Extract version information from the given filepath.
"""
with open(filepath) as fp:
version_match = re.search(r"^__version__ = ['\"]([^'\"]*)['\"]",
fp.read(), re.M)
if version_match:
return version_match.group(1)
raise RuntimeError("Unable to find version string.")
def read_readme() -> str:
"""Read the README file."""
return io.open(get_path("README.md"), "r", encoding="utf-8").read()
def get_requirements() -> List[str]:
"""Get Python package dependencies from requirements.txt."""
with open(get_path("requirements.txt")) as f:
requirements = f.read().strip().split("\n")
return requirements
setuptools.setup(
name="vllm_npu",
version=find_version(get_path("vllm_npu", "__init__.py")),
author="Huawei",
license="Apache 2.0",
description=("A high-throughput and memory-efficient inference and "
"serving engine for LLMs"),
long_description=read_readme(),
long_description_content_type="text/markdown",
url="",
project_urls={
"Homepage": "",
"Documentation": "",
},
classifiers=[
"Programming Language :: Python :: 3.8",
"Programming Language :: Python :: 3.9",
"Programming Language :: Python :: 3.10",
"Programming Language :: Python :: 3.11",
"Topic :: Scientific/Engineering :: Artificial Intelligence",
],
packages=setuptools.find_packages(exclude=("benchmarks", "examples", "tests")),
python_requires=">=3.8",
install_requires=get_requirements(),
)
- vllm_npu/vllm_npu/attention/__init__.py: Hot-swaps vLLM's get_attn_backend function while the attention module is being initialized.
from vllm_npu.attention.selector import get_attn_backend
import vllm.attention.selector as selector
import vllm.worker.model_runner as mr
import vllm.worker.cache_engine as ce
selector.get_attn_backend = get_attn_backend
mr.get_attn_backend = get_attn_backend
ce.get_attn_backend = get_attn_backend
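The effect of this hot swap can be checked directly: once the subpackage is imported, every module that looked up get_attn_backend now resolves to the Ascend implementation. A small sketch, assuming vllm and vllm_npu are both installed:
import torch
import vllm_npu.attention            # importing the subpackage applies the patch
import vllm.attention.selector as selector

backend = selector.get_attn_backend(torch.float16)
print(backend.get_name())            # "ascend-attn-backend"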
- vllm_npu/vllm_npu/attention/backends.py: The attention backend implementation used in the Ascend environment.
# Part of codes in this file was copied from project [vLLM Team][vllm]
from dataclasses import dataclass
from typing import List, Optional, Tuple
from vllm.attention.backends.abstract import (AttentionBackend,
AttentionMetadataPerStage)
import torch
def get_kv_cache_shape(
num_blocks: int,
block_size: int,
num_kv_heads: int,
head_size: int,
) -> Tuple[int, ...]:
return (2, num_blocks, block_size, num_kv_heads, head_size)
class AscendAttentionBackend(AttentionBackend):
@staticmethod
def get_name() -> str:
return "ascend-attn-backend"
@staticmethod
def get_impl_cls():
return None
@staticmethod
def make_metadata(*args, **kwargs) -> "AttentionMetadata":
return AttentionMetadata(*args, **kwargs)
@staticmethod
def get_kv_cache_shape(
num_blocks: int,
block_size: int,
num_kv_heads: int,
head_size: int,
) -> Tuple[int, ...]:
return get_kv_cache_shape(num_blocks, block_size, num_kv_heads, head_size)
@staticmethod
def swap_blocks(
src_kv_cache: torch.Tensor,
dst_kv_cache: torch.Tensor,
src_to_dst: torch.Tensor,
) -> None:
pass
@staticmethod
def copy_blocks(
kv_caches: List[torch.Tensor],
src_to_dists: torch.Tensor,
) -> None:
pass
@dataclass
class AttentionMetadata(AttentionMetadataPerStage):
"""Metadata for AscendAttentionBackend.
"""
# Currently, input sequences can only contain all prompts
# or all decoding. True if all sequences are prompts.
is_prompt: bool
# (batch_size,). The sequence length per sequence. Sequence length means
# the computed tokens + new tokens. None if it is a decoding.
seq_lens: Optional[List[int]]
# seq_lens stored as a tensor.
seq_lens_tensor: Optional[torch.Tensor]
# Maximum query length in the batch.
max_query_len: Optional[int]
# Maximum sequence length in the batch.
max_seq_len: Optional[int]
# (batch_size + 1,). The cumulative subquery lengths of the sequences in
# the batch, used to index into subquery. E.g., if the subquery length
# is [4, 6], it is [0, 4, 10].
subquery_start_loc: Optional[torch.Tensor]
# (batch_size + 1,). The cumulative sequence lengths of the sequences in
# the batch, used to index into sequence. E.g., if the sequence length is
# [4, 6], it is [0, 4, 10].
seq_start_loc: Optional[torch.Tensor]
# (batch_size,) A tensor of context lengths (tokens that are computed
# so far).
context_lens_tensor: Optional[torch.Tensor]
block_tables: Optional[torch.Tensor]
# Whether or not cuda graph is enabled.
use_cuda_graph: bool
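As a quick illustration of the KV-cache layout declared above (a sketch with made-up sizes; the leading dimension of 2 holds the key cache and the value cache):
from vllm_npu.attention.backends import AscendAttentionBackend

# Layout is (2, num_blocks, block_size, num_kv_heads, head_size).
shape = AscendAttentionBackend.get_kv_cache_shape(
    num_blocks=512, block_size=128, num_kv_heads=8, head_size=128)
print(shape)  # (2, 512, 128, 8, 128)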
- vllm_npu/vllm_npu/attention/selector.py: Defines a get_attn_backend function that returns the Ascend backend, replacing vLLM's native counterpart.
# Part of codes in this file was copied from project [vLLM Team][vllm]
from functools import lru_cache
from typing import Type
import torch
from vllm.attention.backends.abstract import AttentionBackend
from vllm.logger import init_logger
logger = init_logger(__name__)
@lru_cache(maxsize=None)
def get_attn_backend(dtype: torch.dtype) -> Type[AttentionBackend]:
logger.info("Using Ascend backend.")
from vllm_npu.attention.backends import AscendAttentionBackend
return AscendAttentionBackend
- vllm_npu/vllm_npu/core/__init__.py: An empty file.
- vllm_npu/vllm_npu/engine/__init__.py: During engine module initialization, replaces the from_engine_args factory methods that instantiate the LLMEngine and AsyncLLMEngine classes, and replaces the EngineArgs class in vLLM's arg_utils module.
from vllm.engine.llm_engine import LLMEngine
from vllm.engine.async_llm_engine import AsyncLLMEngine
import vllm.engine.arg_utils as v_arg_utils
from vllm_npu.engine.ascend_engine import from_engine_args
from vllm_npu.engine.async_ascend_engine import from_engine_args_async
from .arg_utils import EngineArgs
LLMEngine.from_engine_args = from_engine_args
AsyncLLMEngine.from_engine_args = from_engine_args_async
v_arg_utils.EngineArgs = EngineArgs
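After this module is imported, the factory methods on the original classes point at the Ascend-aware versions defined in the files below. A quick way to confirm the replacement took effect (a sketch, assuming both packages are installed):
import vllm_npu.engine                        # applies the replacements above
from vllm.engine.llm_engine import LLMEngine

print(LLMEngine.from_engine_args.__module__)  # vllm_npu.engine.ascend_engine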
- vllm_npu/vllm_npu/engine/ascend_engine.py: Rewrites the from_engine_args factory that instantiates LLMEngine so that the Ascend Executor classes are loaded in the Ascend environment.
from vllm.engine.arg_utils import EngineArgs
from vllm.usage.usage_lib import UsageContext
from vllm_npu.executor.ray_utils import initialize_ray_cluster
@classmethod
def from_engine_args(
cls,
engine_args: EngineArgs,
usage_context: UsageContext = UsageContext.ENGINE_CONTEXT,
) -> "LLMEngine":
"""Creates an LLM engine from the engine arguments."""
# Create the engine configs.
engine_config = engine_args.create_engine_config()
# Initialize the cluster and specify the executor class.
if engine_config.device_config.device_type == "neuron":
from vllm.executor.neuron_executor import NeuronExecutor
executor_class = NeuronExecutor
elif engine_config.device_config.device_type == "cpu":
from vllm.executor.cpu_executor import CPUExecutor
executor_class = CPUExecutor
elif engine_config.device_config.device_type == "npu":
if engine_config.parallel_config.worker_use_ray:
initialize_ray_cluster(engine_config.parallel_config)
from vllm_npu.executor.ascend_ray_executor import RayAscendExecutor
executor_class = RayAscendExecutor
else:
from vllm_npu.executor.ascend_executor import AscendExecutor
executor_class = AscendExecutor
elif engine_config.parallel_config.worker_use_ray:
initialize_ray_cluster(engine_config.parallel_config)
from vllm.executor.ray_gpu_executor import RayGPUExecutor
executor_class = RayGPUExecutor
else:
if engine_config.parallel_config.world_size != 1:
raise ValueError("Ray is required if parallel_config.world_size > 1.")
from vllm.executor.gpu_executor import GPUExecutor
executor_class = GPUExecutor
# Create the LLM engine.
engine = cls(
**engine_config.to_dict(),
executor_class=executor_class,
log_stats=not engine_args.disable_log_stats,
usage_context=usage_context,
)
return engine
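With this replacement installed, the usual engine construction path selects the Ascend executors whenever the device type is "npu". A hedged sketch, assuming the patched EngineArgs and DeviceConfig accept device="npu", an Ascend host, and a placeholder model path:
from vllm.engine.arg_utils import EngineArgs
from vllm.engine.llm_engine import LLMEngine

# device="npu" drives the branch above: AscendExecutor for a single card,
# RayAscendExecutor when worker_use_ray is set for multi-card inference.
args = EngineArgs(model="/path/to/local/model", device="npu")
engine = LLMEngine.from_engine_args(args)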
- vllm_npu/vllm_npu/engine/async_ascend_engine.py: Rewrites the from_engine_args factory that instantiates AsyncLLMEngine so that the Ascend Executor classes are loaded in the Ascend environment.
from vllm.engine.arg_utils import AsyncEngineArgs
from vllm.usage.usage_lib import UsageContext
from vllm_npu.executor.ray_utils import initialize_ray_cluster
@classmethod
def from_engine_args_async(
cls,
engine_args: AsyncEngineArgs,
start_engine_loop: bool = True,
usage_context: UsageContext = UsageContext.ENGINE_CONTEXT,
) -> "AsyncLLMEngine":
"""Creates an async LLM engine from the engine arguments."""
# Create the engine configs.
engine_config = engine_args.create_engine_config()
if engine_config.device_config.device_type == "neuron":
from vllm.executor.neuron_executor import NeuronExecutorAsync
executor_class = NeuronExecutorAsync
elif engine_config.device_config.device_type == "cpu":
if engine_config.parallel_config.worker_use_ray:
raise RuntimeError("Ray is not supported with the CPU backend.")
from vllm.executor.cpu_executor import CPUExecutorAsync
executor_class = CPUExecutorAsync
elif engine_config.device_config.device_type == "npu":
if engine_config.parallel_config.worker_use_ray:
initialize_ray_cluster(engine_config.parallel_config)
from vllm_npu.executor.ascend_ray_executor import RayAscendExecutorAsync
executor_class = RayAscendExecutorAsync
else:
from vllm_npu.executor.ascend_executor import AscendExecutorAsync
executor_class = AscendExecutorAsync
elif engine_config.parallel_config.worker_use_ray:
initialize_ray_cluster(engine_config.parallel_config)
from vllm.executor.ray_gpu_executor import RayGPUExecutorAsync
executor_class = RayGPUExecutorAsync
else:
if engine_config.parallel_config.world_size != 1:
raise RuntimeError("Ray is required if parallel_config.world_size > 1.")
from vllm.executor.gpu_executor import GPUExecutorAsync
executor_class = GPUExecutorAsync
# Create the async LLM engine.
engine = cls(
engine_config.parallel_config.worker_use_ray,
engine_args.engine_use_ray,
**engine_config.to_dict(),
executor_class=executor_class,
log_requests=not engine_args.disable_log_requests,
log_stats=not engine_args.disable_log_stats,
max_log_len=engine_args.max_log_len,
start_engine_loop=start_engine_loop,
usage_context=usage_context,
)
return engine
- vllm_npu/vllm_npu/engine/arg_utils.py: Rewrites the EngineArgs class from vLLM's arg_utils module, adding 128 as a valid choice for the block_size argument.
import argparse
import dataclasses
from dataclasses import dataclass
from typing import List, Optional, Union
from vllm.config import (
CacheConfig,
DecodingConfig,
EngineConfig,
LoadConfig,
LoRAConfig,
ModelConfig,
ParallelConfig,
SchedulerConfig,
SpeculativeConfig,
TokenizerPoolConfig,
VisionLanguageConfig,
)
from vllm.model_executor.layers.quantization import QUANTIZATION_METHODS
from vllm.utils import str_to_int_tuple
from vllm_npu.config import DeviceConfig
DEFAULT_TYPE = "auto"
def nullable_str(val: str):
if not val or val == "None":
return None
return val
@dataclass
class EngineArgs:
"""Arguments for vLLM engine."""
model: str
served_model_name: Optional[Union[str, List[str]]] = None
tokenizer: Optional[str] = None
skip_tokenizer_init: bool = False
tokenizer_mode: str = DEFAULT_TYPE
trust_remote_code: bool = False
download_dir: Optional[str] = None
load_format: str = DEFAULT_TYPE
dtype: str = DEFAULT_TYPE
kv_cache_dtype: str = DEFAULT_TYPE
quantization_param_path: Optional[str] = None
seed: int = 0
max_model_len: Optional[int] = None
worker_use_ray: bool = False
pipeline_parallel_size: int = 1
tensor_parallel_size: int = 1
max_parallel_loading_workers: Optional[int] = None
block_size: int = 128
enable_prefix_caching: bool = False
use_v2_block_manager: bool = False
swap_space: int = 4 # GiB
gpu_memory_utilization: float = 0.90
max_num_batched_tokens: Optional[int] = None
max_num_seqs: int = 256
max_logprobs: int = 5 # OpenAI default value
disable_log_stats: bool = False
revision: Optional[str] = None
code_revision: Optional[str] = None
tokenizer_revision: Optional[str] = None
quantization: Optional[str] = None
enforce_eager: bool = True
max_context_len_to_capture: Optional[int] = None
max_seq_len_to_capture: int = 8192
disable_custom_all_reduce: bool = False
tokenizer_pool_size: int = 0
tokenizer_pool_type: str = "ray"
tokenizer_pool_extra_config: Optional[dict] = None
enable_lora: bool = False
max_loras: int = 1
max_lora_rank: int = 16
fully_sharded_loras: bool = False
lora_extra_vocab_size: int = 256
lora_dtype = DEFAULT_TYPE
max_cpu_loras: Optional[int] = None
device: str = DEFAULT_TYPE
ray_workers_use_nsight: bool = False
num_gpu_blocks_override: Optional[int] = None
num_lookahead_slots: int = 0
model_loader_extra_config: Optional[dict] = None
# Related to Vision-language models such as llava
image_input_type: Optional[str] = None
image_token_id: Optional[int] = None
image_input_shape: Optional[str] = None
image_feature_size: Optional[int] = None
scheduler_delay_factor: float = 0.0
enable_chunked_prefill: bool = False
guided_decoding_backend: str = "outlines"
# Speculative decoding configuration.
speculative_model: Optional[str] = None
num_speculative_tokens: Optional[int] = None
speculative_max_model_len: Optional[int] = None
ngram_prompt_lookup_max: Optional[int] = None
ngram_prompt_lookup_min: Optional[int] = None
def __post_init__(self):
if self.tokenizer is None:
self.tokenizer = self.model
@staticmethod
def add_cli_args(parser: argparse.ArgumentParser) -> argparse.ArgumentParser:
"""Shared CLI arguments for vLLM engine."""
# Model arguments
parser.add_argument(
"--model",
type=str,
default="facebook/opt-125m",
help="Name or path of the huggingface model to use.",
)
parser.add_argument(
"--tokenizer",
type=nullable_str,
default=EngineArgs.tokenizer,
help="Name or path of the huggingface tokenizer to use.",
)
parser.add_argument(
"--skip-tokenizer-init",
action="store_true",
help="Skip initialization of tokenizer and detokenizer",
)
parser.add_argument(
"--revision",
type=nullable_str,
default=None,
help="The specific model version to use. It can be a branch "
"name, a tag name, or a commit id. If unspecified, will use "
"the default version.",
)
parser.add_argument(
"--code-revision",
type=nullable_str,
default=None,
help="The specific revision to use for the model code on "
"Hugging Face Hub. It can be a branch name, a tag name, or a "
"commit id. If unspecified, will use the default version.",
)
parser.add_argument(
"--tokenizer-revision",
type=nullable_str,
default=None,
help="The specific tokenizer version to use. It can be a branch "
"name, a tag name, or a commit id. If unspecified, will use "
"the default version.",
)
parser.add_argument(
"--tokenizer-mode",
type=str,
default=EngineArgs.tokenizer_mode,
choices=[DEFAULT_TYPE, "slow"],
help='The tokenizer mode.\n\n* "auto" will use the '
'fast tokenizer if available.\n* "slow" will '
"always use the slow tokenizer.",
)
parser.add_argument(
"--trust-remote-code",
action="store_true",
help="Trust remote code from huggingface.",
)
parser.add_argument(
"--download-dir",
type=nullable_str,
default=EngineArgs.download_dir,
help="Directory to download and load the weights, "
"default to the default cache dir of "
"huggingface.",
)
parser.add_argument(
"--load-format",
type=str,
default=EngineArgs.load_format,
choices=[
DEFAULT_TYPE,
"pt",
"safetensors",
"npcache",
"dummy",
"tensorizer",
],
help="The format of the model weights to load.\n\n"
'* "auto" will try to load the weights in the safetensors format '
"and fall back to the pytorch bin format if safetensors format "
"is not available.\n"
'* "pt" will load the weights in the pytorch bin format.\n'
'* "safetensors" will load the weights in the safetensors format.\n'
'* "npcache" will load the weights in pytorch format and store '
"a numpy cache to speed up the loading.\n"
'* "dummy" will initialize the weights with random values, '
"which is mainly for profiling.\n"
'* "tensorizer" will load the weights using tensorizer from '
"CoreWeave which assumes tensorizer_uri is set to the location of "
"the serialized weights.",
)
parser.add_argument(
"--dtype",
type=str,
default=EngineArgs.dtype,
choices=[DEFAULT_TYPE, "half", "float16", "bfloat16", "float", "float32"],
help="Data type for model weights and activations.\n\n"
'* "auto" will use FP16 precision for FP32 and FP16 models, and '
"BF16 precision for BF16 models.\n"
'* "half" for FP16. Recommended for AWQ quantization.\n'
'* "float16" is the same as "half".\n'
'* "bfloat16" for a balance between precision and range.\n'
'* "float" is shorthand for FP32 precision.\n'
'* "float32" for FP32 precision.',
)
parser.add_argument(
"--kv-cache-dtype",
type=str,
choices=[DEFAULT_TYPE, "fp8"],
default=EngineArgs.kv_cache_dtype,
help='Data type for kv cache storage. If "auto", will use model '
"data type. FP8_E5M2 (without scaling) is only supported on cuda "
"version greater than 11.8. On ROCm (AMD GPU), FP8_E4M3 is instead "
"supported for common inference criteria.",
)
parser.add_argument(
"--quantization-param-path",
type=nullable_str,
default=None,
help="Path to the JSON file containing the KV cache "
"scaling factors. This should generally be supplied, when "
"KV cache dtype is FP8. Otherwise, KV cache scaling factors "
"default to 1.0, which may cause accuracy issues. "
"FP8_E5M2 (without scaling) is only supported on cuda version"
"greater than 11.8. On ROCm (AMD GPU), FP8_E4M3 is instead "
"supported for common inference criteria.",
)
parser.add_argument(
"--max-model-len",
type=int,
default=EngineArgs.max_model_len,
help="Model context length. If unspecified, will "
"be automatically derived from the model config.",
)
parser.add_argument(
"--guided-decoding-backend",
type=str,
default="outlines",
choices=["outlines", "lm-format-enforcer"],
help="Which engine will be used for guided decoding"
" (JSON schema / regex etc) by default. Currently support "
"https://github.com/outlines-dev/outlines and "
"https://github.com/noamgat/lm-format-enforcer."
" Can be overridden per request via guided_decoding_backend"
" parameter.",
)
# Parallel arguments
parser.add_argument(
"--worker-use-ray",
action="store_true",
help="Use Ray for distributed serving, will be "
"automatically set when using more than 1 GPU.",
)
parser.add_argument(
"--pipeline-parallel-size",
"-pp",
type=int,
default=EngineArgs.pipeline_parallel_size,
help="Number of pipeline stages.",
)
parser.add_argument(
"--tensor-parallel-size",
"-tp",
type=int,
default=EngineArgs.tensor_parallel_size,
help="Number of tensor parallel replicas.",
)
parser.add_argument(
"--max-parallel-loading-workers",
type=int,
default=EngineArgs.max_parallel_loading_workers,
help="Load model sequentially in multiple batches, "
"to avoid RAM OOM when using tensor "
"parallel and large models.",
)
parser.add_argument(
"--ray-workers-use-nsight",
action="store_true",
help="If specified, use nsight to profile Ray workers.",
)
# KV cache arguments
parser.add_argument(
"--block-size",
type=int,
default=EngineArgs.block_size,
choices=[8, 16, 32, 128],
help="Token block size for contiguous chunks of " "tokens.",
)
parser.add_argument(
"--enable-prefix-caching",
action="store_true",
help="Enables automatic prefix caching.",
)
parser.add_argument(
"--use-v2-block-manager",
action="store_true",
help="Use BlockSpaceMangerV2.",
)
parser.add_argument(
"--num-lookahead-slots",
type=int,
default=EngineArgs.num_lookahead_slots,
help="Experimental scheduling config necessary for "
"speculative decoding. This will be replaced by "
"speculative config in the future; it is present "
"to enable correctness tests until then.",
)
parser.add_argument(
"--seed",
type=int,
default=EngineArgs.seed,
help="Random seed for operations.",
)
parser.add_argument(
"--swap-space",
type=int,
default=EngineArgs.swap_space,
help="CPU swap space size (GiB) per GPU.",
)
parser.add_argument(
"--gpu-memory-utilization",
type=float,
default=EngineArgs.gpu_memory_utilization,
help="The fraction of GPU memory to be used for the model "
"executor, which can range from 0 to 1. For example, a value of "
"0.5 would imply 50%% GPU memory utilization. If unspecified, "
"will use the default value of 0.9.",
)
parser.add_argument(
"--num-gpu-blocks-override",
type=int,
default=None,
help="If specified, ignore GPU profiling result and use this number"
"of GPU blocks. Used for testing preemption.",
)
parser.add_argument(
"--max-num-batched-tokens",
type=int,
default=EngineArgs.max_num_batched_tokens,
help="Maximum number of batched tokens per " "iteration.",
)
parser.add_argument(
"--max-num-seqs",
type=int,
default=EngineArgs.max_num_seqs,
help="Maximum number of sequences per iteration.",
)
parser.add_argument(
"--max-logprobs",
type=int,
default=EngineArgs.max_logprobs,
help=(
"Max number of log probs to return logprobs is specified in"
" SamplingParams."
),
)
parser.add_argument(
"--disable-log-stats",
action="store_true",
help="Disable logging statistics.",
)
# Quantization settings.
parser.add_argument(
"--quantization",
"-q",
type=nullable_str,
choices=[*QUANTIZATION_METHODS, None],
default=EngineArgs.quantization,
help="Method used to quantize the weights. If "
"None, we first check the `quantization_config` "
"attribute in the model config file. If that is "
"None, we assume the model weights are not "
"quantized and use `dtype` to determine the data "
"type of the weights.",
)
parser.add_argument(
"--enforce-eager",
action="store_true",
help="Always use eager-mode PyTorch. If False, "
"will use eager mode and CUDA graph in hybrid "
"for maximal performance and flexibility.",
)
parser.add_argument(
"--max-context-len-to-capture",
type=int,
default=EngineArgs.max_context_len_to_capture,
help="Maximum context length covered by CUDA "
"graphs. When a sequence has context length "
"larger than this, we fall back to eager mode. "
"(DEPRECATED. Use --max-seq_len-to-capture instead"
")",
)
parser.add_argument(
"--max-seq_len-to-capture",
type=int,
default=EngineArgs.max_seq_len_to_capture,
help="Maximum sequence length covered by CUDA "
"graphs. When a sequence has context length "
"larger than this, we fall back to eager mode.",
)
parser.add_argument(
"--disable-custom-all-reduce",
action="store_true",
default=EngineArgs.disable_custom_all_reduce,
help="See ParallelConfig.",
)
parser.add_argument(
"--tokenizer-pool-size",
type=int,
default=EngineArgs.tokenizer_pool_size,
help="Size of tokenizer pool to use for "
"asynchronous tokenization. If 0, will "
"use synchronous tokenization.",
)
parser.add_argument(
"--tokenizer-pool-type",
type=str,
default=EngineArgs.tokenizer_pool_type,
help="Type of tokenizer pool to use for "
"asynchronous tokenization. Ignored "
"if tokenizer_pool_size is 0.",
)
parser.add_argument(
"--tokenizer-pool-extra-config",
type=nullable_str,
default=EngineArgs.tokenizer_pool_extra_config,
help="Extra config for tokenizer pool. "
"This should be a JSON string that will be "
"parsed into a dictionary. Ignored if "
"tokenizer_pool_size is 0.",
)
# LoRA related configs
parser.add_argument(
"--enable-lora",
action="store_true",
help="If True, enable handling of LoRA adapters.",
)
parser.add_argument(
"--max-loras",
type=int,
default=EngineArgs.max_loras,
help="Max number of LoRAs in a single batch.",
)
parser.add_argument(
"--max-lora-rank",
type=int,
default=EngineArgs.max_lora_rank,
help="Max LoRA rank.",
)
parser.add_argument(
"--lora-extra-vocab-size",
type=int,
default=EngineArgs.lora_extra_vocab_size,
help=(
"Maximum size of extra vocabulary that can be "
"present in a LoRA adapter (added to the base "
"model vocabulary)."
),
)
parser.add_argument(
"--lora-dtype",
type=str,
default=EngineArgs.lora_dtype,
choices=[DEFAULT_TYPE, "float16", "bfloat16", "float32"],
help=("Data type for LoRA. If auto, will default to " "base model dtype."),
)
parser.add_argument(
"--max-cpu-loras",
type=int,
default=EngineArgs.max_cpu_loras,
help=(
"Maximum number of LoRAs to store in CPU memory. "
"Must be >= than max_num_seqs. "
"Defaults to max_num_seqs."
),
)
parser.add_argument(
"--fully-sharded-loras",
action="store_true",
help=(
"By default, only half of the LoRA computation is "
"sharded with tensor parallelism. "
"Enabling this will use the fully sharded layers. "
"At high sequence length, max rank or "
"tensor parallel size, this is likely faster."
),
)
parser.add_argument(
"--device",
type=str,
default=EngineArgs.device,
choices=["auto", "cuda", "neuron", "cpu", "npu"],
help="Device type for vLLM execution.",
)
# Related to Vision-language models such as llava
parser.add_argument(
"--image-input-type",
type=nullable_str,
default=None,
choices=[t.name.lower() for t in VisionLanguageConfig.ImageInputType],
help=(
"The image input type passed into vLLM. "
'Should be one of "pixel_values" or "image_features".'
),
)
parser.add_argument(
"--image-token-id",
type=int,
default=None,
help=("Input id for image token."),
)
parser.add_argument(
"--image-input-shape",
type=nullable_str,
default=None,
help=(
"The biggest image input shape (worst for memory footprint) "
"given an input type. Only used for vLLM's profile_run."
),
)
parser.add_argument(
"--image-feature-size",
type=int,
default=None,
help=("The image feature size along the context dimension."),
)
parser.add_argument(
"--scheduler-delay-factor",
type=float,
default=EngineArgs.scheduler_delay_factor,
help="Apply a delay (of delay factor multiplied by previous"
"prompt latency) before scheduling next prompt.",
)
parser.add_argument(
"--enable-chunked-prefill",
action="store_true",
help="If set, the prefill requests can be chunked based on the "
"max_num_batched_tokens.",
)
parser.add_argument(
"--speculative-model",
type=nullable_str,
default=EngineArgs.speculative_model,
help="The name of the draft model to be used in speculative decoding.",
)
parser.add_argument(
"--num-speculative-tokens",
type=int,
default=EngineArgs.num_speculative_tokens,
help="The number of speculative tokens to sample from "
"the draft model in speculative decoding.",
)
parser.add_argument(
"--speculative-max-model-len",
type=int,
default=EngineArgs.speculative_max_model_len,
help="The maximum sequence length supported by the "
"draft model. Sequences over this length will skip "
"speculation.",
)
parser.add_argument(
"--ngram-prompt-lookup-max",
type=int,
default=EngineArgs.ngram_prompt_lookup_max,
help="Max size of window for ngram prompt lookup in speculative "
"decoding.",
)
parser.add_argument(
"--ngram-prompt-lookup-min",
type=int,
default=EngineArgs.ngram_prompt_lookup_min,
help="Min size of window for ngram prompt lookup in speculative "
"decoding.",
)
parser.add_argument(
"--model-loader-extra-config",
type=nullable_str,
default=EngineArgs.model_loader_extra_config,
help="Extra config for model loader. "
"This will be passed to the model loader "
"corresponding to the chosen load_format. "
"This should be a JSON string that will be "
"parsed into a dictionary.",
)
parser.add_argument(
"--served-model-name",
nargs="+",
type=str,
default=None,
help="The model name(s) used in the API. If multiple "
"names are provided, the server will respond to any "
"of the provided names. The model name in the model "
"field of a response will be the first name in this "
"list. If not specified, the model name will be the "
"same as the `--model` argument. Noted that this name(s)"
"will also be used in `model_name` tag content of "
"prometheus metrics, if multiple names provided, metrics"
"tag will take the first one.",
)
return parser
@classmethod
def from_cli_args(cls, args: argparse.Namespace) -> "EngineArgs":
# Get the list of attributes of this dataclass.
attrs = [attr.name for attr in dataclasses.fields(cls)]
# Set the attributes from the parsed arguments.
engine_args = cls(**{attr: getattr(args, attr) for attr in attrs})
return engine_args
def create_engine_config(
self,
) -> EngineConfig:
device_config = DeviceConfig(self.device)
model_config = ModelConfig(
self.model,
self.tokenizer,
self.tokenizer_mode,
self.trust_remote_code,
self.dtype,
self.seed,
self.revision,
self.code_revision,
self.tokenizer_revision,
self.max_model_len,
self.quantization,
self.quantization_param_path,
self.enforce_eager,
self.max_context_len_to_capture,
self.max_seq_len_to_capture,
self.max_logprobs,
self.skip_tokenizer_init,
self.served_model_name,
)
cache_config = CacheConfig(
self.block_size,
self.gpu_memory_utilization,
self.swap_space,
self.kv_cache_dtype,
self.num_gpu_blocks_override,
model_config.get_sliding_window(),
self.enable_prefix_caching,
)
parallel_config = ParallelConfig(
self.pipeline_parallel_size,
self.tensor_parallel_size,
self.worker_use_ray,
self.max_parallel_loading_workers,
self.disable_custom_all_reduce,
TokenizerPoolConfig.create_config(
self.tokenizer_pool_size,
self.tokenizer_pool_type,
self.tokenizer_pool_extra_config,
),
self.ray_workers_use_nsight,
)
speculative_config = SpeculativeConfig.maybe_create_spec_config(
target_model_config=model_config,
target_parallel_config=parallel_config,
target_dtype=self.dtype,
speculative_model=self.speculative_model,
num_speculative_tokens=self.num_speculative_tokens,
speculative_max_model_len=self.speculative_max_model_len,
enable_chunked_prefill=self.enable_chunked_prefill,
use_v2_block_manager=self.use_v2_block_manager,
ngram_prompt_lookup_max=self.ngram_prompt_lookup_max,
ngram_prompt_lookup_min=self.ngram_prompt_lookup_min,
)
scheduler_config = SchedulerConfig(
self.max_num_batched_tokens,
self.max_num_seqs,
model_config.max_model_len,
self.use_v2_block_manager,
num_lookahead_slots=(
self.num_lookahead_slots
if speculative_config is None
else speculative_config.num_lookahead_slots
),
delay_factor=self.scheduler_delay_factor,
enable_chunked_prefill=self.enable_chunked_prefill,
)
lora_config = (
LoRAConfig(
max_lora_rank=self.max_lora_rank,
max_loras=self.max_loras,
fully_sharded_loras=self.fully_sharded_loras,
lora_extra_vocab_size=self.lora_extra_vocab_size,
lora_dtype=self.lora_dtype,
max_cpu_loras=(
self.max_cpu_loras
if self.max_cpu_loras and self.max_cpu_loras > 0
else None
),
)
if self.enable_lora
else None
)
load_config = LoadConfig(
load_format=self.load_format,
download_dir=self.download_dir,
model_loader_extra_config=self.model_loader_extra_config,
)
if self.image_input_type:
if (
not self.image_token_id
or not self.image_input_shape
or not self.image_feature_size
):
raise ValueError(
"Specify `image_token_id`, `image_input_shape` and "
"`image_feature_size` together with `image_input_type`."
)
vision_language_config = VisionLanguageConfig(
image_input_type=VisionLanguageConfig.get_image_input_enum_type(
self.image_input_type
),
image_token_id=self.image_token_id,
image_input_shape=str_to_int_tuple(self.image_input_shape),
image_feature_size=self.image_feature_size,
)
else:
vision_language_config = None
decoding_config = DecodingConfig(
guided_decoding_backend=self.guided_decoding_backend
)
return EngineConfig(
model_config=model_config,
cache_config=cache_config,
parallel_config=parallel_config,
scheduler_config=scheduler_config,
device_config=device_config,
lora_config=lora_config,
vision_language_config=vision_language_config,
speculative_config=speculative_config,
load_config=load_config,
decoding_config=decoding_config,
)
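A short sketch of the block_size change from the CLI side, assuming the packages above are installed (all other flags keep their argparse defaults):
import argparse
from vllm_npu.engine.arg_utils import EngineArgs

parser = EngineArgs.add_cli_args(argparse.ArgumentParser())
args = parser.parse_args(["--block-size", "128"])  # 128 is now a legal choice
engine_args = EngineArgs.from_cli_args(args)
print(engine_args.block_size)                      # 128 (also the new default)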
- vllm_npu/vllm_npu/executor/__init__.py: Replaces the Ray cluster initialization function in the executor package's init module.
from vllm_npu.executor.ray_utils import initialize_ray_cluster
from vllm.executor import ray_utils
ray_utils.initialize_ray_cluster = initialize_ray_cluster
- vllm_npu/vllm_npu/executor/ascend_executor.py: Implements the AscendExecutor and AscendExecutorAsync classes required for single-card inference in the Ascend environment.
# Part of codes in this file was copied from project [vLLM Team][vllm]
from typing import List, Set, Tuple
from vllm.executor.executor_base import ExecutorAsyncBase, ExecutorBase
from vllm.logger import init_logger
from vllm.lora.request import LoRARequest
from vllm.sequence import ExecuteModelRequest, SamplerOutput
from vllm.utils import (get_distributed_init_method, get_ip, get_open_port,
make_async)
from vllm_npu.worker.ascend_worker import AscendWorker
logger = init_logger(__name__)
class AscendExecutor(ExecutorBase):
def _init_executor(self) -> None:
assert (not self.speculative_config
), "Speculative decoding is not yet supported for Ascend backend."
# Instantiate the worker and load the model to the device.
self._init_worker()
def _init_worker(self):
distributed_init_method = get_distributed_init_method(
get_ip(), get_open_port())
self.driver_worker = AscendWorker(
self.model_config,
self.parallel_config,
self.scheduler_config,
self.device_config,
self.cache_config,
self.load_config,
local_rank=0,
rank=0,
distributed_init_method=distributed_init_method,
lora_config=self.lora_config,
is_driver_worker=True,
)
self.driver_worker.init_device()
self.driver_worker.load_model()
def determine_num_available_blocks(self) -> Tuple[int, int]:
"""Determine the number of available KV blocks by invoking the
underlying worker.
"""
return self.driver_worker.determine_num_available_blocks()
def initialize_cache(self, num_gpu_blocks: int,
num_cpu_blocks: int) -> None:
"""Initialize the KV cache by invoking the underlying worker.
"""
self.driver_worker.initialize_cache(num_gpu_blocks, num_cpu_blocks)
def execute_model(
self,
execute_model_req: ExecuteModelRequest) -> List[SamplerOutput]:
output = self.driver_worker.execute_model(execute_model_req)
return output
def add_lora(self, lora_request: LoRARequest) -> bool:
return self.driver_worker.add_lora(lora_request)
def remove_lora(self, lora_id: int) -> bool:
return self.driver_worker.remove_lora(lora_id)
def list_loras(self) -> Set[int]:
return self.driver_worker.list_loras()
def check_health(self) -> None:
# AscendExecutor will always be healthy as long as
# it's running.
return
class AscendExecutorAsync(AscendExecutor, ExecutorAsyncBase):
async def execute_model_async(
self,
execute_model_req: ExecuteModelRequest,
) -> List[SamplerOutput]:
output = await make_async(
self.driver_worker.execute_model
)(execute_model_req=execute_model_req,)
return output
# async def check_health_async(self) -> None:
# # AscendExecutor will always be healthy as long as
# # it's running.
# return
- vllm_npu/vllm_npu/executor/ascend_ray_executor.py: Implements the RayAscendExecutor and RayAscendExecutorAsync classes required for Ray-based multi-card inference in the Ascend environment.
import asyncio
import os
import pickle
from collections import defaultdict
from itertools import islice, repeat
from typing import TYPE_CHECKING, Any, Dict, List, Optional, Tuple
import vllm.envs as envs
from vllm.executor.distributed_gpu_executor import ( # yapf: disable
DistributedGPUExecutor, DistributedGPUExecutorAsync)
from vllm.executor.ray_utils import RayWorkerWrapper, ray
from vllm.logger import init_logger
from vllm.sequence import ExecuteModelRequest, SamplerOutput
from vllm.utils import (get_distributed_init_method, get_ip, get_open_port,
get_vllm_instance_id, make_async)
if ray is not None:
from ray.util.scheduling_strategies import PlacementGroupSchedulingStrategy
if TYPE_CHECKING:
from ray.util.placement_group import PlacementGroup
logger = init_logger(__name__)
USE_RAY_COMPILED_DAG = envs.VLLM_USE_RAY_COMPILED_DAG
class RayAscendExecutor(DistributedGPUExecutor):
def _init_executor(self) -> None:
assert (not self.speculative_config
), "Speculative decoding not yet supported for RayNPU backend."
assert self.parallel_config.worker_use_ray
placement_group = self.parallel_config.placement_group
# Disable Ray usage stats collection.
ray_usage = os.environ.get("RAY_USAGE_STATS_ENABLED", "0")
if ray_usage != "1":
os.environ["RAY_USAGE_STATS_ENABLED"] = "0"
# Create the parallel GPU workers.
self._init_workers_ray(placement_group)
self.forward_dag = None
if USE_RAY_COMPILED_DAG:
self.forward_dag = self._compiled_ray_dag()
def _init_workers_ray(self, placement_group: "PlacementGroup",
**ray_remote_kwargs):
if self.parallel_config.tensor_parallel_size == 1:
# For single GPU case, we use a ray worker with constrained memory.
num_gpus = self.cache_config.gpu_memory_utilization
else:
# Otherwise, the ray workers are allocated with a full GPU.
num_gpus = 1
# The driver dummy worker does not actually use any resources.
# It holds the resource for the driver worker.
self.driver_dummy_worker: Optional[RayWorkerWrapper] = None
# The remaining workers are the actual ray actors.
self.workers: List[RayWorkerWrapper] = []
if self.parallel_config.ray_workers_use_nsight:
ray_remote_kwargs = self._configure_ray_workers_use_nsight(
ray_remote_kwargs)
# Create the workers.
driver_ip = get_ip()
for bundle_id, bundle in enumerate(placement_group.bundle_specs):
if not bundle.get("GPU", 0):
continue
scheduling_strategy = PlacementGroupSchedulingStrategy(
placement_group=placement_group,
placement_group_capture_child_tasks=True,
placement_group_bundle_index=bundle_id,
)
worker = ray.remote(
num_cpus=0,
num_gpus=num_gpus,
scheduling_strategy=scheduling_strategy,
**ray_remote_kwargs,
)(RayWorkerWrapper).remote(
worker_module_name="vllm_npu.worker.ascend_worker",
worker_class_name="AscendWorker",
trust_remote_code=self.model_config.trust_remote_code,
)
worker_ip = ray.get(worker.get_node_ip.remote())
if worker_ip == driver_ip and self.driver_dummy_worker is None:
# If the worker is on the same node as the driver, we use it
# as the resource holder for the driver process.
self.driver_dummy_worker = worker
self.driver_worker = RayWorkerWrapper(
worker_module_name="vllm_npu.worker.ascend_worker",
worker_class_name="AscendWorker",
trust_remote_code=self.model_config.trust_remote_code,
)
else:
# Else, added to the list of workers.
self.workers.append(worker)
if self.driver_dummy_worker is None:
raise ValueError(
"Ray does not allocate any GPUs on the driver node. Consider "
"adjusting the Ray placement group or running the driver on a "
"GPU node.")
# Get the set of GPU IDs used on each node.
worker_node_and_gpu_ids = self._run_workers("get_node_and_gpu_ids",
use_dummy_driver=True)
node_workers = defaultdict(list)
node_gpus = defaultdict(list)
for i, (node_id, gpu_ids) in enumerate(worker_node_and_gpu_ids):
node_workers[node_id].append(i)
node_gpus[node_id].extend(gpu_ids)
for node_id, gpu_ids in node_gpus.items():
node_gpus[node_id] = sorted(gpu_ids)
VLLM_INSTANCE_ID = get_vllm_instance_id()
# Set environment variables for the driver and workers.
all_args_to_update_environment_variables = [({
"CUDA_VISIBLE_DEVICES":
",".join(map(str, node_gpus[node_id])),
"VLLM_INSTANCE_ID":
VLLM_INSTANCE_ID,
"VLLM_TRACE_FUNCTION":
str(envs.VLLM_TRACE_FUNCTION),
}, ) for (node_id, _) in worker_node_and_gpu_ids]
self._run_workers("update_environment_variables",
all_args=all_args_to_update_environment_variables)
distributed_init_method = get_distributed_init_method(
driver_ip, get_open_port())
# Initialize the actual workers inside worker wrapper.
init_worker_all_kwargs = [
self._get_worker_kwargs(
local_rank=node_workers[node_id].index(rank),
rank=rank,
distributed_init_method=distributed_init_method,
) for rank, (node_id, _) in enumerate(worker_node_and_gpu_ids)
]
self._run_workers("init_worker", all_kwargs=init_worker_all_kwargs)
self._run_workers("init_device")
self._run_workers("load_model",
max_concurrent_workers=self.parallel_config.
max_parallel_loading_workers)
def execute_model(
self,
execute_model_req: ExecuteModelRequest) -> List[SamplerOutput]:
all_outputs = self._run_workers(
"execute_model",
driver_kwargs={"execute_model_req": execute_model_req},
use_ray_compiled_dag=USE_RAY_COMPILED_DAG)
# Only the driver worker returns the sampling results.
return all_outputs[0]
def _run_workers(
self,
method: str,
*args,
driver_args: Optional[Tuple[Any, ...]] = None,
driver_kwargs: Optional[Dict[str, Any]] = None,
all_args: Optional[List[Tuple[Any, ...]]] = None,
all_kwargs: Optional[List[Dict[str, Any]]] = None,
use_dummy_driver: bool = False,
max_concurrent_workers: Optional[int] = None,
use_ray_compiled_dag: bool = False,
**kwargs,
) -> Any:
"""Runs the given method on all workers. Can be used in the following
ways:
- args/kwargs: All workers share the same args/kwargs
- args/kwargs and driver_args/driver_kwargs: Driver worker has
different args
- all_args/all_kwargs: args/kwargs for each worker are specified
individually
"""
if max_concurrent_workers:
raise NotImplementedError(
"max_concurrent_workers is not supported yet.")
if driver_args is None:
driver_args = args if all_args is None else all_args[0]
if driver_kwargs is None:
driver_kwargs = kwargs if all_kwargs is None else all_kwargs[0]
count = len(self.workers)
all_worker_args = repeat(args, count) if all_args is None \
else islice(all_args, 1, None)
all_worker_kwargs = repeat(kwargs, count) if all_kwargs is None \
else islice(all_kwargs, 1, None)
if use_ray_compiled_dag:
assert self.forward_dag is not None
output_channels = self.forward_dag.execute(1)
else:
# Start the ray workers first.
ray_worker_outputs = [
worker.execute_method.remote(method, *worker_args,
**worker_kwargs)
for (worker, worker_args, worker_kwargs
) in zip(self.workers, all_worker_args, all_worker_kwargs)
]
# Start the driver worker after all the ray workers.
if not use_dummy_driver:
driver_worker_output = self.driver_worker.execute_method(
method, *driver_args, **driver_kwargs)
else:
assert self.driver_dummy_worker is not None
driver_worker_output = ray.get(
self.driver_dummy_worker.execute_method.remote(
method, *driver_args, **driver_kwargs))
# Get the results of the ray workers.
if self.workers:
if use_ray_compiled_dag:
try:
ray_worker_outputs = [
pickle.loads(chan.begin_read())
for chan in output_channels
]
finally:
# Has to call end_read in order to reuse the DAG.
for chan in output_channels:
chan.end_read()
else:
ray_worker_outputs = ray.get(ray_worker_outputs)
return [driver_worker_output] + ray_worker_outputs
def _compiled_ray_dag(self):
import pkg_resources
required_version = "2.9"
current_version = pkg_resources.get_distribution("ray").version
if current_version < required_version:
raise ValueError(f"Ray version {required_version} or greater is "
f"required, but found {current_version}")
from ray.dag import InputNode, MultiOutputNode
assert self.parallel_config.worker_use_ray
with InputNode() as input_data:
forward_dag = MultiOutputNode([
worker.execute_model_compiled_dag_remote.
bind( # type: ignore[attr-defined]
input_data) for worker in self.workers
])
return forward_dag.experimental_compile()
def check_health(self) -> None:
"""Raises an error if engine is unhealthy."""
self._check_if_any_actor_is_dead()
def _check_if_any_actor_is_dead(self):
if not self.workers:
return
dead_actors = []
for actor in self.workers:
actor_state = ray.state.actors(actor._ray_actor_id.hex()) # pylint: disable=protected-access
if actor_state["State"] == "DEAD":
dead_actors.append(actor)
if dead_actors:
raise RuntimeError("At least one Worker is dead. "
f"Dead Workers: {dead_actors}. ")
class RayAscendExecutorAsync(RayAscendExecutor, DistributedGPUExecutorAsync):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.driver_executor = make_async(self.driver_worker.execute_method)
async def _run_workers_async(
self,
method: str,
*args,
driver_args: Optional[Tuple[Any, ...]] = None,
driver_kwargs: Optional[Dict[str, Any]] = None,
**kwargs,
) -> Any:
"""Runs the given method on all workers."""
coros = []
if driver_args is None:
driver_args = args
if driver_kwargs is None:
driver_kwargs = kwargs
coros.append(
self.driver_executor(method, *driver_args, **driver_kwargs))
# Run the ray workers asynchronously.
for worker in self.workers:
coros.append(worker.execute_method.remote(method, *args, **kwargs))
all_outputs = await asyncio.gather(*coros)
return all_outputs
- vllm_npu/vllm_npu/executor/ray_utils.py: Adapts the Ray distributed-environment initialization so that Ray can be brought up successfully in the Ascend environment.
from typing import Optional, Tuple, TYPE_CHECKING
from vllm.config import ParallelConfig
from vllm.utils import is_hip
from vllm.logger import init_logger
logger = init_logger(__name__)
try:
import ray
except ImportError as e:
logger.warning(f"Failed to import Ray with {e!r}. "
"For distributed inference, please install Ray with "
"`pip install ray`.")
ray = None
if TYPE_CHECKING:
from ray.util.placement_group import PlacementGroup
def initialize_ray_cluster(
parallel_config: ParallelConfig,
ray_address: Optional[str] = None,
):
"""Initialize the distributed cluster with Ray.
It will connect to the Ray cluster and create a placement group
for the workers, which includes the specification of the resources
for each distributed worker.
Args:
parallel_config: The configurations for parallel execution.
ray_address: The address of the Ray cluster. If None, uses
the default Ray cluster address.
"""
if ray is None:
raise ImportError(
"Ray is not installed. Please install Ray to use multi-node "
"serving.")
# Connect to a ray cluster.
if is_hip():
ray.init(address=ray_address,
ignore_reinit_error=True,
num_gpus=parallel_config.world_size)
else:
"""start adapt"""
# Without setting num_gpus explicitly, ray.init tries to auto-detect the
# number of GPUs, but in the Ascend environment this detection may fail,
# so the value has to be set manually.
ray.init(address=ray_address, ignore_reinit_error=True,
num_gpus=parallel_config.world_size)
"""end adapt"""
if parallel_config.placement_group:
# Placement group is already set.
return
# Create placement group for worker processes
current_placement_group = ray.util.get_current_placement_group()
if current_placement_group:
# We are in a placement group
bundles = current_placement_group.bundle_specs
# Verify that we can use the placement group.
gpu_bundles = 0
for bundle in bundles:
bundle_gpus = bundle.get("GPU", 0)
if bundle_gpus > 1:
raise ValueError(
"Placement group bundle cannot have more than 1 GPU.")
if bundle_gpus:
gpu_bundles += 1
if parallel_config.world_size > gpu_bundles:
raise ValueError(
"The number of required GPUs exceeds the total number of "
"available GPUs in the placement group.")
else:
num_gpus_in_cluster = ray.cluster_resources().get("GPU", 0)
if parallel_config.world_size > num_gpus_in_cluster:
raise ValueError(
"The number of required GPUs exceeds the total number of "
"available GPUs in the cluster.")
# Create a new placement group
placement_group_specs = ([{"GPU": 1}] * parallel_config.world_size)
current_placement_group = ray.util.placement_group(
placement_group_specs)
# Wait until PG is ready - this will block until all
# requested resources are available, and will timeout
# if they cannot be provisioned.
ray.get(current_placement_group.ready(), timeout=1800)
# Set the placement group in the parallel config
parallel_config.placement_group = current_placement_group
- vllm_npu/vllm_npu/model_executor/__init__.py: Replaces the get_architecture_class_name function of vLLM's native loader inside model_executor.
import vllm.model_executor.model_loader as vllm_model_loader
import vllm_npu.model_executor.ascend_model_loader as ascend_model_loader
vllm_model_loader.get_architecture_class_name = ascend_model_loader.get_architecture_class_name
- vllm_npu/vllm_npu/model_executor/ascend_model_loader.py: Rewrites vLLM's native get_architecture_class_name function to handle quantized models, and rewrites the native get_model function to provide the entry point into the MindIE full-graph model.
# Part of code in this file was copied from project [vLLM Team][vllm] for adapting usage
import contextlib
import torch
import torch.nn as nn
from vllm.config import DeviceConfig, ModelConfig, LoadConfig
from vllm.model_executor.model_loader.weight_utils import initialize_dummy_weights
from vllm_npu.model_executor.models.ascend.mindie_llm_wrapper import MindIELlmWrapper
def get_architecture_class_name(model_config: ModelConfig) -> str:
architectures = getattr(model_config.hf_config, "architectures", [])
if (model_config.quantization is not None
and model_config.quantization != "fp8"
and "MixtralForCausalLM" in architectures):
architectures = ["QuantMixtralForCausalLM"]
return architectures[0]
@contextlib.contextmanager
def _set_default_torch_dtype(dtype: torch.dtype):
"""Sets the default torch dtype to the given dtype."""
old_dtype = torch.get_default_dtype()
torch.set_default_dtype(dtype)
yield
torch.set_default_dtype(old_dtype)
def get_model(model_config: ModelConfig, device_config: DeviceConfig,
load_config: LoadConfig,
mindie_model_config, **kwargs) -> nn.Module:
lora_config = kwargs.get("lora_config", None)
model_class = MindIELlmWrapper
# Get the (maybe quantized) linear method.
linear_method = None
with _set_default_torch_dtype(model_config.dtype):
# Create a model instance.
# The weights will be initialized as empty tensors.
with torch.device(device_config.device):
if hasattr(model_class, "supported_lora_modules"):
model = model_class(mindie_model_config, linear_method,
lora_config)
elif lora_config:
raise ValueError(
f"Model {model_class.__name__} does not support LoRA, "
"but LoRA is enabled. Support for this model may "
"be added in the future. If this is important to you, "
"please open an issue on github.")
else:
model = model_class(mindie_model_config, linear_method)
if load_config.load_format == "dummy":
initialize_dummy_weights(model)
else:
# Load the weights from the cached or downloaded files.
model.load_weights(model_config.model, load_config.download_dir,
load_config.load_format, model_config.revision)
model = model.npu()
return model.eval()
- vllm_npu/vllm_npu/model_executor/layers/__init__.py: Empty file.
- vllm_npu/vllm_npu/model_executor/layers/ascend_sampler.py: Implements the Ascend backend's post-processing (sampling) class, bridging vLLM's data structures and MindIE post-processing.
# Copyright (c) Huawei Technologies Co., Ltd. 2024-2025. All rights reserved.
# Part of codes in this file was copied from project [vLLM Team][vllm]
import random
from typing import Dict, List, Optional, Tuple
import torch
import torch.nn as nn
import numpy as np
from vllm.sampling_params import SamplingType
from vllm.sequence import SamplerOutput
from vllm.model_executor.sampling_metadata import SamplingMetadata, SequenceGroupToSample
from vllm.model_executor.layers.sampler import _get_logprobs, _build_sampler_output
from mindie_llm.text_generator.utils.sampling_metadata import SamplingData, SamplingParam
_SAMPLING_EPS = 1e-5
SampleResultType = List[Tuple[List[int], List[int]]]
def _to_tensor(data, dtype=None):
if dtype:
return torch.tensor(data, dtype=dtype, device=torch.device("npu"))
else:
return torch.tensor(data, device=torch.device("npu"))
class AscendSampler(nn.Module):
def __init__(self, mindie_model):
super().__init__()
self.mindie_model = mindie_model
self.include_gpu_probs_tensor = False
def forward(
self,
logits: torch.Tensor,
sampling_metadata: SamplingMetadata,
) -> Optional[SamplerOutput]:
_, vocab_size = logits.shape
mindie_sampling_data, mindie_sampling_param = self.construct_data(sampling_metadata, vocab_size)
probs = torch.softmax(logits, dim=-1, dtype=torch.float)
logprobs = torch.log_softmax(logits, dim=-1, dtype=torch.float)
sampling_mask = [seq_group.do_sample for seq_group in sampling_metadata.seq_groups]
filtered_logits = logits[sampling_mask]
if filtered_logits.size(0) > 0:
            next_tokens, _ = self.mindie_model.sample(
filtered_logits,
sampling_data=mindie_sampling_data,
sampling_param=mindie_sampling_param,
)
else:
next_tokens = None
sample_results, maybe_sampled_tokens_tensor = recover_data(
sampling_metadata=sampling_metadata,
sampled_tokens=next_tokens,
logprobs=logprobs,
include_gpu_probs_tensor=self.include_gpu_probs_tensor,
)
if self.include_gpu_probs_tensor:
if maybe_sampled_tokens_tensor is None:
raise RuntimeError("maybe_sampled_tokens_tensor is None")
on_device_tensors = (probs, logprobs, maybe_sampled_tokens_tensor)
else:
on_device_tensors = None
# Get the logprobs query results.
prompt_logprobs, sample_logprobs = _get_logprobs(
logprobs, sampling_metadata, sample_results)
return _build_sampler_output(sample_results,
sampling_metadata,
prompt_logprobs,
sample_logprobs,
on_device_tensors=on_device_tensors)
def construct_data(
self,
sampling_metadata: SamplingMetadata,
vocab_size: int,
) -> Tuple[SamplingData, SamplingParam]:
all_input_tokens: List[List[int]] = []
prompt_tokens: List[List[int]] = []
output_tokens: List[List[int]] = []
top_ks: List[int] = []
temperatures: List[float] = []
top_ps: List[float] = []
min_ps: List[float] = []
presence_penalties: List[float] = []
frequency_penalties: List[float] = []
repetition_penalties: List[float] = []
sampling_seeds: List[int] = []
sample_indices: List[int] = []
do_samples: List[bool] = [] # To Do
do_penalties = False
do_top_p_top_k = False
do_min_p = False
greedy_flag = False
if sampling_metadata.seq_groups is None:
raise RuntimeError("sampling_metadata.seq_group is None, no data received.")
for seq_group in sampling_metadata.seq_groups:
do_samples.append(seq_group.do_sample)
seq_ids = seq_group.seq_ids
sampling_params = seq_group.sampling_params
temperature = sampling_params.temperature
p = sampling_params.presence_penalty
f = sampling_params.frequency_penalty
r = sampling_params.repetition_penalty
top_p = sampling_params.top_p
min_p = sampling_params.min_p
is_greedy = sampling_params.sampling_type == SamplingType.GREEDY
seed = sampling_params.seed
if seed is None:
if is_greedy:
seed = 0
else:
lo, hi = torch.iinfo(torch.long).min, torch.iinfo(torch.long).max
seed = random.randint(lo, hi)
if is_greedy:
greedy_flag = True
# k should not be greater than the vocab size.
top_k = min(sampling_params.top_k, vocab_size)
top_k = vocab_size if top_k == -1 else top_k
if temperature < _SAMPLING_EPS:
temperature = 1.0
if not do_top_p_top_k and (top_p < 1.0 - _SAMPLING_EPS
or top_k != vocab_size):
do_top_p_top_k = True
if not do_min_p and min_p > _SAMPLING_EPS:
do_min_p = True
if not do_penalties:
if abs(p) >= _SAMPLING_EPS:
do_penalties = True
elif abs(f) >= _SAMPLING_EPS:
do_penalties = True
elif abs(r - 1.0) >= _SAMPLING_EPS:
do_penalties = True
is_prompt = seq_group.is_prompt
if (seq_group.is_prompt
and sampling_params.prompt_logprobs is not None):
# For tokens in the prompt that we only need to get
# their logprobs
query_len = seq_group.query_len
if query_len is None:
raise RuntimeError("query_len is None")
prefill_len = len(seq_group.prompt_logprob_indices)
temperatures += [temperature] * prefill_len
sampling_seeds += [seed] * prefill_len
top_ps += [top_p] * prefill_len
top_ks += [top_k] * prefill_len
min_ps += [min_p] * prefill_len
presence_penalties += [0] * prefill_len
frequency_penalties += [0] * prefill_len
repetition_penalties += [1] * prefill_len
prompt_tokens.extend([] for _ in range(prefill_len))
output_tokens.extend([] for _ in range(prefill_len))
all_input_tokens.extend([] for _ in range(prefill_len))
if seq_group.do_sample:
sample_lens = len(seq_group.sample_indices)
if sample_lens != len(seq_ids):
raise ValueError("sample_lens != len(seq_ids)")
for seq_id in seq_ids:
seq_data = seq_group.seq_data[seq_id]
prompt_tokens.append(seq_data.prompt_token_ids)
output_tokens.append(seq_data.output_token_ids)
all_input_tokens.append(seq_data.prompt_token_ids + seq_data.output_token_ids)
temperatures += [temperature] * len(seq_ids)
sampling_seeds += [seed] * len(seq_ids)
top_ps += [top_p] * len(seq_ids)
top_ks += [top_k] * len(seq_ids)
min_ps += [min_p] * len(seq_ids)
presence_penalties += [p] * len(seq_ids)
frequency_penalties += [f] * len(seq_ids)
repetition_penalties += [r] * len(seq_ids)
repetition_penalties = np.array(repetition_penalties, dtype=np.float32)
frequency_penalties = np.array(frequency_penalties, dtype=np.float32)
presence_penalties = np.array(presence_penalties, dtype=np.float32)
temperatures = np.array(temperatures, dtype=np.float32)
top_ks = np.array(top_ks, dtype=np.int32)
top_ps = np.array(top_ps, dtype=np.float32)
sampling_seeds = np.array(sampling_seeds)
do_samples = np.array(do_samples)
max_tokens_len = max([len(tokens) for tokens in all_input_tokens], default=0)
padded_all_input_tokens = [
tokens + [vocab_size] * (max_tokens_len - len(tokens))
for tokens in all_input_tokens
]
padded_all_input_tokens = np.array(padded_all_input_tokens, dtype=np.int32)
output_max_len = max([len(tokens) for tokens in output_tokens], default=0)
padded_output_tokens = [
tokens + [vocab_size] * (output_max_len - len(tokens))
for tokens in output_tokens
]
padded_output_tokens = np.array(padded_output_tokens, dtype=np.int32)
all_input_ids_tensor = _to_tensor(
padded_all_input_tokens,
torch.int32
) if padded_all_input_tokens is not None else None
output_ids_tensor = _to_tensor(
padded_output_tokens,
torch.int32
) if padded_output_tokens is not None else None
mindie_sampling_data = SamplingData(
all_input_ids=all_input_ids_tensor,
output_ids=output_ids_tensor
)
if greedy_flag:
mindie_sampling_param = None
else:
mindie_sampling_param = SamplingParam.from_numpy(
repetition_penalty=repetition_penalties,
frequency_penalty=frequency_penalties,
presence_penalty=presence_penalties,
temperature=temperatures,
top_k=top_ks,
top_p=top_ps,
seed=sampling_seeds,
do_sample=do_samples,
to_tensor=_to_tensor,
)
return (mindie_sampling_data, mindie_sampling_param)
def recover_data(
sampling_metadata: SamplingMetadata,
sampled_tokens: np.ndarray,
logprobs: torch.Tensor,
include_gpu_probs_tensor: bool,
) -> Tuple[SampleResultType, Optional[torch.Tensor]]:
categorized_seq_group_ids: Dict[SamplingType,
List[int]] = {t: []
for t in SamplingType}
categorized_sample_indices = sampling_metadata.categorized_sample_indices
for i, seq_group in enumerate(sampling_metadata.seq_groups):
sampling_params = seq_group.sampling_params
sampling_type = sampling_params.sampling_type
categorized_seq_group_ids[sampling_type].append(i)
sample_results_dict: Dict[int, Tuple[List[int], List[int]]] = {}
sample_metadata = {}
# Create output tensor for sampled token ids.
if include_gpu_probs_tensor:
sampled_token_ids_tensor = torch.empty(logprobs.shape[0],
1,
dtype=torch.long,
device=logprobs.device)
else:
sampled_token_ids_tensor = None
for sampling_type in SamplingType:
sample_indices = categorized_sample_indices[sampling_type][:, 0]
num_tokens = len(sample_indices)
if num_tokens == 0:
continue
seq_group_id = categorized_seq_group_ids[sampling_type]
seq_groups = [sampling_metadata.seq_groups[i] for i in seq_group_id]
sample_metadata[sampling_type] = (seq_group_id, seq_groups)
for sampling_type in SamplingType:
if sampling_type not in sample_metadata:
continue
(seq_group_id, seq_groups) = sample_metadata[sampling_type]
if sampling_type in (SamplingType.GREEDY, SamplingType.RANDOM, SamplingType.RANDOM_SEED):
sample_results = normal_wrap(seq_groups, sampled_tokens)
elif sampling_type == SamplingType.BEAM:
sample_results = beam_wrap(seq_groups, sampled_tokens)
sample_results_dict.update(zip(seq_group_id, sample_results))
sample_results = [
sample_results_dict.get(i, ([], []))
for i in range(len(sampling_metadata.seq_groups))
]
return sample_results, sampled_token_ids_tensor
def normal_wrap(
selected_seq_groups: List[SequenceGroupToSample],
samples: np.ndarray,
):
samples = samples.tolist()
sample_idx = 0
results: SampleResultType = []
for seq_group in selected_seq_groups:
if not seq_group.do_sample:
results.append(([], []))
continue
seq_ids = seq_group.seq_ids
num_parent_seqs = len(seq_ids)
parent_ids = list(range(num_parent_seqs))
next_token_ids = [samples[sample_idx]]
results.append((next_token_ids, parent_ids))
sample_idx += num_parent_seqs
return results
def beam_wrap(
selected_seq_groups: List[SequenceGroupToSample],
samples: np.ndarray,
):
raise ValueError(f"Unsupported sampling type: beam search")
- vllm_npu/vllm_npu/model_executor/models/__init__.py: Empty file.
- vllm_npu/vllm_npu/model_executor/models/ascend/__init__.py: Empty file.
- vllm_npu/vllm_npu/model_executor/models/ascend/mindie_llm_wrapper.py: Implements the wrapper class that connects to the MindIE full-graph model; it unpacks the data required by the MindIE graph from vLLM's data structures and passes it straight through.
# Copyright (c) Huawei Technologies Co., Ltd. 2024-2025. All rights reserved.
import math
from typing import List, Optional
import torch
from mindie_llm.text_generator.adapter.generator_torch import GeneratorTorch
from torch import nn
import torch.nn.functional as F
from vllm.attention import AttentionMetadata
from vllm.lora.request import LoRARequest
from vllm.model_executor import SamplingMetadata
from vllm.sequence import SamplerOutput
from vllm_npu.model_executor.layers.ascend_sampler import AscendSampler
from vllm_npu.worker.cache_engine import KVCache
class MindIELlmWrapper(nn.Module):
def __init__(self, mindie_model_config, linear_method=None, lora_config=None):
super(MindIELlmWrapper, self).__init__()
self.mindie_model_config = mindie_model_config
self.rank = mindie_model_config["rank"]
self.local_rank = mindie_model_config["local_rank"]
self.npu_id = self.local_rank
self.world_size = mindie_model_config["world_size"]
self.kv_cache_dtype = mindie_model_config["kv_cache_dtype"]
self.mindie_model = None
self.sampler = None
def forward(
self,
input_ids: torch.Tensor,
positions: torch.Tensor,
kv_caches: List[KVCache],
attn_metadata: AttentionMetadata,
lora_requests: List[LoRARequest],
) -> torch.Tensor:
is_prompt = attn_metadata.prefill_metadata is not None
if kv_caches[0][0] is None:
kv_caches, block_tables, slots = self.create_dummy_kv_cache(attn_metadata, input_ids)
else:
block_tables = self.create_block_tables(attn_metadata)
slots = attn_metadata.slot_mapping
if attn_metadata.prefill_metadata is None:
input_lengths = attn_metadata.decode_metadata.seq_lens_tensor
max_seq_len = attn_metadata.decode_metadata.max_seq_len
query_lens = [1] * len(attn_metadata.decode_metadata.seq_lens_tensor)
lm_head_indices = None
else:
input_lengths = attn_metadata.prefill_metadata.seq_lens_tensor
max_seq_len = attn_metadata.prefill_metadata.max_seq_len
subquery_start_loc = attn_metadata.prefill_metadata.subquery_start_loc
query_lens_tensor = subquery_start_loc[1:] - subquery_start_loc[:-1]
if attn_metadata.decode_metadata is not None:
input_lengths = torch.cat((input_lengths, attn_metadata.decode_metadata.seq_lens_tensor), dim=0)
max_seq_len = max(max_seq_len, attn_metadata.decode_metadata.max_seq_len)
query_lens_tensor = F.pad(query_lens_tensor, (0, attn_metadata.num_decode_tokens), "constant", 1)
query_lens = query_lens_tensor.tolist()
lm_head_indices = query_lens_tensor.cumsum(dim=-1) - 1
adapter_ids = [lora_request.lora_name if lora_request else None for lora_request in lora_requests]
logits = self.mindie_model.forward_tensor(
input_ids,
positions,
is_prompt,
kv_caches,
block_tables,
slots,
input_lengths,
max_seq_len,
lm_head_indices,
adapter_ids=adapter_ids,
q_lens=query_lens,
)
return logits
def compute_logits(self, hidden_states: torch.Tensor, sampling_metadata: SamplingMetadata) -> torch.Tensor:
return hidden_states
def sample(
self,
logits: torch.Tensor,
sampling_metadata: SamplingMetadata,
) -> Optional[SamplerOutput]:
# hidden_states is logits
next_tokens = self.sampler(logits, sampling_metadata)
return next_tokens
def load_weights(
self,
model_name_or_path: str,
cache_dir: Optional[str] = None,
load_format: str = "auto",
revision: Optional[str] = None,
):
if load_format not in ["auto", "safetensors", "pt"]:
raise ValueError("load-format support [safetensors, pt]")
self.weight_dtype = torch.get_default_dtype()
torch.set_default_dtype(torch.float32)
self.mindie_model = GeneratorTorch(self.mindie_model_config)
self.sampler = AscendSampler(self.mindie_model)
torch.set_default_dtype(self.weight_dtype)
# when warmup, create dummy kvcache, block_tables, slot_mapping
def create_dummy_kv_cache(self, attn_metadata, input_ids):
dummy_block_size = 128
max_s = max(attn_metadata.prefill_metadata.seq_lens_tensor)
max_need_block = math.ceil(max_s / dummy_block_size)
batch_size = len(attn_metadata.prefill_metadata.seq_lens_tensor)
dummy_block_num = max_need_block * batch_size
model_runner = self.mindie_model.model_wrapper.model_runner
kv_cache = [
(
torch.empty(
(dummy_block_num, dummy_block_size, model_runner.num_kv_heads, model_runner.head_size),
dtype=self.kv_cache_dtype,
device="npu",
),
torch.empty(
(dummy_block_num, dummy_block_size, model_runner.num_kv_heads, model_runner.head_size),
dtype=self.kv_cache_dtype,
device="npu",
),
)
for _ in range(model_runner.num_layers)
]
block_tables = torch.zeros(batch_size, max_need_block, dtype=int, device="npu")
slot = [i for i in range(dummy_block_size)]
slots = []
warm_up_len = len(input_ids)
while warm_up_len > 0:
if warm_up_len > dummy_block_size:
slots.extend(slot)
warm_up_len -= dummy_block_size
else:
slots.extend(slot[:warm_up_len])
warm_up_len = 0
slots = torch.tensor(slots, dtype=torch.long, device="npu")
return kv_cache, block_tables, slots
def create_block_tables(self, attn_metadata):
if attn_metadata.prefill_metadata is None:
return attn_metadata.decode_metadata.block_tables
prefill_block_tables = attn_metadata.prefill_metadata.block_tables
if prefill_block_tables.numel() == 0:
return torch.tensor([0], dtype=torch.int32, device="npu")
if attn_metadata.decode_metadata is None:
return prefill_block_tables
decode_block_tables = attn_metadata.decode_metadata.block_tables
pad_size = prefill_block_tables.size(1) - decode_block_tables.size(1)
if pad_size > 0:
decode_block_tables = F.pad(decode_block_tables, (0, pad_size), "constant", 0)
elif pad_size < 0:
prefill_block_tables = F.pad(prefill_block_tables, (0, -pad_size), "constant", 0)
return torch.cat((prefill_block_tables, decode_block_tables), dim=0)
- vllm_npu/vllm_npu/usage/__init__.py: Hot-replaces vLLM's native _report_usage_once function to fix an error raised when reporting usage information.
import types
from vllm.engine.llm_engine import usage_message
import vllm_npu.usage.usage_lib as vllm_npu_usage_lib
usage_message._report_usage_once = types.MethodType(vllm_npu_usage_lib._report_usage_once, usage_message)
- vllm_npu/vllm_npu/usage/usage_lib.py: Re-implements vLLM's native _report_usage_once function to fix an error raised when reporting usage information.
# Part of code in this file was copied from project [vLLM Team][vllm] for adapting usage
import platform
from typing import Any, Dict
import cpuinfo
import psutil
import torch
import vllm.envs as envs
from vllm.usage.usage_lib import UsageContext, _detect_cloud_provider, _get_current_timestamp_ns
def _report_usage_once(self, model_architecture: str,
usage_context: UsageContext,
extra_kvs: Dict[str, Any]) -> None:
# Platform information
if torch.npu.is_available():
device_property = torch.npu.get_device_properties()
self.gpu_count = torch.npu.device_count()
self.gpu_type = device_property.name
self.gpu_memory_per_device = device_property.total_memory
self.provider = _detect_cloud_provider()
self.architecture = platform.machine()
self.platform = platform.platform()
self.total_memory = psutil.virtual_memory().total
info = cpuinfo.get_cpu_info()
self.num_cpu = info.get("count", None)
self.cpu_type = info.get("brand_raw", "")
self.cpu_family_model_stepping = ",".join([
str(info.get("family", "")),
str(info.get("model", "")),
str(info.get("stepping", ""))
])
# vLLM information
import vllm # delayed import to prevent circular import
self.context = usage_context.value
self.vllm_version = vllm.__version__
self.model_architecture = model_architecture
# Metadata
self.log_time = _get_current_timestamp_ns()
self.source = envs.VLLM_USAGE_SOURCE
data = vars(self)
if data["_report_usage_once"] is not None:
del data["_report_usage_once"]
if extra_kvs:
data.update(extra_kvs)
self._write_to_file(data)
self._send_to_server(data)
- vllm_npu/vllm_npu/worker/__init__.py: Replaces the _allocate_kv_cache function of vLLM's native CacheEngine to provide a custom kv_cache allocation method.
from vllm_npu.worker.cache_engine import _allocate_kv_cache
from vllm.worker import cache_engine
cache_engine.CacheEngine._allocate_kv_cache = _allocate_kv_cache
- vllm_npu/vllm_npu/worker/ascend_model_runner.py: Implements the AscendModelRunner class used by the Ascend backend, with the prefix-cache feature already adapted.
# Copyright (c) Huawei Technologies Co., Ltd. 2024-2025. All rights reserved.
# Part of codes in this file was copied from project [vLLM Team][vllm]
from typing import List, NamedTuple, Optional, Tuple
import torch
from vllm.attention import AttentionMetadata, AttentionMetadataPerStage
from vllm.attention.backends.flashinfer import FlashInferBackend
from vllm.config import (
DeviceConfig,
LoadConfig,
LoRAConfig,
ModelConfig,
ParallelConfig,
SchedulerConfig,
VisionLanguageConfig,
)
from vllm.distributed import broadcast_tensor_dict
from vllm.logger import init_logger
from vllm.lora.layers import LoRAMapping
from vllm.lora.request import LoRARequest
from vllm.model_executor import SamplingMetadata
from vllm.sampling_params import SamplingParams
from vllm.sequence import SamplerOutput, SequenceGroupMetadata
from vllm.utils import get_kv_cache_torch_dtype, is_hip, make_tensor_with_pad
from vllm.worker.model_runner import (
_PAD_SLOT_ID,
BatchType,
ModelRunner,
PrepareDecodeMetadata,
PreparePromptMetadata,
_prepare_fake_inputs,
)
from vllm_npu.model_executor.ascend_model_loader import get_model
logger = init_logger(__name__)
LORA_WARMUP_RANK = 8
class PreparePromptMetadata(NamedTuple):
input_tokens: List[int]
input_positions: List[int]
attn_metadata: Optional[AttentionMetadataPerStage]
seq_lens: List[int]
query_lens: List[int]
lora_index_mapping: List[int]
lora_prompt_mapping: List[int]
lora_requests: List[LoRARequest]
multi_modal_input: Optional[torch.Tensor]
slot_mapping: List[int]
@classmethod
def empty(cls):
return PreparePromptMetadata(
input_tokens=[],
input_positions=[],
attn_metadata=None,
seq_lens=[],
query_lens=[],
lora_index_mapping=[],
lora_prompt_mapping=[],
lora_requests=[],
multi_modal_input=None,
slot_mapping=[],
)
class PrepareDecodeMetadata(NamedTuple):
input_tokens: List[int]
input_positions: List[int]
attn_metadata: Optional[AttentionMetadata]
lora_index_mapping: List[int]
lora_prompt_mapping: List[int]
lora_requests: List[LoRARequest]
slot_mapping: List[int]
@classmethod
def empty(cls):
return PrepareDecodeMetadata(
input_tokens=[],
input_positions=[],
attn_metadata=None,
lora_index_mapping=[],
lora_prompt_mapping=[],
lora_requests=[],
slot_mapping=[],
)
class AscendModelRunner(ModelRunner):
def __init__(
self,
model_config: ModelConfig,
parallel_config: ParallelConfig,
scheduler_config: SchedulerConfig,
device_config: DeviceConfig,
load_config: LoadConfig,
lora_config: Optional[LoRAConfig],
mindie_model_config,
kv_cache_dtype: Optional[str] = "auto",
is_driver_worker: bool = False,
vision_language_config: Optional[VisionLanguageConfig] = None,
):
super(AscendModelRunner, self).__init__(
model_config,
parallel_config,
scheduler_config,
device_config,
load_config,
lora_config,
kv_cache_dtype,
is_driver_worker,
vision_language_config,
)
self.kv_cache_torch_dtype = get_kv_cache_torch_dtype(kv_cache_dtype, model_config.dtype)
self.mindie_model_config = mindie_model_config
self.mindie_model_config["kv_cache_dtype"] = self.kv_cache_torch_dtype
def load_model(self) -> None:
self.model = get_model(
model_config=self.model_config,
device_config=self.device_config,
load_config=self.load_config,
mindie_model_config=self.mindie_model_config,
)
if self.kv_cache_dtype == "fp8" and is_hip():
# Currently scaled KV cache is only enabled on ROCm
if self.model_config.quantization_param_path is not None:
if callable(getattr(self.model, "load_kv_cache_scales", None)):
self.model.load_kv_cache_scales(self.model_config.quantization_param_path)
else:
                    raise RuntimeError(
                        "Using FP8 KV cache and scaling factors provided but "
                        f"model {self.model.__class__} does not support "
                        "loading scaling factors."
                    )
else:
logger.warning(
"Using FP8 KV cache but no scaling factors "
"provided. Defaulting to scaling factors of 1.0. "
"This may lead to less accurate results!"
)
elif self.model_config.quantization_param_path is not None:
logger.warning(
"KV cache scaling factors provided, "
"but the KV cache data type is not FP8. "
"KV cache scaling factors will not be used."
)
@torch.inference_mode()
def profile_run(self) -> None:
# Enable top-k sampling to reflect the accurate memory usage.
sampling_params = SamplingParams(top_p=0.99, top_k=self.vocab_size - 1)
max_num_batched_tokens = self.scheduler_config.max_num_batched_tokens
max_num_seqs = self.scheduler_config.max_num_seqs
dummy_lora_requests_per_seq = []
seqs: List[SequenceGroupMetadata] = []
if self.vision_language_config:
max_num_seqs = min(
max_num_seqs, int(max_num_batched_tokens / self.vision_language_config.image_feature_size)
)
for group_id in range(max_num_seqs):
seq_len = max_num_batched_tokens // max_num_seqs + (group_id < max_num_batched_tokens % max_num_seqs)
seq_data, fake_multi_modal_input = _prepare_fake_inputs(seq_len, self.vision_language_config)
seq = SequenceGroupMetadata(
request_id=str(group_id),
is_prompt=True,
seq_data={group_id: seq_data},
sampling_params=sampling_params,
block_tables=None,
lora_request=dummy_lora_requests_per_seq[group_id] if dummy_lora_requests_per_seq else None,
multi_modal_data=fake_multi_modal_input,
)
seqs.append(seq)
# Run the model with the dummy inputs.
num_layers = self.model_config.get_num_layers(self.parallel_config)
kv_caches = [(None, None)] * num_layers
self.execute_model(seqs, kv_caches)
torch.npu.synchronize()
return
def _prepare_prompt(
self,
seq_group_metadata_list: List[SequenceGroupMetadata],
) -> PreparePromptMetadata:
input_tokens: List[int] = []
input_positions: List[int] = []
slot_mapping: List[int] = []
lora_index_mapping: List[int] = []
lora_prompt_mapping: List[int] = []
lora_requests: List[LoRARequest] = []
seq_lens: List[int] = []
context_lens: List[int] = []
query_lens: List[int] = []
prefix_block_tables: List[List[int]] = []
multi_modal_input_list: List[torch.Tensor] = []
if len(seq_group_metadata_list) == 0:
return PreparePromptMetadata.empty()
for seq_group_metadata in seq_group_metadata_list:
if not seq_group_metadata.is_prompt:
raise ValueError("Expected prompt sequence group metadata.")
seq_ids = list(seq_group_metadata.seq_data.keys())
if len(seq_ids) != 1:
raise ValueError("Expected only one sequence ID.")
seq_id = seq_ids[0]
computed_block_nums = seq_group_metadata.computed_block_nums
if (
self.scheduler_config is not None
and self.scheduler_config.chunked_prefill_enabled
and not (computed_block_nums is None or computed_block_nums == [])
):
raise RuntimeError("chunked prefill cannot be used with prefix caching " "now.")
token_chunk_size = seq_group_metadata.token_chunk_size
seq_data = seq_group_metadata.seq_data[seq_id]
context_len = seq_data.get_num_computed_tokens()
# We should use get_len here because in case of preemption
# it contains output tokens.
seq_len = min(seq_data.get_len(), context_len + token_chunk_size)
prompt_tokens = seq_data.get_token_ids()[context_len:seq_len]
seq_lens.append(seq_len)
# NOTE: This only works for oooooooxxx style attention.
if seq_group_metadata.block_tables is not None:
block_table = seq_group_metadata.block_tables[seq_id]
else:
block_table = []
if block_table:
prefix_block_tables.append(block_table)
else:
prefix_block_tables.append([])
if computed_block_nums is not None and len(computed_block_nums) > 0 and self.sliding_window is None:
# Prefix is not supported with sliding_window
context_len = len(computed_block_nums) * self.block_size
prompt_tokens = prompt_tokens[context_len:]
# actual prompt lens
context_lens.append(context_len)
query_lens.append(seq_len - context_len)
input_tokens.extend(prompt_tokens)
# NOTE(woosuk): Here we assume that the first token in the prompt
# is always the first token in the sequence.
input_positions.extend(list(range(context_len, seq_len)))
lora_id = seq_group_metadata.lora_int_id
lora_requests.append(seq_group_metadata.lora_request)
lora_index_mapping += [lora_id] * (seq_len - context_len)
lora_prompt_mapping.extend(
[lora_id] * (seq_len - context_len if seq_group_metadata.sampling_params.prompt_logprobs else 1)
)
if seq_group_metadata.multi_modal_data:
multi_modal_input_list.append(seq_group_metadata.multi_modal_data.data)
if seq_group_metadata.block_tables is None:
# During memory profiling, the block tables are not initialized
# yet. In this case, we just use a dummy slot mapping.
slot_mapping.extend([_PAD_SLOT_ID] * seq_len)
continue
# Compute the slot mapping.
block_table = seq_group_metadata.block_tables[seq_id]
# Mask the [0, start_idx) tokens of the prompt with _PAD_SLOT_ID,
# where start_idx is max(0, seq_len - sliding_window).
# For example, if the prompt len is 10, sliding window is 8, and
# block size is 4, the first two tokens are masked and the slot
# mapping will be [-1, -1, 2, 3, 4, 5, 6, 7, 0, 1].
start_idx = 0
if self.sliding_window is not None:
if context_len != 0:
raise ValueError("Prefix caching is currently not supported with sliding window attention")
start_idx = max(0, seq_len - self.sliding_window)
for i in range(context_len, seq_len):
if i < start_idx:
slot_mapping.append(_PAD_SLOT_ID)
continue
block_number = block_table[i // self.block_size]
block_offset = i % self.block_size
slot = block_number * self.block_size + block_offset
slot_mapping.append(slot)
max_query_len = max(query_lens)
max_seq_len = max(seq_lens)
if max_query_len <= 0:
raise ValueError("max_query_len must be greater than 0")
context_lens_tensor = torch.tensor(context_lens, dtype=torch.int, device=self.device)
if multi_modal_input_list:
if not self.vision_language_config:
raise ValueError("Multi-modal inputs are only supported by vision language models.")
multi_modal_input = torch.cat(multi_modal_input_list, dim=0).to(self.device)
else:
multi_modal_input = None
# Prepare prefix block tables
max_prompt_block_table_len = max(len(t) for t in prefix_block_tables)
block_tables = make_tensor_with_pad(
prefix_block_tables,
max_len=max_prompt_block_table_len,
pad=0,
dtype=torch.int,
device=self.device,
)
# Query length can be shorter than key (i.e., prompt) when prefill
# is chunked or prefix cached.
query_lens_tensor = torch.tensor(query_lens, dtype=torch.long, device=self.device)
subquery_start_loc = torch.zeros(query_lens_tensor.shape[0] + 1, dtype=torch.int32, device=self.device)
seq_lens_tensor = torch.tensor(seq_lens, dtype=torch.int, device=self.device)
seq_start_loc = torch.zeros(seq_lens_tensor.shape[0] + 1, dtype=torch.int32, device=self.device)
torch.cumsum(query_lens_tensor, dim=0, dtype=subquery_start_loc.dtype, out=subquery_start_loc[1:])
torch.cumsum(seq_lens_tensor, dim=0, dtype=seq_start_loc.dtype, out=seq_start_loc[1:])
if self.attn_backend is FlashInferBackend:
attn_metadata = self.attn_backend.make_metadata(
is_prompt=True,
use_cuda_graph=False,
seq_start_loc=seq_start_loc,
max_seq_len=max_seq_len,
block_tables=block_tables,
)
else:
attn_metadata = self.attn_backend.make_metadata(
is_prompt=True,
seq_lens=seq_lens,
seq_lens_tensor=seq_lens_tensor,
max_query_len=max_query_len,
max_seq_len=max_seq_len,
subquery_start_loc=subquery_start_loc,
seq_start_loc=seq_start_loc,
context_lens_tensor=context_lens_tensor,
block_tables=block_tables,
use_cuda_graph=False,
)
return PreparePromptMetadata(
input_tokens=input_tokens,
input_positions=input_positions,
attn_metadata=attn_metadata,
seq_lens=seq_lens,
query_lens=query_lens,
lora_index_mapping=lora_index_mapping,
lora_prompt_mapping=lora_prompt_mapping,
lora_requests=lora_requests,
multi_modal_input=multi_modal_input,
slot_mapping=slot_mapping,
)
def _prepare_decode(
self,
seq_group_metadata_list: List[SequenceGroupMetadata],
) -> PrepareDecodeMetadata:
input_tokens: List[int] = []
input_positions: List[int] = []
slot_mapping: List[int] = []
seq_lens: List[int] = []
block_tables: List[List[int]] = []
lora_index_mapping: List[int] = []
lora_prompt_mapping: List[int] = []
lora_requests: List[LoRARequest] = []
if len(seq_group_metadata_list) == 0:
return PrepareDecodeMetadata.empty()
for seq_group_metadata in seq_group_metadata_list:
if seq_group_metadata.is_prompt:
raise ValueError("seq_group_metadata should not be a prompt")
if seq_group_metadata.token_chunk_size != 1:
raise ValueError("token_chunk_size should be 1")
seq_ids = list(seq_group_metadata.seq_data.keys())
lora_id = seq_group_metadata.lora_int_id
lora_requests.append(seq_group_metadata.lora_request)
for seq_id in seq_ids:
seq_data = seq_group_metadata.seq_data[seq_id]
generation_token = seq_data.get_last_token_id()
input_tokens.append(generation_token)
seq_len = seq_data.get_len()
position = seq_len - 1
input_positions.append(position)
seq_len = seq_len if self.sliding_window is None else min(seq_len, self.sliding_window)
seq_lens.append(seq_len)
block_table = seq_group_metadata.block_tables[seq_id]
block_number = block_table[position // self.block_size]
block_offset = position % self.block_size
slot = block_number * self.block_size + block_offset
slot_mapping.append(slot)
lora_index_mapping.append(lora_id)
lora_prompt_mapping.append(lora_id)
if self.sliding_window is not None:
sliding_window_blocks = self.sliding_window // self.block_size
block_table = block_table[-sliding_window_blocks:]
block_tables.append(block_table)
max_seq_len = max(seq_lens)
seq_lens_tensor = torch.tensor(seq_lens, dtype=torch.int, device=self.device)
max_block_table_len = max(len(block_table) for block_table in block_tables)
block_tables = make_tensor_with_pad(
block_tables,
max_len=max_block_table_len,
pad=0,
dtype=torch.int,
device=self.device,
)
attn_metadata = self.attn_backend.make_metadata(
is_prompt=False,
seq_lens=None,
seq_lens_tensor=seq_lens_tensor,
max_query_len=None,
max_seq_len=max_seq_len,
subquery_start_loc=None,
seq_start_loc=None,
context_lens_tensor=None,
block_tables=block_tables,
use_cuda_graph=False,
)
return PrepareDecodeMetadata(
input_tokens=input_tokens,
input_positions=input_positions,
attn_metadata=attn_metadata,
lora_index_mapping=lora_index_mapping,
lora_prompt_mapping=lora_prompt_mapping,
lora_requests=lora_requests,
slot_mapping=slot_mapping,
)
def prepare_input_tensors(
self,
seq_group_metadata_list: List[SequenceGroupMetadata],
) -> Tuple[
torch.Tensor, torch.Tensor, AttentionMetadata, SamplingMetadata, List[LoRARequest], LoRAMapping, torch.Tensor
]:
if self.is_driver_worker:
prefill_reqs = []
decode_reqs = []
for seq_group_meta in seq_group_metadata_list:
if seq_group_meta.is_prompt:
prefill_reqs.append(seq_group_meta)
else:
decode_reqs.append(seq_group_meta)
# Prepare input tensors.
(
input_tokens,
input_positions,
prefill_attn_metadata,
seq_lens,
query_lens,
lora_index_mapping,
lora_prompt_mapping,
lora_requests,
multi_modal_input,
slot_mapping,
) = self._prepare_prompt(prefill_reqs)
(
decode_input_tokens,
decode_input_positions,
decode_attn_metadata,
decode_lora_index_mapping,
decode_lora_prompt_mapping,
decode_lora_requests,
decode_slot_mapping,
) = self._prepare_decode(decode_reqs)
sampling_metadata = SamplingMetadata.prepare(
seq_group_metadata_list, seq_lens, query_lens, self.device, self.pin_memory
)
if not self.scheduler_config.chunked_prefill_enabled and prefill_reqs and decode_reqs:
raise ValueError("Cannot have both prefill and decode requests when chunked_prefill_enabled is False")
num_prefills = len(seq_lens)
num_prefill_tokens = len(input_tokens)
num_decode_tokens = len(decode_input_tokens)
# Coalesce tensors. Note that attn_metadata is currently not
# coalesced for simplicity.
input_tokens.extend(decode_input_tokens)
input_positions.extend(decode_input_positions)
slot_mapping.extend(decode_slot_mapping)
lora_index_mapping.extend(decode_lora_index_mapping)
lora_prompt_mapping.extend(decode_lora_prompt_mapping)
lora_requests.extend(decode_lora_requests)
input_tokens = torch.tensor(input_tokens, dtype=torch.long, device=self.device)
input_positions = torch.tensor(input_positions, dtype=torch.long, device=self.device)
slot_mapping = torch.tensor(slot_mapping, dtype=torch.long, device=self.device)
if self.lora_config:
lora_mapping = LoRAMapping(
lora_index_mapping,
lora_prompt_mapping,
)
else:
lora_mapping = None
# Broadcast the metadata.
# If batch contains both prefill and decode, it sends 2 broadcasts.
# If it only contains 1 type, it triggers a single broadcast.
if prefill_attn_metadata is not None and decode_attn_metadata is not None:
batch_type = BatchType.MIXED
elif prefill_attn_metadata is not None:
batch_type = BatchType.PREFILL
else:
batch_type = BatchType.DECODE
metadata_dict = {
"input_tokens": input_tokens,
"input_positions": input_positions,
"selected_token_indices": sampling_metadata.selected_token_indices,
"lora_requests": lora_requests,
"lora_mapping": lora_mapping,
"multi_modal_input": multi_modal_input,
"num_prefill_tokens": num_prefill_tokens,
"num_decode_tokens": num_decode_tokens,
"slot_mapping": slot_mapping,
"num_prefills": num_prefills,
"batch_type": batch_type,
}
if prefill_attn_metadata is not None:
metadata_dict.update(prefill_attn_metadata.asdict_zerocopy())
else:
if decode_attn_metadata is None:
raise ValueError("decode_attn_metadata is None")
metadata_dict.update(decode_attn_metadata.asdict_zerocopy())
broadcast_tensor_dict(metadata_dict, src=0)
# Broadcast decode attn metadata for mixed batch type.
# The additional broadcast costs 300us overhead on 4 A10 GPUs.
            # We can potentially reduce the overhead by coalescing tensors.
if batch_type == BatchType.MIXED:
if decode_attn_metadata is None:
raise ValueError("decode_attn_metadata is None")
metadata_dict = decode_attn_metadata.asdict_zerocopy()
broadcast_tensor_dict(metadata_dict, src=0)
else:
metadata_dict = broadcast_tensor_dict(src=0)
input_tokens = metadata_dict.pop("input_tokens")
input_positions = metadata_dict.pop("input_positions")
slot_mapping = metadata_dict.pop("slot_mapping")
num_prefills = metadata_dict.pop("num_prefills")
selected_token_indices = metadata_dict.pop("selected_token_indices")
lora_mapping = metadata_dict.pop("lora_mapping")
lora_requests = metadata_dict.pop("lora_requests")
multi_modal_input = metadata_dict.pop("multi_modal_input")
num_prefill_tokens = metadata_dict.pop("num_prefill_tokens")
num_decode_tokens = metadata_dict.pop("num_decode_tokens")
batch_type = metadata_dict.pop("batch_type")
# Create an attention metadata.
prefill_attn_metadata = None
decode_attn_metadata = None
if batch_type == BatchType.PREFILL or batch_type == BatchType.MIXED:
prefill_attn_metadata = self.attn_backend.make_metadata(**metadata_dict)
else:
decode_attn_metadata = self.attn_backend.make_metadata(**metadata_dict)
sampling_metadata = SamplingMetadata(
seq_groups=None,
selected_token_indices=selected_token_indices,
categorized_sample_indices=None,
num_prompts=0,
)
# if it is a mixed batch, decode attn_metadata is broadcasted
# separately.
if batch_type == BatchType.MIXED:
metadata_dict = broadcast_tensor_dict(src=0)
decode_attn_metadata = self.attn_backend.make_metadata(**metadata_dict)
attn_metadata = AttentionMetadata(
num_prefills=num_prefills,
slot_mapping=slot_mapping,
num_prefill_tokens=num_prefill_tokens,
num_decode_tokens=num_decode_tokens,
prefill_metadata=prefill_attn_metadata,
decode_metadata=decode_attn_metadata,
kv_cache_dtype=self.kv_cache_torch_dtype,
)
return (
input_tokens,
input_positions,
attn_metadata,
sampling_metadata,
lora_requests,
lora_mapping,
multi_modal_input,
)
@torch.inference_mode()
def execute_model(
self,
seq_group_metadata_list: List[SequenceGroupMetadata],
kv_caches: List[Tuple[torch.Tensor, torch.Tensor]],
) -> Optional[SamplerOutput]:
(input_tokens, input_positions, attn_metadata, sampling_metadata, lora_requests, _, _) = (
self.prepare_input_tensors(seq_group_metadata_list)
)
# Currently cuda graph is only supported by the decode phase.
model_executable = self.model
execute_model_kwargs = {
"input_ids": input_tokens,
"positions": input_positions,
"kv_caches": kv_caches,
"attn_metadata": attn_metadata,
"lora_requests": lora_requests,
}
hidden_states = model_executable(**execute_model_kwargs)
# Only perform sampling in the driver worker.
if not self.is_driver_worker:
return None
# Sample the next token.
output = self.model.sample(
logits=hidden_states,
sampling_metadata=sampling_metadata,
)
return output
- vllm_npu/vllm_npu/worker/ascend_worker.py: Implements the AscendWorker class used by the Ascend backend.
"""A Ascend worker class."""
import gc
from typing import Dict, List, Tuple, Set, Optional, Any
import torch
import torch.distributed
from vllm.config import (CacheConfig, DeviceConfig, ModelConfig, LoadConfig,
ParallelConfig, SchedulerConfig, LoRAConfig, VisionLanguageConfig)
from vllm.model_executor import set_random_seed
from vllm.distributed import (broadcast_tensor_dict,
ensure_model_parallel_initialized,
init_distributed_environment)
from vllm.sequence import SamplerOutput, ExecuteModelRequest
from vllm.worker.cache_engine import CacheEngine
from vllm.lora.request import LoRARequest
from vllm.worker.worker_base import WorkerBase
from vllm.worker.worker import raise_if_cache_size_invalid
from vllm_npu.worker.ascend_model_runner import AscendModelRunner
class AscendWorker(WorkerBase):
"""A worker class that executes the model on a group of Ascend NPUs.
"""
def __init__(
self,
model_config: ModelConfig,
parallel_config: ParallelConfig,
scheduler_config: SchedulerConfig,
device_config: DeviceConfig,
cache_config: CacheConfig,
load_config: LoadConfig,
local_rank: int,
rank: int,
distributed_init_method: str,
lora_config: Optional[LoRAConfig] = None,
vision_language_config: Optional[VisionLanguageConfig] = None,
is_driver_worker: bool = False,
) -> None:
self.model_config = model_config
self.parallel_config = parallel_config
self.scheduler_config = scheduler_config
self.device_config = device_config
self.cache_config = cache_config
self.local_rank = local_rank
self.rank = rank
self.distributed_init_method = distributed_init_method
self.lora_config = lora_config
self.load_config = load_config
self.is_driver_worker = is_driver_worker
if self.is_driver_worker:
assert self.rank == 0, "The driver worker must have rank 0."
if self.model_config.trust_remote_code:
# note: lazy import to avoid importing torch before initializing
from vllm.utils import init_cached_hf_modules
init_cached_hf_modules()
self.vision_language_config = vision_language_config
mindie_model_config = {
'backend_type': 'atb',
'model_id': model_config.model,
'rank': rank,
'local_rank': local_rank,
'world_size': parallel_config.world_size,
'npu_device_id': local_rank,
"trust_remote_code": model_config.trust_remote_code,
'inference_mode': 2 if scheduler_config.chunked_prefill_enabled
or cache_config.enable_prefix_caching else 0,
}
self.model_runner = AscendModelRunner(
model_config,
parallel_config,
scheduler_config,
device_config,
load_config=load_config,
lora_config=self.lora_config,
kv_cache_dtype=self.cache_config.cache_dtype,
is_driver_worker=is_driver_worker,
mindie_model_config=mindie_model_config)
# Uninitialized cache engine. Will be initialized by
# self.initialize_cache().
self.cache_engine: CacheEngine
self.gpu_cache: List[torch.Tensor]
def init_device(self) -> None:
self.device = torch.device(f"npu:{self.local_rank}")
torch.npu.set_device(self.device)
# Initialize the distributed environment.
init_worker_distributed_environment(self.parallel_config, self.rank,
self.distributed_init_method,
self.local_rank)
# Initialize the model.
set_random_seed(self.model_config.seed)
def load_model(self):
self.model_runner.load_model()
@torch.inference_mode()
def determine_num_available_blocks(self) -> Tuple[int, int]:
"""Profiles the peak memory usage of the model and returns the maximum
number of NPU and CPU cache blocks that can be allocated.
"""
# Profile the memory usage of the model and get the maximum number of
# cache blocks that can be allocated with the remaining free memory.
torch.npu.empty_cache()
torch.npu.reset_peak_memory_stats()
# Execute a forward pass with dummy inputs to profile the memory usage
# of the model.
self.model_runner.profile_run()
block_size = self.cache_config.block_size
dummy_block_size = 128
dummy_num_blocks = dummy_block_size // block_size
# Calculate the number of blocks that can be allocated with the
# profiled peak memory.
torch.npu.synchronize()
peak_memory = torch.npu.max_memory_allocated()
total_gpu_memory = torch.npu.get_device_properties(self.rank).total_memory
cache_block_size = CacheEngine.get_cache_block_size(
self.cache_config, self.model_config, self.parallel_config)
num_gpu_blocks = int(
(total_gpu_memory * self.cache_config.gpu_memory_utilization - peak_memory) //
cache_block_size) + dummy_num_blocks
num_cpu_blocks = int(self.cache_config.swap_space_bytes // cache_block_size)
num_gpu_blocks = max(num_gpu_blocks, 0)
num_cpu_blocks = max(num_cpu_blocks, 0)
if self.model_runner.lora_manager:
self.model_runner.remove_all_loras()
gc.collect()
torch.npu.empty_cache()
return num_gpu_blocks, num_cpu_blocks
def initialize_cache(self, num_gpu_blocks: int,
num_cpu_blocks: int) -> None:
raise_if_cache_size_invalid(num_gpu_blocks,
self.cache_config.block_size,
self.model_config.max_model_len)
self.cache_config.num_gpu_blocks = num_gpu_blocks
self.cache_config.num_cpu_blocks = num_cpu_blocks
self._init_cache_engine()
self._warm_up_model()
def _init_cache_engine(self):
assert self.cache_config.num_gpu_blocks is not None
self.cache_engine = CacheEngine(self.cache_config, self.model_config,
self.parallel_config)
self.gpu_cache = self.cache_engine.gpu_cache
self.model_runner.set_block_size(self.cache_engine.block_size)
def _warm_up_model(self) -> None:
pass
def cache_swap(
self,
blocks_to_swap_in: Dict[int, int],
blocks_to_swap_out: Dict[int, int],
blocks_to_copy: Dict[int, List[int]],
) -> None:
if blocks_to_swap_in:
self.cache_engine.swap_in(blocks_to_swap_in)
if blocks_to_swap_out:
self.cache_engine.swap_out(blocks_to_swap_out)
if blocks_to_copy:
self.cache_engine.copy(blocks_to_copy)
@torch.inference_mode()
def execute_model(
self,
execute_model_req: Optional[ExecuteModelRequest] = None
) -> List[SamplerOutput]:
if execute_model_req is None:
seq_group_metadata_list = None
else:
seq_group_metadata_list = execute_model_req.seq_group_metadata_list
if self.is_driver_worker:
assert seq_group_metadata_list is not None
assert execute_model_req is not None
num_seq_groups = len(seq_group_metadata_list)
blocks_to_swap_in = execute_model_req.blocks_to_swap_in
blocks_to_swap_out = execute_model_req.blocks_to_swap_out
blocks_to_copy = execute_model_req.blocks_to_copy
data: Dict[str, Any] = {
"num_seq_groups": num_seq_groups,
"blocks_to_swap_in": blocks_to_swap_in,
"blocks_to_swap_out": blocks_to_swap_out,
"blocks_to_copy": blocks_to_copy,
}
broadcast_tensor_dict(data, src=0)
else:
data = broadcast_tensor_dict(src=0)
num_seq_groups = data["num_seq_groups"]
blocks_to_swap_in = data["blocks_to_swap_in"]
blocks_to_swap_out = data["blocks_to_swap_out"]
blocks_to_copy = data["blocks_to_copy"]
self.cache_swap(blocks_to_swap_in, blocks_to_swap_out, blocks_to_copy)
# If there is no input, we don't need to execute the model.
if num_seq_groups == 0:
return []
output = self.model_runner.execute_model(seq_group_metadata_list,
self.gpu_cache)
# Worker only supports single-step execution. Wrap the output in a list
# to conform to interface.
return [output]
def add_lora(self, lora_request: LoRARequest) -> bool:
return self.model_runner.add_lora(lora_request)
def remove_lora(self, lora_id: int) -> bool:
return self.model_runner.remove_lora(lora_id)
def list_loras(self) -> Set[int]:
return self.model_runner.list_loras()
def get_cache_block_size_bytes(self) -> int:
"""Get the size of the KV cache block size in bytes.
"""
return CacheEngine.get_cache_block_size(self.cache_config,
self.model_config,
self.parallel_config)
def init_worker_distributed_environment(
parallel_config: ParallelConfig,
rank: int,
distributed_init_method: Optional[str] = None,
local_rank: int = -1,
) -> None:
"""Initialize the distributed environment."""
init_distributed_environment(parallel_config.world_size, rank,
distributed_init_method, local_rank)
ensure_model_parallel_initialized(parallel_config.tensor_parallel_size,
parallel_config.pipeline_parallel_size)
- vllm_npu/vllm_npu/worker/cache_engine.py: Re-implements the _allocate_kv_cache function so that the kv_cache is created as (key, value) tensor tuples.
from typing import Tuple, List
import torch
KVCache = Tuple[torch.Tensor, torch.Tensor]
def _allocate_kv_cache(
self,
num_blocks: int,
device: str,
) -> List[KVCache]:
"""Allocates KV cache on the specified device."""
kv_cache: List[KVCache] = []
key_block_shape = (self.block_size, self.num_heads, self.head_size)
value_block_shape = (self.block_size, self.num_heads, self.head_size)
for _ in range(self.num_layers):
key_blocks = torch.empty(
size=(num_blocks, *key_block_shape),
dtype=self.dtype,
device=device,
)
value_blocks = torch.empty(
size=(num_blocks, *value_block_shape),
dtype=self.dtype,
device=device,
)
kv_cache.append((key_blocks, value_blocks))
return kv_cache
- vllm_npu/vllm_npu/config.py: Rewrites the DeviceConfig class to support the npu device type, and rewrites the _get_and_verify_max_len function to fix issues when loading some models' configs.
import warnings
from typing import Optional
import torch
from transformers import PretrainedConfig
from vllm.logger import init_logger
logger = init_logger(__name__)
class DeviceConfig:
def __init__(self, device: str = "auto") -> None:
if device == "auto":
# Automated device type detection
if getattr(torch.version, "cann", None) is not None:
self.device_type = "npu"
else:
                warnings.warn(
                    "Failed to detect CANN in your environment. "
                    "Please check whether CANN is installed correctly. "
                    "The device type for processing input is now set to cpu."
                )
self.device_type = "cpu"
else:
# Device type is assigned explicitly
self.device_type = device
self.device = torch.device(self.device_type)
def _get_and_verify_max_len(
hf_config: PretrainedConfig,
max_model_len: Optional[int],
) -> int:
"""Get and verify the model's maximum length."""
derived_max_model_len = float("inf")
possible_keys = [
# OPT
"max_position_embeddings",
# GPT-2
"n_positions",
# MPT
"max_seq_len",
# ChatGLM2
"seq_length",
# Command-R
"model_max_length",
# Others
"max_sequence_length",
"max_seq_length",
"seq_len",
]
for key in possible_keys:
max_len_key = getattr(hf_config, key, None)
if max_len_key is not None:
derived_max_model_len = min(derived_max_model_len, max_len_key)
if derived_max_model_len == float("inf"):
if max_model_len is not None:
# If max_model_len is specified, we use it.
return max_model_len
default_max_len = 2048
logger.warning(
"The model's config.json does not contain any of the following "
"keys to determine the original maximum length of the model: "
f"{possible_keys}. Assuming the model's maximum length is "
f"{default_max_len}."
)
derived_max_model_len = default_max_len
rope_scaling = getattr(hf_config, "rope_scaling", None)
if rope_scaling is not None:
if "type" in rope_scaling:
rope_type = rope_scaling["type"]
elif "rope_type" in rope_scaling:
rope_type = rope_scaling["rope_type"]
else:
raise ValueError("rope_scaling must have a 'type' or 'rope_type' key.")
# The correct one should be "longrope", kept "su" here
# to be backward compatible
if rope_type not in ("su", "longrope", "llama3"):
if "factor" not in rope_scaling:
raise ValueError("rope_scaling must have a 'factor' key.")
scaling_factor = rope_scaling["factor"]
if rope_type == "yarn":
derived_max_model_len = rope_scaling["original_max_position_embeddings"]
derived_max_model_len *= scaling_factor
if max_model_len is None:
max_model_len = derived_max_model_len
elif max_model_len > derived_max_model_len:
raise ValueError(
f"User-specified max_model_len ({max_model_len}) is greater than "
f"the derived max_model_len ({max_len_key}={derived_max_model_len}"
" in model's config.json). This may lead to incorrect model "
"outputs or CUDA errors. Make sure the value is correct and "
"within the model context size."
)
return int(max_model_len)
- vllm_npu/vllm_npu/utils.py: Adds a helper that checks whether the Ascend backend is in use, and rewrites the get_ip function to fix a hang when running on multiple cards.
# Part of codes in this file was copied from project [vLLM Team][vllm]
import socket
import warnings
from functools import lru_cache
import vllm.envs as envs
@lru_cache(maxsize=None)
def is_ascend() -> bool:
try:
import torch_npu
except ImportError:
torch_npu = None
return torch_npu is not None
def get_ip() -> str:
host_ip = envs.VLLM_HOST_IP
if host_ip:
return host_ip
# IP is not set, try to get it from the network interface
# try ipv4
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
try:
s.connect(("localhost", 80)) # Doesn't need to be reachable
socket_name = s.getsockname()[0]
s.close()
return socket_name
    except Exception:
        warnings.warn("Encountered connection errors. Using 0.0.0.0 by default.")
        s.close()
    # try ipv6
    try:
        s = socket.socket(socket.AF_INET6, socket.SOCK_DGRAM)
        # Connecting to localhost keeps this check local; the address does
        # not need to be reachable.
        s.connect(("localhost", 80))
        socket_name = s.getsockname()[0]
        s.close()
        return socket_name
    except Exception:
        warnings.warn("Encountered connection errors. Using 0.0.0.0 by default.")
        s.close()
warnings.warn(
"Failed to get the IP address, using 0.0.0.0 by default."
"The value can be set by the environment variable"
" VLLM_HOST_IP or HOST_IP.",
stacklevel=2)
return "0.0.0.0"
- vllm_npu/vllm_npu/npu_adaptor.py: Monkey patches that mask out imports in native vLLM that would otherwise raise errors.
import importlib
import sys
def replace_modules(old_module_name, new_module_name):
if old_module_name in sys.modules:
del sys.modules[old_module_name]
sys.modules[old_module_name] = importlib.import_module(new_module_name)
_default_ops = (
'xformers',
'xformers.ops',
'vllm._C',
'xformers.ops.fmha.attn_bias',
'vllm.model_executor.layers.ops.sample'
)
for _ops in _default_ops:
replace_modules(_ops, 'vllm_npu')
# dummy class to avoid import error.
class ops:
pass
class cuda_utils:
pass
class cache_ops:
pass
class BlockDiagonalCausalMask:
pass
class LowerTriangularMaskWithTensorBias:
pass
def context_attention_fwd():
pass
def get_num_triton_sampler_splits():
pass
def sample():
pass
- vllm_npu/vllm_npu/__init__.py: Hot-replaces the shared modules not covered by the submodules and applies the monkey patches.
import torch
import torch_npu
from torch_npu.contrib import transfer_to_npu
from vllm_npu.npu_adaptor import (BlockDiagonalCausalMask,
LowerTriangularMaskWithTensorBias,
cache_ops, cuda_utils, ops,
context_attention_fwd,
get_num_triton_sampler_splits, sample)
import vllm_npu.core
import vllm_npu.engine
import vllm_npu.worker
import vllm_npu.model_executor
import vllm_npu.executor
import vllm_npu.attention
import vllm_npu.usage
from vllm_npu.utils import get_ip
from vllm_npu.config import DeviceConfig, _get_and_verify_max_len
import vllm.utils as utils
import vllm.executor.ray_utils as ray_utils
import vllm.config as vconfig
import vllm.engine.arg_utils as varg_utils
utils.get_ip = get_ip
ray_utils.get_ip = get_ip
vconfig.DeviceConfig = DeviceConfig
vconfig._get_and_verify_max_len = _get_and_verify_max_len
varg_utils.DeviceConfig = DeviceConfig
__version__ = "0.4.2"
- examples/start_server.sh: example script for running vLLM in online (serving) mode.
#!/bin/bash
export VLLM_NO_USAGE_STATS=1 # disable vLLM usage stats reporting to avoid errors
python -m vllm.entrypoints.openai.api_server --model=facebook/opt-125m --trust-remote-code --enforce-eager --worker-use-ray
- examples/offline_inference.py: example script for running vLLM in offline mode.
import argparse

from vllm import LLM, SamplingParams

parser = argparse.ArgumentParser()
parser.add_argument('--model_path', type=str, default="facebook/opt-125m")
args = parser.parse_args()
model_path = args.model_path

# Input prompts for the test.
prompts = [
    "Hello, my name is",
    "The president of the United States is",
    "The capital of France is",
    "The future of AI is",
]
sampling_params = SamplingParams(max_tokens=512, temperature=0)

llm = LLM(
    model=model_path,
    block_size=128,
    max_model_len=4096,      # max context length (prompt + generated tokens)
    tensor_parallel_size=8,  # number of NPUs to be used
    max_num_seqs=256,        # max batch size
    enforce_eager=True,      # disable graph capture, run in eager mode
    trust_remote_code=True,  # needed for custom models not yet in HuggingFace transformers
    worker_use_ray=True,
)

outputs = llm.generate(prompts, sampling_params)
for i, output in enumerate(outputs):
    prompt = output.prompt
    generated_text = output.outputs[0].text
    print(f"req_num: {i}\nPrompt: {prompt!r}\nGenerated text: {generated_text!r}")
- examples/offline_inference.sh: launcher script for the offline inference example.
#!/bin/bash
export VLLM_NO_USAGE_STATS=1 # disable vLLM usage stats reporting to avoid errors
python3 offline_inference.py --model_path facebook/opt-125m
- install.sh: one-click installation script.
#!/bin/bash
if [ -d "./vllm" ]; then
    echo "The ./vllm directory already exists!"
    exit 1
fi
git clone -b v0.4.2 https://github.com/vllm-project/vllm.git vllm
yes | cp -r cover/* vllm/
cd vllm
pip install -r requirements-ascend.txt
python3 setup.py install
cd ../vllm_npu
pip install -r requirements.txt
python3 setup.py install
- README.md:
# Vllm-MindIE
## Introduction
A patch that integrates the Ascend inference engine (MindIE) with the stable v0.4.2 release of the open-source vLLM framework.
## Adaptation Approach
vLLM is adapted to the Ascend environment as follows:
- The upper layer keeps vLLM's native logic, including request scheduling, batch construction, and launching multi-card services through the Ray distributed framework.
- The lower layer routes model inference and post-processing through the unified `GeneratorTorch` interface provided by MindIE-LLM, plugging into the MindIE model repository for unified management so that the whole-graph acceleration library can speed up inference.
## Supported Features
Version 0.4.2 currently supports the following features:
- Floating-point model inference
- Quantized model inference
- LoRA model inference
- SplitFuse
## Environment Setup
### Dependency Versions
- Hardware on which the Vllm-MindIE adaptation repository can run
    - Atlas 800I A2 (32GB/64GB memory)
- Companion software required to run the Vllm-MindIE adaptation repository
    - OS
    - Driver (HDK)
    - CANN
    - Python
    - PTA
    - Open-source software dependencies
- Version compatibility
    - The current Vllm-MindIE adaptation repository must be deployed and run with CANN 8.0 or later, Python 3.10, and torch 2.1.0.
### Environment Configuration
For the detailed configuration procedure, refer to the [MindIE-LLM model repository](https://gitee.com/ascend/MindIE-LLM).
## Installation
Once the basic Ascend inference environment is ready, run `install.sh` to install vLLM together with the Ascend patch:
```sh
bash install.sh
```
## Usage
Startup demos for vLLM in offline mode and as an online service are provided for reference.
**Note: adjust the model path parameters in offline_inference.sh and start_server.sh to run inference with your own model.**
- Offline mode: set the model_path parameter in offline_inference.sh to the model used for inference, then run:
```sh
cd examples
bash offline_inference.sh
```
- Online service: first set the model parameter in start_server.sh to the path of the model used for inference. Other commonly used options are listed below; an example request against a running server is shown after this list.
    - -tp n: number of cards used for model inference
    - --port port_num: port used by the inference service
    - --max-num-seqs bs: maximum batch size supported by the service
    - To run on a specific set of cards, export an environment variable before the python command in the script; for example, to use the first four cards:
```sh
export ASCEND_RT_VISIBLE_DEVICES=0,1,2,3
```
After configuring the parameters above, run:
```sh
cd examples
bash start_server.sh
```
- Multi-LoRA: set the following parameters in multilora_inference.sh:
    - model-path: path of the base model used for inference
    - lora-modules: LoRA models used for inference, in the form {lora_name=lora_path}; multiple LoRA models may be specified (**note: this test case requires at least two LoRA models to run**)
After configuring the parameters above, run:
```sh
cd examples
bash multilora_inference.sh
```
- SplitFuse: set the model_path parameter in test_split_fuse.sh to the model used for inference, then run:
```sh
cd examples
bash test_split_fuse.sh
```
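Once a service is up, a quick way to verify it is to query the OpenAI-compatible completions endpoint. The sketch below is illustrative only; it assumes the server from the example scripts (default port 8000, model facebook/opt-125m), so adjust both to your configuration.
```python
import requests  # simple HTTP client; any OpenAI-compatible client also works

# Port and model name are assumptions matching the example scripts.
resp = requests.post(
    "http://localhost:8000/v1/completions",
    json={
        "model": "facebook/opt-125m",
        "prompt": "The capital of France is",
        "max_tokens": 32,
        "temperature": 0,
    },
)
print(resp.json()["choices"][0]["text"])
```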
## Details
The `vllm_npu_0.4.2` patch package modifies six main modules: `attention`, `engine`, `usage`, `executor`, `model_executor`, and `worker`. These modules are hot-swapped in to stay compatible with the Ascend backend while remaining consistent with the corresponding modules of the native vLLM framework. The per-module changes are described below.
### Attention Module
This module defines the `AscendAttentionBackend` class, which gathers the data the Ascend backend needs to perform attention computation.
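As a rough illustration of the kind of per-batch data such a backend collects, here is a minimal sketch; the field names are assumptions, not the patch's actual definitions.
```python
from dataclasses import dataclass
from typing import Optional

import torch


@dataclass
class AscendAttentionMetadataSketch:
    """Illustrative container for the batch data an Ascend attention
    backend consumes (all field names are assumptions)."""
    is_prompt: bool                       # prefill vs. decode phase
    seq_lens: torch.Tensor                # sequence length of each request
    slot_mapping: torch.Tensor            # token -> KV-cache slot indices
    block_tables: Optional[torch.Tensor]  # per-sequence KV-cache block table
    max_seq_len: int                      # longest sequence in the batch
```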
### Engine Module
This module re-implements the `from_engine_args` functions of `LLMEngine` and `AsyncLLMEngine` so that execution is routed to the patch's `executor` module. In addition, when Ray is initialized, the `num_gpus` parameter is set to `parallel_config.world_size`, which fixes multi-card service startup.
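A minimal sketch of the routing idea, loosely following vLLM 0.4.2; the vllm_npu import paths are hypothetical and the real patch handles more configuration (for example the usage context).
```python
from vllm.engine.arg_utils import EngineArgs


def from_engine_args_ascend(cls, engine_args: EngineArgs):
    """Build an engine whose executor targets Ascend instead of CUDA."""
    engine_config = engine_args.create_engine_config()

    if engine_config.parallel_config.worker_use_ray:
        # Multi-card path via Ray; hypothetical import path.
        from vllm_npu.executor.ray_executor import RayAscendExecutor
        executor_class = RayAscendExecutor
    else:
        # Single-card path; hypothetical import path.
        from vllm_npu.executor.ascend_executor import AscendExecutor
        executor_class = AscendExecutor

    return cls(**engine_config.to_dict(),
               executor_class=executor_class,
               log_stats=not engine_args.disable_log_stats)


# The patch would then hot-swap the method, e.g.:
# LLMEngine.from_engine_args = classmethod(from_engine_args_ascend)
```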
### Usage Module
This module works around the failure of native vLLM's `usage_lib` when it tries to collect device information in an Ascend environment.
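The gist is to query the NPU instead of CUDA when reporting device information. The helper below is only a sketch; its name and the exact torch_npu calls are assumptions.
```python
import torch


def collect_npu_device_info() -> dict:
    """Gather basic accelerator info without touching CUDA APIs,
    which fail on Ascend hosts."""
    try:
        import torch_npu  # noqa: F401  # registers the torch.npu namespace
        return {
            "device": "npu",
            "device_count": torch.npu.device_count(),
        }
    except Exception:
        # Fall back gracefully so usage reporting never crashes the engine.
        return {"device": "unknown"}
```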
### Executor Module
This module defines the `AscendExecutor` and `RayAscendExecutor` classes, which wire execution up to the `AscendWorker` class described below.
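Conceptually, the single-card executor owns one worker and forwards engine calls to it, while the Ray variant does the same across remote workers. A minimal, self-contained sketch follows; the method names mirror vLLM's executor interface, and the wiring details are assumptions.
```python
class AscendExecutorSketch:
    """Single-card executor: owns one driver worker and delegates to it."""

    def __init__(self, worker):
        self.driver_worker = worker
        self.driver_worker.init_device()
        self.driver_worker.load_model()

    def determine_num_available_blocks(self):
        # Profile NPU memory to size the KV cache.
        return self.driver_worker.determine_num_available_blocks()

    def initialize_cache(self, num_gpu_blocks, num_cpu_blocks):
        self.driver_worker.initialize_cache(num_gpu_blocks, num_cpu_blocks)

    def execute_model(self, execute_model_req):
        # Hand a scheduled batch to the AscendWorker.
        return self.driver_worker.execute_model(execute_model_req)
```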
### Model Executor Module
This module connects to the MindIE-LLM inference and post-processing pipeline and contains two submodules: `layers` and `models`. The `layers` submodule handles post-processing, while the `models` submodule focuses on model inference.
In the `models` submodule, a class named `MindIELlmWrapper` is created. It instantiates `GeneratorTorch`, the unified interface provided by MindIE-LLM, extracts the inference parameters MindIE-LLM needs from vLLM's native data structures, and then runs inference through that interface. It is also responsible for constructing dummy data during warmup.
An `ascend_model_loader.py` file is also added; it provides a `get_model` function used by the `ModelRunner` to load the model.
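To make the data flow concrete, here is a rough sketch of a wrapper in that spirit. The GeneratorTorch import path, constructor arguments, and method names are assumptions; consult the MindIE-LLM documentation for the real interface.
```python
import torch


class MindIELlmWrapperSketch(torch.nn.Module):
    """Bridges vLLM's batch layout to MindIE-LLM's unified interface."""

    def __init__(self, mindie_config: dict):
        super().__init__()
        # Hypothetical import path and constructor signature.
        from mindie_llm.text_generator.adapter.generator_torch import GeneratorTorch
        self.generator = GeneratorTorch(mindie_config)

    def forward(self, input_ids, positions, kv_caches, attn_metadata):
        # Repack vLLM's tensors into whatever argument set the unified
        # interface expects, then run the whole-graph model (hypothetical call).
        return self.generator.forward(input_ids, positions, kv_caches, attn_metadata)


def get_model_sketch(model_config) -> torch.nn.Module:
    """Sketch of ascend_model_loader.get_model: build the wrapper instead of
    loading weights through vLLM's CUDA model loader."""
    return MindIELlmWrapperSketch({"model_path": model_config.model}).eval()
```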
### Worker Module
This module defines the `AscendWorker` and `AscendModelRunner` classes, located in `ascend_worker.py` and `ascend_model_runner.py` respectively. They interact with `MindIELlmWrapper` so that model loading, warmup, inference, and post-processing all run correctly. In addition, the `_allocate_kv_cache` function of the original framework's `CacheEngine` is modified so that the allocated cache matches the parameter layout the backend expects.
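For illustration, a KV-cache allocation in that style might look as follows; the per-layer (key, value) layout and the tensor shape are assumptions about what the backend consumes, not the patch's exact code.
```python
from typing import List, Tuple

import torch


def allocate_kv_cache_sketch(num_blocks: int,
                             block_size: int,
                             num_kv_heads: int,
                             head_size: int,
                             num_layers: int,
                             dtype: torch.dtype = torch.float16,
                             device: str = "npu") -> List[Tuple[torch.Tensor, torch.Tensor]]:
    """Allocate one (key, value) tensor pair per layer on the NPU."""
    kv_cache = []
    for _ in range(num_layers):
        key_cache = torch.empty(num_blocks, block_size, num_kv_heads, head_size,
                                dtype=dtype, device=device)
        value_cache = torch.empty(num_blocks, block_size, num_kv_heads, head_size,
                                  dtype=dtype, device=device)
        kv_cache.append((key_cache, value_cache))
    return kv_cache
```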
After all of the code above has been restored into the project's directory layout, go to the project root and run the following script to install the Ascend-adapted vLLM:
```sh
bash install.sh
```