Reference Code

The directory structure of the Ascend adaptation code for vLLM 0.3.3 is shown below (a minimal usage sketch of the patched stack follows the tree):

.
├── cover
│   ├── vllm
│   │   └── __init__.py
│   ├── requirements.txt
│   └── setup.py
├── examples
│   ├── start_server.sh
│   ├── test_offline.py
│   └── test_offline.sh
├── install.sh
└── vllm_npu
    ├── README.md
    ├── requirements.txt
    ├── setup.py
    ├── tests
    │   ├── models
    │   │   ├── __init__.py
    │   │   └── test_models.py
    │   └── sampler
    │       └── test_sampler.py
    └── vllm_npu
        ├── config.py
        ├── core
        │   ├── __init__.py
        │   └── scheduler.py
        ├── engine
        │   ├── __init__.py
        │   ├── llm_engine.py
        │   └── ray_utils.py
        ├── __init__.py
        ├── model_executor
        │   ├── ascend_model_loader.py
        │   ├── __init__.py
        │   ├── layers
        │   │   ├── __init__.py
        │   │   └── sampler.py
        │   ├── models
        │   │   ├── ascend
        │   │   │   ├── __init__.py
        │   │   │   └── mindie_llm_wrapper.py
        │   │   └── __init__.py
        │   └── utils.py
        ├── npu_adaptor.py
        ├── sampling_params.py
        ├── utils.py
        └── worker
            ├── ascend_worker.py
            ├── cache_engine.py
            ├── __init__.py
            └── model_runner.py
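
The cover directory holds files that replace their counterparts in a vanilla vLLM 0.3.3 installation, while vllm_npu is a standalone add-on package carrying the Ascend-specific implementations; install.sh (whose content is not reproduced here) is expected to install both. Once they are in place, importing vllm is enough to activate the NPU patches, because ./cover/vllm/__init__.py imports vllm_npu first. The snippet below is a minimal smoke-test sketch of the patched stack, reusing the placeholder model from the examples:

# Minimal smoke-test sketch (assumes vllm and vllm_npu are installed as described above;
# "facebook/opt-125m" is only the placeholder model used throughout the examples).
# Importing vllm triggers "import vllm_npu" in ./cover/vllm/__init__.py, which applies
# all Ascend patches before the engine is constructed.
from vllm import LLM, SamplingParams

llm = LLM(model="facebook/opt-125m", enforce_eager=True, trust_remote_code=True)
outputs = llm.generate(["Hello, my name is"], SamplingParams(max_tokens=16, temperature=0))
print(outputs[0].outputs[0].text)
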
  1. ./cover
    1. ./cover/vllm/__init__.py
      """vLLM: a high-throughput and memory-efficient inference engine for LLMs"""
      # Import vllm_npu here to activate all the patches
      import vllm_npu
      from vllm.engine.arg_utils import AsyncEngineArgs, EngineArgs
      from vllm.engine.async_llm_engine import AsyncLLMEngine
      from vllm.engine.llm_engine import LLMEngine
      from vllm.engine.ray_utils import initialize_cluster
      from vllm.entrypoints.llm import LLM
      from vllm.outputs import CompletionOutput, RequestOutput
      from vllm.sampling_params import SamplingParams
      __version__ = "0.3.3"
      __all__ = [
          "LLM",
          "SamplingParams",
          "RequestOutput",
          "CompletionOutput",
          "LLMEngine",
          "EngineArgs",
          "AsyncLLMEngine",
          "AsyncEngineArgs",
          "initialize_cluster",
      ]
    2. ./cover/requirements.txt
      ninja  # For faster builds.
      psutil
      ray == 2.9.3  # Fixed version
      sentencepiece  # Required for LLaMA tokenizer.
      numpy
      transformers >= 4.38.0  # Required for Gemma.
      fastapi
      uvicorn[standard]
      pydantic >= 2.0  # Required for OpenAI server.
      prometheus_client >= 0.18.0
      pynvml == 11.5.0
      outlines < 0.1.0  # Newer version will break vllm
    3. ./cover/setup.py
      import contextlib
      import io
      import os
      import re
      import subprocess
      import warnings
      from pathlib import Path
      from typing import List, Set
      from packaging.version import parse, Version
      import setuptools
      import torch
      import torch.utils.cpp_extension as torch_cpp_ext
      from torch.utils.cpp_extension import BuildExtension, CUDAExtension, CUDA_HOME, ROCM_HOME
      ROOT_DIR = os.path.dirname(__file__)
      # If you are developing the C++ backend of vLLM, consider building vLLM with
      # `python setup.py develop` since it will give you incremental builds.
      # The downside is that this method is deprecated, see
      # https://github.com/pypa/setuptools/issues/917
      MAIN_CUDA_VERSION = "12.1"
      # Supported NVIDIA GPU architectures.
      NVIDIA_SUPPORTED_ARCHS = {"7.0", "7.5", "8.0", "8.6", "8.9", "9.0"}
      ROCM_SUPPORTED_ARCHS = {"gfx908", "gfx90a", "gfx942", "gfx1100"}
      # SUPPORTED_ARCHS = NVIDIA_SUPPORTED_ARCHS.union(ROCM_SUPPORTED_ARCHS)
      def _is_hip() -> bool:
          return torch.version.hip is not None
      def _is_neuron() -> bool:
          torch_neuronx_installed = True
          try:
              subprocess.run(["neuron-ls"], capture_output=True, check=True)
          except (FileNotFoundError, PermissionError):
              torch_neuronx_installed = False
          return torch_neuronx_installed
      def _is_cuda() -> bool:
          return (torch.version.cuda is not None) and not _is_neuron()
      # Compiler flags.
      CXX_FLAGS = ["-g", "-O2", "-std=c++17"]
      # TODO(woosuk): Should we use -O3?
      NVCC_FLAGS = ["-O2", "-std=c++17"]
      if _is_hip():
          if ROCM_HOME is None:
              raise RuntimeError(
                  "Cannot find ROCM_HOME. ROCm must be available to build the package."
              )
          NVCC_FLAGS += ["-DUSE_ROCM"]
          NVCC_FLAGS += ["-U__HIP_NO_HALF_CONVERSIONS__"]
          NVCC_FLAGS += ["-U__HIP_NO_HALF_OPERATORS__"]
      if _is_cuda() and CUDA_HOME is None:
          raise RuntimeError(
              "Cannot find CUDA_HOME. CUDA must be available to build the package.")
      ABI = 1 if torch._C._GLIBCXX_USE_CXX11_ABI else 0
      CXX_FLAGS += [f"-D_GLIBCXX_USE_CXX11_ABI={ABI}"]
      NVCC_FLAGS += [f"-D_GLIBCXX_USE_CXX11_ABI={ABI}"]
      def get_hipcc_rocm_version():
          # Run the hipcc --version command
          result = subprocess.run(['hipcc', '--version'],
                                  stdout=subprocess.PIPE,
                                  stderr=subprocess.STDOUT,
                                  text=True)
          # Check if the command was executed successfully
          if result.returncode != 0:
              print("Error running 'hipcc --version'")
              return None
          # Extract the version using a regular expression
          match = re.search(r'HIP version: (\S+)', result.stdout)
          if match:
              # Return the version string
              return match.group(1)
          else:
              print("Could not find HIP version in the output")
              return None
      def glob(pattern: str):
          root = Path(__name__).parent
          return [str(p) for p in root.glob(pattern)]
      def get_neuronxcc_version():
          import sysconfig
          site_dir = sysconfig.get_paths()["purelib"]
          version_file = os.path.join(site_dir, "neuronxcc", "version",
                                      "__init__.py")
          # Check if the command was executed successfully
          with open(version_file, "rt") as fp:
              content = fp.read()
          # Extract the version using a regular expression
          match = re.search(r"__version__ = '(\S+)'", content)
          if match:
              # Return the version string
              return match.group(1)
          else:
              raise RuntimeError("Could not find the neuronxcc version in the output")
      def get_nvcc_cuda_version(cuda_dir: str) -> Version:
          """Get the CUDA version from nvcc.
          Adapted from https://github.com/NVIDIA/apex/blob/8b7a1ff183741dd8f9b87e7bafd04cfde99cea28/setup.py
          """
          nvcc_output = subprocess.check_output([cuda_dir + "/bin/nvcc", "-V"],
                                                universal_newlines=True)
          output = nvcc_output.split()
          release_idx = output.index("release") + 1
          nvcc_cuda_version = parse(output[release_idx].split(",")[0])
          return nvcc_cuda_version
      def get_pytorch_rocm_arch() -> Set[str]:
          """Get the cross section of Pytorch,and vllm supported gfx arches
          ROCM can get the supported gfx architectures in one of two ways
          Either through the PYTORCH_ROCM_ARCH env var, or output from
          rocm_agent_enumerator.
          In either case we can generate a list of supported arch's and
          cross reference with VLLM's own ROCM_SUPPORTED_ARCHs.
          """
          env_arch_list = os.environ.get("PYTORCH_ROCM_ARCH", None)
          # If we don't have PYTORCH_ROCM_ARCH specified pull the list from rocm_agent_enumerator
          if env_arch_list is None:
              command = "rocm_agent_enumerator"
              env_arch_list = subprocess.check_output([command]).decode('utf-8')\
                              .strip().replace("\n", ";")
              arch_source_str = "rocm_agent_enumerator"
          else:
              arch_source_str = "PYTORCH_ROCM_ARCH env variable"
          # List are separated by ; or space.
          pytorch_rocm_arch = set(env_arch_list.replace(" ", ";").split(";"))
          # Filter out the invalid architectures and print a warning.
          arch_list = pytorch_rocm_arch.intersection(ROCM_SUPPORTED_ARCHS)
          # If none of the specified architectures are valid, raise an error.
          if not arch_list:
              raise RuntimeError(
                  f"None of the ROCM architectures in {arch_source_str} "
                  f"({env_arch_list}) is supported. "
                  f"Supported ROCM architectures are: {ROCM_SUPPORTED_ARCHS}.")
          invalid_arch_list = pytorch_rocm_arch - ROCM_SUPPORTED_ARCHS
          if invalid_arch_list:
              warnings.warn(
                  f"Unsupported ROCM architectures ({invalid_arch_list}) are "
                  f"excluded from the {arch_source_str} output "
                  f"({env_arch_list}). Supported ROCM architectures are: "
                  f"{ROCM_SUPPORTED_ARCHS}.",
                  stacklevel=2)
          return arch_list
      def get_torch_arch_list() -> Set[str]:
          # TORCH_CUDA_ARCH_LIST can have one or more architectures,
          # e.g. "8.0" or "7.5,8.0,8.6+PTX". Here, the "8.6+PTX" option asks the
          # compiler to additionally include PTX code that can be runtime-compiled
          # and executed on the 8.6 or newer architectures. While the PTX code will
          # not give the best performance on the newer architectures, it provides
          # forward compatibility.
          env_arch_list = os.environ.get("TORCH_CUDA_ARCH_LIST", None)
          if env_arch_list is None:
              return set()
          # List are separated by ; or space.
          torch_arch_list = set(env_arch_list.replace(" ", ";").split(";"))
          if not torch_arch_list:
              return set()
          # Filter out the invalid architectures and print a warning.
          valid_archs = NVIDIA_SUPPORTED_ARCHS.union(
              {s + "+PTX"
               for s in NVIDIA_SUPPORTED_ARCHS})
          arch_list = torch_arch_list.intersection(valid_archs)
          # If none of the specified architectures are valid, raise an error.
          if not arch_list:
              raise RuntimeError(
                  "None of the CUDA architectures in `TORCH_CUDA_ARCH_LIST` env "
                  f"variable ({env_arch_list}) is supported. "
                  f"Supported CUDA architectures are: {valid_archs}.")
          invalid_arch_list = torch_arch_list - valid_archs
          if invalid_arch_list:
              warnings.warn(
                  f"Unsupported CUDA architectures ({invalid_arch_list}) are "
                  "excluded from the `TORCH_CUDA_ARCH_LIST` env variable "
                  f"({env_arch_list}). Supported CUDA architectures are: "
                  f"{valid_archs}.",
                  stacklevel=2)
          return arch_list
      if _is_hip():
          rocm_arches = get_pytorch_rocm_arch()
          NVCC_FLAGS += ["--offload-arch=" + arch for arch in rocm_arches]
      else:
          # First, check the TORCH_CUDA_ARCH_LIST environment variable.
          compute_capabilities = get_torch_arch_list()
      if _is_cuda() and not compute_capabilities:
          # If TORCH_CUDA_ARCH_LIST is not defined or empty, target all available
          # GPUs on the current machine.
          device_count = torch.cuda.device_count()
          for i in range(device_count):
              major, minor = torch.cuda.get_device_capability(i)
              if major < 7:
                  raise RuntimeError(
                      "GPUs with compute capability below 7.0 are not supported.")
              compute_capabilities.add(f"{major}.{minor}")
      ext_modules = []
      if _is_cuda():
          nvcc_cuda_version = get_nvcc_cuda_version(CUDA_HOME)
          if not compute_capabilities:
              # If no GPU is specified nor available, add all supported architectures
              # based on the NVCC CUDA version.
              compute_capabilities = NVIDIA_SUPPORTED_ARCHS.copy()
              if nvcc_cuda_version < Version("11.1"):
                  compute_capabilities.remove("8.6")
              if nvcc_cuda_version < Version("11.8"):
                  compute_capabilities.remove("8.9")
                  compute_capabilities.remove("9.0")
          # Validate the NVCC CUDA version.
          if nvcc_cuda_version < Version("11.0"):
              raise RuntimeError(
                  "CUDA 11.0 or higher is required to build the package.")
          if (nvcc_cuda_version < Version("11.1")
                  and any(cc.startswith("8.6") for cc in compute_capabilities)):
              raise RuntimeError(
                  "CUDA 11.1 or higher is required for compute capability 8.6.")
          if nvcc_cuda_version < Version("11.8"):
              if any(cc.startswith("8.9") for cc in compute_capabilities):
                  # CUDA 11.8 is required to generate the code targeting compute capability 8.9.
                  # However, GPUs with compute capability 8.9 can also run the code generated by
                  # the previous versions of CUDA 11 and targeting compute capability 8.0.
                  # Therefore, if CUDA 11.8 is not available, we target compute capability 8.0
                  # instead of 8.9.
                  warnings.warn(
                      "CUDA 11.8 or higher is required for compute capability 8.9. "
                      "Targeting compute capability 8.0 instead.",
                      stacklevel=2)
                  compute_capabilities = set(cc for cc in compute_capabilities
                                             if not cc.startswith("8.9"))
                  compute_capabilities.add("8.0+PTX")
              if any(cc.startswith("9.0") for cc in compute_capabilities):
                  raise RuntimeError(
                      "CUDA 11.8 or higher is required for compute capability 9.0.")
          NVCC_FLAGS_PUNICA = NVCC_FLAGS.copy()
          # Add target compute capabilities to NVCC flags.
          for capability in compute_capabilities:
              num = capability[0] + capability[2]
              NVCC_FLAGS += ["-gencode", f"arch=compute_{num},code=sm_{num}"]
              if capability.endswith("+PTX"):
                  NVCC_FLAGS += [
                      "-gencode", f"arch=compute_{num},code=compute_{num}"
                  ]
              if int(capability[0]) >= 8:
                  NVCC_FLAGS_PUNICA += [
                      "-gencode", f"arch=compute_{num},code=sm_{num}"
                  ]
                  if capability.endswith("+PTX"):
                      NVCC_FLAGS_PUNICA += [
                          "-gencode", f"arch=compute_{num},code=compute_{num}"
                      ]
          # Use NVCC threads to parallelize the build.
          if nvcc_cuda_version >= Version("11.2"):
              nvcc_threads = int(os.getenv("NVCC_THREADS", 8))
              num_threads = min(os.cpu_count(), nvcc_threads)
              NVCC_FLAGS += ["--threads", str(num_threads)]
          if nvcc_cuda_version >= Version("11.8"):
              NVCC_FLAGS += ["-DENABLE_FP8_E5M2"]
          # changes for punica kernels
          NVCC_FLAGS += torch_cpp_ext.COMMON_NVCC_FLAGS
          REMOVE_NVCC_FLAGS = [
              '-D__CUDA_NO_HALF_OPERATORS__',
              '-D__CUDA_NO_HALF_CONVERSIONS__',
              '-D__CUDA_NO_BFLOAT16_CONVERSIONS__',
              '-D__CUDA_NO_HALF2_OPERATORS__',
          ]
          for flag in REMOVE_NVCC_FLAGS:
              with contextlib.suppress(ValueError):
                  torch_cpp_ext.COMMON_NVCC_FLAGS.remove(flag)
          install_punica = bool(int(os.getenv("VLLM_INSTALL_PUNICA_KERNELS", "0")))
          device_count = torch.cuda.device_count()
          for i in range(device_count):
              major, minor = torch.cuda.get_device_capability(i)
              if major < 8:
                  install_punica = False
                  break
          if install_punica:
              ext_modules.append(
                  CUDAExtension(
                      name="vllm._punica_C",
                      sources=["csrc/punica/punica_ops.cc"] +
                      glob("csrc/punica/bgmv/*.cu"),
                      extra_compile_args={
                          "cxx": CXX_FLAGS,
                          "nvcc": NVCC_FLAGS_PUNICA,
                      },
                  ))
      elif _is_neuron():
          neuronxcc_version = get_neuronxcc_version()
      vllm_extension_sources = [
      ]
      if _is_cuda():
          vllm_extension_sources.append("csrc/quantization/awq/gemm_kernels.cu")
          vllm_extension_sources.append(
              "csrc/quantization/marlin/marlin_cuda_kernel.cu")
          vllm_extension_sources.append("csrc/custom_all_reduce.cu")
          # Add MoE kernels.
          ext_modules.append(
              CUDAExtension(
                  name="vllm._moe_C",
                  sources=glob("csrc/moe/*.cu") + glob("csrc/moe/*.cpp"),
                  extra_compile_args={
                      "cxx": CXX_FLAGS,
                      "nvcc": NVCC_FLAGS,
                  },
              ))
      """
      if not _is_neuron():
          vllm_extension = CUDAExtension(
              name="vllm._C",
              sources=vllm_extension_sources,
              extra_compile_args={
                  "cxx": CXX_FLAGS,
                  "nvcc": NVCC_FLAGS,
              },
              libraries=["cuda"] if _is_cuda() else [],
          )
          ext_modules.append(vllm_extension)
      """
      def get_path(*filepath) -> str:
          return os.path.join(ROOT_DIR, *filepath)
      def find_version(filepath: str) -> str:
          """Extract version information from the given filepath.
          Adapted from https://github.com/ray-project/ray/blob/0b190ee1160eeca9796bc091e07eaebf4c85b511/python/setup.py
          """
          with open(filepath) as fp:
              version_match = re.search(r"^__version__ = ['\"]([^'\"]*)['\"]",
                                        fp.read(), re.M)
              if version_match:
                  return version_match.group(1)
              raise RuntimeError("Unable to find version string.")
      def get_vllm_version() -> str:
          version = find_version(get_path("vllm", "__init__.py"))
          return version
          if _is_hip():
              # Get the HIP version
              hipcc_version = get_hipcc_rocm_version()
              if hipcc_version != MAIN_CUDA_VERSION:
                  rocm_version_str = hipcc_version.replace(".", "")[:3]
                  version += f"+rocm{rocm_version_str}"
          elif _is_neuron():
              # Get the Neuron version
              neuron_version = str(neuronxcc_version)
              if neuron_version != MAIN_CUDA_VERSION:
                  neuron_version_str = neuron_version.replace(".", "")[:3]
                  version += f"+neuron{neuron_version_str}"
          else:
              cuda_version = str(nvcc_cuda_version)
              if cuda_version != MAIN_CUDA_VERSION:
                  cuda_version_str = cuda_version.replace(".", "")[:3]
                  version += f"+cu{cuda_version_str}"
          return version
      def read_readme() -> str:
          """Read the README file if present."""
          p = get_path("README.md")
          if os.path.isfile(p):
              return io.open(get_path("README.md"), "r", encoding="utf-8").read()
          else:
              return ""
      def get_requirements() -> List[str]:
          """Get Python package dependencies from requirements.txt."""
          if _is_hip():
              with open(get_path("requirements-rocm.txt")) as f:
                  requirements = f.read().strip().split("\n")
          elif _is_neuron():
              with open(get_path("requirements-neuron.txt")) as f:
                  requirements = f.read().strip().split("\n")
          else:
              with open(get_path("requirements.txt")) as f:
                  requirements = f.read().strip().split("\n")
          return requirements
      package_data = {
          "vllm": ["py.typed", "model_executor/layers/fused_moe/configs/*.json"]
      }
      if os.environ.get("VLLM_USE_PRECOMPILED"):
          ext_modules = []
          package_data["vllm"].append("*.so")
      setuptools.setup(
          name="vllm",
          version=get_vllm_version(),
          author="vLLM Team",
          license="Apache 2.0",
          description=("A high-throughput and memory-efficient inference and "
                       "serving engine for LLMs"),
          long_description=read_readme(),
          long_description_content_type="text/markdown",
          url="https://github.com/vllm-project/vllm",
          project_urls={
              "Homepage": "https://github.com/vllm-project/vllm",
              "Documentation": "https://vllm.readthedocs.io/en/latest/",
          },
          classifiers=[
              "Programming Language :: Python :: 3.8",
              "Programming Language :: Python :: 3.9",
              "Programming Language :: Python :: 3.10",
              "Programming Language :: Python :: 3.11",
              "License :: OSI Approved :: Apache Software License",
              "Topic :: Scientific/Engineering :: Artificial Intelligence",
          ],
          packages=setuptools.find_packages(exclude=("benchmarks", "csrc", "docs",
                                                     "examples", "tests")),
          python_requires=">=3.8",
          install_requires=get_requirements(),
          ext_modules=ext_modules,
          cmdclass={"build_ext": BuildExtension} if not _is_neuron() else {},
          package_data=package_data,
      )
      
  2. ./examples
    1. ./examples/start_server.sh: example of starting the online serving API; a sample client request follows the script.
      #!/bin/bash
      # ATB acceleration library environment variables
      export ATB_LAYER_INTERNAL_TENSOR_REUSE=1
      export ATB_WORKSPACE_MEM_ALLOC_GLOBAL=1
      export ATB_OPERATION_EXECUTE_ASYNC=1
      export TASK_QUEUE_ENABLE=1
      export ATB_CONTENT_NCHW_TO_ND=1
      export ATB_CONTEXT_WORKSPACE_RING=1
      export HCCL_BUFFSIZE=120
      export LCCL_DETERMINISTIC=1
      export HCCL_DETERMINISTIC=1
      export ATB_OPSRUNNER_KERNEL_CACHE_LOCAL_COUNT=8
      export ATB_OPSRUNNER_KERNEL_CACHE_GLOABL_COUNT=16
      export ATB_LAUNCH_KERNEL_WITH_TILING=0
      export VLLM_NO_USAGE_STATS=1 # disable vLLM usage stats reporting to avoid errors
      python -m vllm.entrypoints.openai.api_server --model=facebook/opt-125m --trust-remote-code --enforce-eager --worker-use-ray
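      Once the server is up, it exposes vLLM's OpenAI-compatible HTTP API. The sketch below is an illustrative client request: it assumes the server's default listening port 8000 and uses the third-party requests package, which is not part of the listings above.
      import requests  # third-party HTTP client, installed separately

      # /v1/completions is the OpenAI-compatible endpoint served by
      # vllm.entrypoints.openai.api_server; 8000 is the default port.
      resp = requests.post(
          "http://127.0.0.1:8000/v1/completions",
          json={
              "model": "facebook/opt-125m",
              "prompt": "Hello, my name is",
              "max_tokens": 16,
              "temperature": 0,
          },
          timeout=60,
      )
      print(resp.json()["choices"][0]["text"])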
    2. ./examples/test_offline.py: offline inference example.
      from vllm import LLM, SamplingParams
      import json
      import argparse
      parser = argparse.ArgumentParser()
      parser.add_argument('--model_path', type=str, default="facebook/opt-125m")
      # input prompts for test
      prompts = [
          "Hello, my name is",
          "The president of the United States is",
          "The capital of France is",
          "The future of AI is",
      ]
      sampling_params = SamplingParams(max_tokens=512, temperature=0)
      args = parser.parse_args()
      model_path = args.model_path
      llm = LLM(model=model_path,
                max_model_len=4096, # max length of prompt
                tensor_parallel_size=8, # number of NPUs to be used
                max_num_seqs=256, # max batch number
                enforce_eager=True, # disable CUDA graph mode
          trust_remote_code=True, # required if the model is a custom model not yet available in the HuggingFace transformers library
      )
      outputs = llm.generate(prompts, sampling_params)
      for i, output in enumerate(outputs):
          prompt = output.prompt
          generated_text = output.outputs[0].text
          print(f"req_num: {i}\nPrompt: {prompt!r}\nGenerated text: {generated_text!r}")
      
    3. ./examples/test_offline.sh: script that launches the offline inference example.
      #!/bin/bash
      # ATB acceleration library environment variables
      export ATB_LAYER_INTERNAL_TENSOR_REUSE=1
      export ATB_WORKSPACE_MEM_ALLOC_GLOBAL=1
      export ATB_OPERATION_EXECUTE_ASYNC=1
      export TASK_QUEUE_ENABLE=1
      export ATB_CONTENT_NCHW_TO_ND=1
      export ATB_CONTEXT_WORKSPACE_RING=1
      export HCCL_BUFFSIZE=120
      export LCCL_DETERMINISTIC=1
      export HCCL_DETERMINISTIC=true
      export ATB_OPSRUNNER_KERNEL_CACHE_LOCAL_COUNT=8
      export ATB_OPSRUNNER_KERNEL_CACHE_GLOABL_COUNT=16
      export ATB_LAUNCH_KERNEL_WITH_TILING=0
      export VLLM_NO_USAGE_STATS=1 # disable vLLM usage stats reporting to avoid errors
      python3 test_offline.py --model_path facebook/opt-125m
  3. ./vllm_npu/README.md: placeholder file.
    # vllm_npu
  4. ./vllm_npu/tests
    1. ./vllm_npu/tests/models/test_models.py: tests weight loading and inference.
      # Copyright (c) Huawei Technologies Co., Ltd. 2024-2024. All rights reserved.
      
      import pytest
      import torch
      from pytest_mock import MockerFixture
      from vllm.model_executor.input_metadata import InputMetadata
      from vllm_npu.model_executor.models.ascend.mindie_llm_wrapper import MindIELlmWrapper
      
      
      @pytest.mark.parametrize("load_format", ["auto", "safetensors", "pt"])
      def test_load_weights(mocker: MockerFixture, load_format: str) -> None:
          mocker.patch("torch.get_default_dtype", return_value=torch.float32)
          mocker.patch("torch.set_default_dtype")
          mock_generator_torch = mocker.patch("vllm_npu.model_executor.models.ascend.mindie_llm_wrapper.GeneratorTorch")
          mock_sampler = mocker.patch("vllm_npu.model_executor.models.ascend.mindie_llm_wrapper.Sampler")
      
          mindie_llm_wrapper = MindIELlmWrapper(mocker.ANY)
      
          # Testing supported formats, raising inside
          mindie_llm_wrapper.load_weights(load_format)
      
          # Testing GeneratorTorch and Sampler call counts
          mock_generator_torch.assert_called_once()
          mock_sampler.assert_called_once()
      
          # Testing unsupported format
          try:
              mindie_llm_wrapper.load_weights("unsupported")
          except ValueError as e:
              if "load-format support [safetensors, pt]" not in str(e):
                  raise AssertionError("Error message does not match expected text for unsupported formats") from e
          else:
              raise AssertionError("Expected a ValueError for unsupported format, but none was raised")
      
      
      @pytest.mark.parametrize("with_kv_cache", [True, False])
      @pytest.mark.parametrize("is_prompt", [True, False])
      def test_forward(mocker: MockerFixture, with_kv_cache: bool, is_prompt: bool) -> None:
          # with_kv_cache is only for profile_run
          if not with_kv_cache and not is_prompt:
              return
      
          mocker.patch("torch.get_default_dtype", return_value=torch.float32)
          mocker.patch("torch.set_default_dtype")
          mocker.patch("vllm_npu.model_executor.models.ascend.mindie_llm_wrapper.GeneratorTorch")
          mocker.patch("vllm_npu.model_executor.models.ascend.mindie_llm_wrapper.Sampler")
          wrapper = MindIELlmWrapper(mocker.ANY)
          wrapper.load_weights()
          mock_forward_tensor = mocker.patch.object(
              wrapper.mindie_model, "forward_tensor", return_value=torch.tensor([[0.1, 0.2, 0.3]])
          )
      
          input_ids = torch.tensor([[1, 2, 3]])
          positions = torch.tensor([[0, 1, 2]])
          kv_caches = [(torch.tensor([0]), torch.tensor([0]))] if with_kv_cache else [(None, None)]
          lora_requests = [None]
      
          mock_input_metadata = mocker.MagicMock(spec=InputMetadata)
          mock_input_metadata.is_prompt = is_prompt
          mock_input_metadata.prompt_lens = torch.tensor([3], dtype=torch.int32)
          mock_input_metadata.context_lens = torch.tensor([3], dtype=torch.int32)
          mock_input_metadata.max_context_len = 3
          mock_input_metadata.block_tables = torch.tensor([[0]], dtype=torch.int32, device="npu")
          mock_input_metadata.slot_mapping = torch.tensor([0, 1, 2], dtype=torch.int64)
      
          wrapper.forward(input_ids, positions, kv_caches, mock_input_metadata, lora_requests)
      
          if is_prompt:
              expected_lengths = mock_input_metadata.prompt_lens.to(torch.int32)
              expected_max_seq_len = int(mock_input_metadata.prompt_lens.max())
              expected_lm_head_indices = (mock_input_metadata.prompt_lens.cumsum(dim=-1) - 1).to(torch.int64)
          else:
              expected_lengths = mock_input_metadata.context_lens
              expected_max_seq_len = mock_input_metadata.max_context_len
              expected_lm_head_indices = None
      
          if with_kv_cache:
              expected_kv_caches = kv_caches
              expected_block_tables = mock_input_metadata.block_tables
              expected_slots = mock_input_metadata.slot_mapping
          else:
              _, expected_block_tables, expected_slots = wrapper.create_dummy_kv_cache(mock_input_metadata, input_ids)
              # Hard to compare this object
              expected_kv_caches = mocker.ANY
      
          expected_adapter_ids = [None]
      
          mock_forward_tensor.assert_called_once_with(
              input_ids,
              positions,
              is_prompt,
              expected_kv_caches,
              expected_block_tables,
              expected_slots,
              expected_lengths,
              expected_max_seq_len,
              expected_lm_head_indices,
              adapter_ids=expected_adapter_ids,
          )
      
    2. ./vllm_npu/tests/sampler/test_sampler.py: tests the post-processing (sampling) functionality.
      # Copyright (c) Huawei Technologies Co., Ltd. 2024-2024. All rights reserved.
      
      import random
      from typing import Union
      
      import numpy as np
      import pytest
      import torch
      from mindie_llm.text_generator.utils.sampling_metadata import (
          DoSampleMetadata,
          PenaltyMetadata,
          SamplingData,
          SamplingParam,
          SeedMetadata,
          TopKMetadata,
          TopPMetadata,
      )
      from pytest_mock import MockerFixture
      from vllm.sampling_params import _SAMPLING_EPS
      from vllm.sequence import SamplingParams, SequenceData, SequenceGroupMetadata
      from vllm.worker.model_runner import ModelRunner
      from vllm_npu.model_executor.layers.sampler import Sampler
      
      torch.npu.set_device("npu")
      
      
      def _prepare_test(
          batch_size: int, is_prompt: bool = True, random_seq_length: bool = False, temperature: Union[int, float] = 0
      ) -> tuple[torch.Tensor, Sampler]:
          torch.npu.set_device("npu")
          sampler = Sampler(None)
      
          model_runner = ModelRunner(None, None, None, None, None)
          model_runner.set_block_size(128)
          model_runner.sliding_window = None
      
          seq_group_metadata_list = []
          prompt_lens = []
          for i in range(batch_size):
              token_ids = list(range(random.randint(1, 128))) if random_seq_length else [1, 2, 3]
              seq_group_metadata_list.append(
                  SequenceGroupMetadata(
                      request_id=f"test_{i}",
                      is_prompt=is_prompt,
                      seq_data={0: SequenceData(token_ids)},
                      sampling_params=SamplingParams(temperature=temperature),
                      block_tables={0: [1]},
                  )
              )
              prompt_lens.append(seq_group_metadata_list[-1].seq_data[0].get_len())
      
          sampling_metadata = model_runner._prepare_sample(seq_group_metadata_list, prompt_lens, prompt_lens)
      
          return sampling_metadata, sampler
      
      
      @pytest.mark.parametrize("vocab_size", [32000])
      @pytest.mark.parametrize("dtype", [torch.bfloat16])
      def test_forward(mocker: MockerFixture, vocab_size: int, dtype: torch.dtype):
          batch_size = random.randint(1, 256)
          sampling_metadata, sampler = _prepare_test(batch_size)
      
          fake_logits = torch.full((batch_size, vocab_size), 1e-1, dtype=dtype)
          fake_tokens = np.arange(batch_size)
          fake_next_tokens = np.random.randint(0, 65536, size=len(sampling_metadata.seq_groups))
          fake_sample_results = [([i], [0]) for i in range(batch_size)]
      
          deconstructor = sampler.get_sample_data_and_param
          reconstructor = sampler.get_sample_results
      
          mock_process = mocker.patch(
              "vllm_npu.model_executor.layers.sampler._apply_logits_processors", return_value=fake_logits
          )
          mock_atb_model = mocker.patch.object(sampler, "atb_model")
          mock_deconstructor = mocker.patch.object(
              sampler, "get_sample_data_and_param", side_effect=lambda *args, **kwargs: deconstructor(*args, **kwargs)
          )
          mock_backend_sample = mocker.patch.object(mock_atb_model, "sample", return_value=(fake_next_tokens, fake_logits))
          mock_reconstructor = mocker.patch.object(
              sampler, "get_sample_results", side_effect=lambda *args, **kwargs: reconstructor(*args, **kwargs)
          )
          mock_logprobs = mocker.patch(
              "vllm_npu.model_executor.layers.sampler._get_logprobs", return_value=(mocker.ANY, mocker.ANY)
          )
          mocker.patch("vllm_npu.model_executor.layers.sampler._build_sampler_output")
      
          sampler.forward(fake_logits, sampling_metadata)
          mock_backend_sample.assert_called_once_with(fake_logits, mocker.ANY, mocker.ANY)
      
      
      
      @pytest.mark.parametrize("temperature", [0, 0.1, 1, 10])
      @pytest.mark.parametrize("vocab_size", [32000])
      def test_sample_data_and_param(temperature, vocab_size):
          batch_size = random.randint(1, 256)
          sampling_metadata, sampler = _prepare_test(batch_size, temperature=temperature)
      
          sampling_data, sampling_param = sampler.get_sample_data_and_param(sampling_metadata, vocab_size)
      
          # Sampling data
          if not isinstance(sampling_data, SamplingData):
              raise TypeError("The returned object is not an instance of SamplingData.")
      
          if not hasattr(sampling_data, "all_input_ids"):
              raise AttributeError("SamplingData does not have the 'all_input_ids' attribute.")
          if not hasattr(sampling_data, "output_ids"):
              raise AttributeError("SamplingData does not have the 'output_ids' attribute.")
      
          if not isinstance(sampling_data.all_input_ids, torch.Tensor):
              raise TypeError("all_input_ids should be a tensor.")
          if not isinstance(sampling_data.output_ids, torch.Tensor):
              raise TypeError("output_ids should be a tensor.")
          if sampling_data.all_input_ids.dtype != torch.int32:
              raise ValueError("The dtype of all_input_ids should be int32.")
          if sampling_data.output_ids.dtype != torch.int32:
              raise ValueError("The dtype of output_ids should be int32.")
      
          # Sampling param
          if temperature < _SAMPLING_EPS:
              if sampling_param is not None:
                  raise ValueError("Sampling param should be None in greedy mode.")
          else:
              if not isinstance(sampling_param, SamplingParam):
                  raise TypeError("The returned object is not an instance of SamplingParam.")
      
              if not hasattr(sampling_param, "penalty_meta"):
                  raise AttributeError("SamplingParam does not have the 'penalty_meta' attribute.")
              if not hasattr(sampling_param, "temperature"):
                  raise AttributeError("SamplingParam does not have the 'temperature' attribute.")
              if not hasattr(sampling_param, "top_k_meta"):
                  raise AttributeError("SamplingParam does not have the 'top_k_meta' attribute.")
              if not hasattr(sampling_param, "top_p_meta"):
                  raise AttributeError("SamplingParam does not have the 'top_p_meta' attribute.")
              if not hasattr(sampling_param, "seed_meta"):
                  raise AttributeError("SamplingParam does not have the 'seed_meta' attribute.")
              if not hasattr(sampling_param, "do_sample_meta"):
                  raise AttributeError("SamplingParam does not have the 'do_sample_meta' attribute.")
      
              if not isinstance(sampling_param.penalty_meta, PenaltyMetadata):
                  raise TypeError("penalty_meta should be an instance of PenaltyMetadata.")
              if temperature != 1:
                  if not isinstance(sampling_param.temperature, torch.Tensor):
                      raise TypeError("Temperature should be a tensor.")
              else:
                  if sampling_param.temperature is not None:
                      raise ValueError("Temperature tensors should be None.")
              if not isinstance(sampling_param.top_k_meta, TopKMetadata):
                  raise TypeError("top_k_meta should be an instance of TopKMetadata.")
              if not isinstance(sampling_param.top_p_meta, TopPMetadata):
                  raise TypeError("top_p_meta should be an instance of TopPMetadata.")
              if not isinstance(sampling_param.seed_meta, SeedMetadata):
                  raise TypeError("seed_meta should be an instance of SeedMetadata.")
              if not isinstance(sampling_param.do_sample_meta, DoSampleMetadata):
                  raise TypeError("do_sample_meta should be an instance of DoSampleMetadata.")
      
      
      def test_sample_results_reconstruction():
          batch_size = random.randint(1, 256)
          sampling_metadata, sampler = _prepare_test(batch_size)
      
          samples = np.arange(batch_size)
          sample_results = sampler.get_sample_results(sampling_metadata, samples)
          expected_sample_results = [([i], [0]) for i in range(batch_size)]
          if sample_results != expected_sample_results:
              raise ValueError(f"Sample results mismatch: Expected {expected_sample_results}, got {sample_results}.")
      
          selected_seq_groups = [([0], SamplingParams()), ([1, 2, 3], SamplingParams()), ([4, 5], SamplingParams())]
          samples = np.array([101, 102, 103, 104, 105, 106])
          results = sampler.extract_next_tokens_and_parents(selected_seq_groups, samples)
          expected_results = [([101], [0]), ([102], [0, 1, 2]), ([105], [0, 1])]
          if results != expected_results:
              raise ValueError(f"Results mismatch: Expected {expected_results}, got {results}.")
      
      
      @pytest.mark.parametrize("is_prompt", [True, False])
      @pytest.mark.parametrize("vocab_size", [32000])
      def test_token_padding(is_prompt, vocab_size):
          batch_size = random.randint(1, 256)
          sampling_metadata, sampler = _prepare_test(batch_size, is_prompt, True)
      
          sampling_data, _ = sampler.get_sample_data_and_param(sampling_metadata, vocab_size)
      
          # Note that if padding fails, then sampling_data cannot be constructed.
          # So we just check some basic shape info here.
          if sampling_data.all_input_ids.shape[0] != batch_size:
              raise ValueError(
                  f"Expected all_input_ids to have shape[0] of {batch_size}, but got {sampling_data.all_input_ids.shape[0]}."
              )
      
          if sampling_data.output_ids.shape[0] != batch_size:
              raise ValueError(
                  f"Expected output_ids to have shape[0] of {batch_size}, but got {sampling_data.output_ids.shape[0]}."
              )
      
  5. ./vllm_npu/vllm_npu/core
    1. ./vllm_npu/vllm_npu/core/__init__.py: function replacements for the core module.
      # Copyright (c) Huawei Technologies Co., Ltd. 2024-2024. All rights reserved.
      from vllm_npu.core.scheduler import _schedule
      from vllm.core.scheduler import Scheduler
      Scheduler._schedule = _schedule
    2. ./vllm_npu/vllm_npu/core/scheduler.py: scheduler module.
      # Copyright (c) Huawei Technologies Co., Ltd. 2024-2024. All rights reserved.
      # Part of code in this file was copied from project [vLLM Team][vllm]
      import time
      from collections import deque
      from typing import Deque, Dict, List
      from vllm.core.block_manager import AllocStatus
      from vllm.core.scheduler import SchedulerOutputs, logger
      from vllm.sequence import SequenceGroup, SequenceStatus
      def _schedule(self) -> SchedulerOutputs:
          # Blocks that need to be swapped or copied before model execution.
          blocks_to_swap_in: Dict[int, int] = {}
          blocks_to_swap_out: Dict[int, int] = {}
          blocks_to_copy: Dict[int, List[int]] = {}
          # Fix the current time.
          now = time.monotonic()
          # Join waiting sequences if possible.
          if not self.swapped:
              ignored_seq_groups: List[SequenceGroup] = []
              scheduled: List[SequenceGroup] = []
              # The total number of sequences on the fly, including the
              # requests in the generation phase.
              num_curr_seqs = sum(seq_group.get_max_num_running_seqs() for seq_group in self.running)
              num_batched_tokens = 0
              curr_loras = set(seq_group.lora_int_id for seq_group in self.running) if self.lora_enabled else None
              # Optimization: We do not sort the waiting queue since the preempted
              # sequence groups are added to the front and the new sequence groups
              # are added to the back.
              leftover_waiting_sequences = deque()
              while self.waiting:
                  seq_group = self.waiting[0]
                  waiting_seqs = seq_group.get_seqs(status=SequenceStatus.WAITING)
                  assert len(waiting_seqs) == 1, "Waiting sequence group should have only one prompt sequence."
                  num_prompt_tokens = waiting_seqs[0].get_len()
                  if num_prompt_tokens > self.prompt_limit:
                      logger.warning(
                          f"Input prompt ({num_prompt_tokens} tokens) is too long"
                          f" and exceeds limit of {self.prompt_limit}"
                      )
                      for seq in waiting_seqs:
                          seq.status = SequenceStatus.FINISHED_IGNORED
                      ignored_seq_groups.append(seq_group)
                      self.waiting.popleft()
                      continue
                  # If the sequence group cannot be allocated, stop.
                  can_allocate = self.block_manager.can_allocate(seq_group)
                  if can_allocate == AllocStatus.LATER:
                      break
                  elif can_allocate == AllocStatus.NEVER:
                      logger.warning(
                          f"Input prompt ({num_prompt_tokens} tokens) is too long"
                          f" and exceeds the capacity of block_manager"
                      )
                      for seq in waiting_seqs:
                          seq.status = SequenceStatus.FINISHED_IGNORED
                      ignored_seq_groups.append(seq_group)
                      self.waiting.popleft()
                      continue
                  lora_int_id = 0
                  if self.lora_enabled:
                      lora_int_id = seq_group.lora_int_id
                      if lora_int_id > 0 and lora_int_id not in curr_loras and len(curr_loras) >= self.lora_config.max_loras:
                          # We don't have a space for another LoRA, so
                          # we ignore this request for now.
                          leftover_waiting_sequences.appendleft(seq_group)
                          self.waiting.popleft()
                          continue
                  # If the number of batched tokens exceeds the limit, stop.
                  if num_batched_tokens + num_prompt_tokens > self.scheduler_config.max_num_batched_tokens:
                      break
                  # The total number of sequences in the RUNNING state should not
                  # exceed the maximum number of sequences.
                  num_new_seqs = seq_group.get_max_num_running_seqs()
                  if num_curr_seqs + num_new_seqs > self.scheduler_config.max_num_seqs:
                      break
                  if lora_int_id > 0:
                      curr_loras.add(lora_int_id)
                  self.waiting.popleft()
                  self._allocate(seq_group)
                  self.running.append(seq_group)
                  num_batched_tokens += num_prompt_tokens
                  num_curr_seqs += num_new_seqs
                  scheduled.append(seq_group)
              self.waiting.extendleft(leftover_waiting_sequences)
              if scheduled or ignored_seq_groups:
                  scheduler_outputs = SchedulerOutputs(
                      scheduled_seq_groups=scheduled,
                      prompt_run=True,
                      num_batched_tokens=num_batched_tokens,
                      blocks_to_swap_in=blocks_to_swap_in,
                      blocks_to_swap_out=blocks_to_swap_out,
                      blocks_to_copy=blocks_to_copy,
                      ignored_seq_groups=ignored_seq_groups,
                  )
                  return scheduler_outputs
          # NOTE(woosuk): Preemption happens only when there is no available slot
          # to keep all the sequence groups in the RUNNING state.
          # In this case, the policy is responsible for deciding which sequence
          # groups to preempt.
          self.running = self.policy.sort_by_priority(now, self.running)
          # Reserve new token slots for the running sequence groups.
          running: Deque[SequenceGroup] = deque()
          preempted: List[SequenceGroup] = []
          while self.running:
              seq_group = self.running.popleft()
              while not self.block_manager.can_append_slot(seq_group):
                  if self.running:
                      # Preempt the lowest-priority sequence groups.
                      victim_seq_group = self.running.pop()
                      self._preempt(victim_seq_group, blocks_to_swap_out)
                      preempted.append(victim_seq_group)
                  else:
                      # No other sequence groups can be preempted.
                      # Preempt the current sequence group.
                      self._preempt(seq_group, blocks_to_swap_out)
                      preempted.append(seq_group)
                      break
              else:
                  # Append new slots to the sequence group.
                  self._append_slot(seq_group, blocks_to_copy)
                  running.append(seq_group)
          self.running = running
          # Swap in the sequence groups in the SWAPPED state if possible.
          self.swapped = self.policy.sort_by_priority(now, self.swapped)
          if not preempted:
              num_curr_seqs = sum(seq_group.get_max_num_running_seqs() for seq_group in self.running)
              curr_loras = set(seq_group.lora_int_id for seq_group in self.running) if self.lora_enabled else None
              leftover_swapped = deque()
              while self.swapped:
                  seq_group = self.swapped[0]
                  lora_int_id = 0
                  if self.lora_enabled:
                      lora_int_id = seq_group.lora_int_id
                      if lora_int_id > 0 and lora_int_id not in curr_loras and len(curr_loras) >= self.lora_config.max_loras:
                          # We don't have a space for another LoRA, so
                          # we ignore this request for now.
                          leftover_swapped.appendleft(seq_group)
                          self.swapped.popleft()
                          continue
                  # If the sequence group cannot be swapped in, stop.
                  if not self.block_manager.can_swap_in(seq_group):
                      break
                  # The total number of sequences in the RUNNING state should not
                  # exceed the maximum number of sequences.
                  num_new_seqs = seq_group.get_max_num_running_seqs()
                  if num_curr_seqs + num_new_seqs > self.scheduler_config.max_num_seqs:
                      break
                  if lora_int_id > 0:
                      curr_loras.add(lora_int_id)
                  self.swapped.popleft()
                  self._swap_in(seq_group, blocks_to_swap_in)
                  self._append_slot(seq_group, blocks_to_copy)
                  num_curr_seqs += num_new_seqs
                  self.running.append(seq_group)
              self.swapped.extendleft(leftover_swapped)
          # Each sequence in the generation phase only takes one token slot.
          # Therefore, the number of batched tokens is equal to the number of
          # sequences in the RUNNING state.
          num_batched_tokens = sum(seq_group.num_seqs(status=SequenceStatus.RUNNING) for seq_group in self.running)
          scheduler_outputs = SchedulerOutputs(
              scheduled_seq_groups=self.running,
              prompt_run=False,
              num_batched_tokens=num_batched_tokens,
              blocks_to_swap_in=blocks_to_swap_in,
              blocks_to_swap_out=blocks_to_swap_out,
              blocks_to_copy=blocks_to_copy,
              ignored_seq_groups=[],
          )
          return scheduler_outputs
  6. ./vllm_npu/vllm_npu/engine
    1. ./vllm_npu/vllm_npu/engine/__init__.py: function replacements for the engine module.
      # Copyright (c) Huawei Technologies Co., Ltd. 2024-2024. All rights reserved.
      from vllm_npu.config import DeviceConfig
      from vllm_npu.utils import get_ip
      from vllm_npu.engine.llm_engine import DEVICE_TO_WORKER_MODULE_MAP
      from vllm_npu.engine.ray_utils import initialize_cluster
      from vllm.engine import arg_utils, llm_engine, ray_utils
      arg_utils.DeviceConfig = DeviceConfig
      llm_engine.DEVICE_TO_WORKER_MODULE_MAP = DEVICE_TO_WORKER_MODULE_MAP
      llm_engine.get_ip = get_ip
      llm_engine.initialize_cluster = initialize_cluster
      ray_utils.get_ip = get_ip
      ray_utils.initialize_cluster = initialize_cluster
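      The assignments above take effect at import time, so they must run before vLLM constructs its engine objects. A minimal sanity-check sketch (not part of the repository) of what the patching achieves:
      ```python
      # Hypothetical check: after importing vllm_npu.engine, the vLLM modules
      # reference the NPU replacements assigned in the __init__.py above.
      import vllm_npu.engine

      from vllm.engine import arg_utils, llm_engine
      from vllm_npu.config import DeviceConfig
      from vllm_npu.engine.llm_engine import DEVICE_TO_WORKER_MODULE_MAP

      assert arg_utils.DeviceConfig is DeviceConfig
      assert llm_engine.DEVICE_TO_WORKER_MODULE_MAP["npu"] == "vllm_npu.worker.ascend_worker"
      ```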
      
    2. ./vllm_npu/vllm_npu/engine/llm_engine.py: registers the worker for the Ascend environment.
      # Copyright (c) Huawei Technologies Co., Ltd. 2024-2024. All rights reserved.
      DEVICE_TO_WORKER_MODULE_MAP = {
          "cuda": "vllm.worker.worker",
          "neuron": "vllm.worker.neuron_worker",
          "npu": "vllm_npu.worker.ascend_worker",
      }
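      vLLM 0.3.3 selects the worker implementation from this map by `device_config.device_type`, so with the replacement above an NPU device resolves to the `Worker` class in `vllm_npu/worker/ascend_worker.py`. A rough sketch of the dispatch pattern (mirroring the model-loader dispatch in `model_executor/utils.py` below, not the literal vLLM source):
      ```python
      # Illustrative sketch of how a device-to-module map is typically consumed.
      import importlib

      from vllm_npu.engine.llm_engine import DEVICE_TO_WORKER_MODULE_MAP

      def resolve_worker_class(device_type: str):
          module_name = DEVICE_TO_WORKER_MODULE_MAP[device_type]   # e.g. "npu"
          worker_module = importlib.import_module(module_name)
          return worker_module.Worker

      AscendWorker = resolve_worker_class("npu")   # vllm_npu.worker.ascend_worker.Worker
      ```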
      
    3. ./vllm_npu/vllm_npu/engine/ray_utils.py: modifications related to the ray module.
      # Copyright (c) Huawei Technologies Co., Ltd. 2024-2024. All rights reserved.
      # Part of code in this file was copied from project [vLLM Team][vllm]
      from typing import TYPE_CHECKING, Optional
      from vllm.config import ParallelConfig
      from vllm.logger import init_logger
      from vllm.utils import is_hip
      logger = init_logger(__name__)
      try:
          import ray
      except ImportError as e:
          logger.warning(
              f"Failed to import Ray with {e!r}. " "For distributed inference, please install Ray with " "`pip install ray`."
          )
          ray = None
      if TYPE_CHECKING:
          from ray.util.placement_group import PlacementGroup
      def initialize_cluster(
          parallel_config: ParallelConfig,
          engine_use_ray: bool = False,
          ray_address: Optional[str] = None,
      ) -> Optional["PlacementGroup"]:
          """Initialize the distributed cluster probably with Ray.
          Args:
              parallel_config: The configurations for parallel execution.
              engine_use_ray: Whether to use Ray for async engine.
              ray_address: The address of the Ray cluster. If None, uses
                  the default Ray cluster address.
          Returns:
              An optional `PlacementGroup`. It includes the specification
              of the resources for each distributed worker. None if Ray is
              not used.
          """
          if parallel_config.worker_use_ray or engine_use_ray:
              if ray is None:
                  raise ImportError("Ray is not installed. Please install Ray to use distributed " "serving.")
              # Connect to a ray cluster.
              if is_hip():
                  ray.init(address=ray_address, ignore_reinit_error=True, num_gpus=parallel_config.world_size)
              else:
                  """start adapt"""
                  ray.init(address=ray_address, ignore_reinit_error=True, num_gpus=parallel_config.world_size)
                  """end adapt"""
          if not parallel_config.worker_use_ray:
              assert parallel_config.world_size == 1, "Ray is required if parallel_config.world_size > 1."
              return None
          # Create placement group for worker processes
          current_placement_group = ray.util.get_current_placement_group()
          if current_placement_group:
              # We are in a placement group
              bundles = current_placement_group.bundle_specs
              # Verify that we can use the placement group.
              gpu_bundles = 0
              for bundle in bundles:
                  bundle_gpus = bundle.get("GPU", 0)
                  if bundle_gpus > 1:
                      raise ValueError("Placement group bundle cannot have more than 1 GPU.")
                  if bundle_gpus:
                      gpu_bundles += 1
              if parallel_config.world_size > gpu_bundles:
                  raise ValueError(
                      "The number of required GPUs exceeds the total number of " "available GPUs in the placement group."
                  )
          else:
              num_gpus_in_cluster = ray.cluster_resources().get("GPU", 0)
              if parallel_config.world_size > num_gpus_in_cluster:
                  raise ValueError(
                      "The number of required GPUs exceeds the total number of " "available GPUs in the cluster."
                  )
              # Create a new placement group
              placement_group_specs = [{"GPU": 1}] * parallel_config.world_size
              current_placement_group = ray.util.placement_group(placement_group_specs)
              # Wait until PG is ready - this will block until all
              # requested resources are available, and will timeout
              # if they cannot be provisioned.
              ray.get(current_placement_group.ready(), timeout=1800)
          return current_placement_group
  7. ./vllm_npu/vllm_npu/model_executor
    1. ./vllm_npu/vllm_npu/model_executor/ascend_model_loader.py: model loading.
      # Copyright (c) Huawei Technologies Co., Ltd. 2024-2024. All rights reserved.
      # Part of code in this file was copied from project [vLLM Team][vllm]
      """Utilities for selecting and loading models."""
      import contextlib
      import torch
      import torch.nn as nn
      from vllm.config import DeviceConfig, ModelConfig
      from vllm.logger import init_logger
      from vllm.model_executor.weight_utils import initialize_dummy_weights
      from vllm_npu.model_executor.models.ascend.mindie_llm_wrapper import MindIELlmWrapper
      logger = init_logger(__name__)
      @contextlib.contextmanager
      def _set_default_torch_dtype(dtype: torch.dtype):
          """Sets the default torch dtype to the given dtype."""
          old_dtype = torch.get_default_dtype()
          torch.set_default_dtype(dtype)
          yield
          torch.set_default_dtype(old_dtype)
      def get_model(
          model_config: ModelConfig,
          device_config: DeviceConfig,
          **kwargs,
      ) -> nn.Module:
          """Get model with MindIELlmWrapper
          Args:
              model_config (ModelConfig): Model configs.
              device_config (DeviceConfig): Device configs.
              mindie_model_config (dict[str, Any]): MindIE configs.
          Raises:
              ValueError: LoRA not supported.
          Returns:
              nn.Module: Model.
          """
          if kwargs.get("lora_config"):
              logger.info(
                  "Using LoRA(s) with MindIE backend:\n"
                  "Please make sure your '--lora-modules' matches with your 'lora_adapter.json' in the model directory!\n"
                  "Current config for LoRA(s): %s",
                  kwargs.get("lora_config"),
              )
          with _set_default_torch_dtype(model_config.dtype):
              with torch.device(device_config.device):
                  model = MindIELlmWrapper(model_config.mindie_config)
              if model_config.load_format == "dummy":
                  initialize_dummy_weights(model)
              else:
                  model.load_weights(model_config.load_format)
              model = model.npu()
          model = model.eval()
          # For compatibility, move the config to the first hierarchy
          model.config = model.mindie_model.model_wrapper.model_runner.config
          return model
      
    2. ./vllm_npu/vllm_npu/model_executor/utils.py: registers the model_loader for the Ascend environment.
      # Copyright (c) Huawei Technologies Co., Ltd. 2024-2024. All rights reserved.
      import importlib
      import torch
      from vllm.config import DeviceConfig, ModelConfig
      DEVICE_TO_MODEL_LOADER_MAP = {
          "cuda": "vllm.model_executor.model_loader",
          "neuron": "vllm.model_executor.neuron_model_loader",
          "npu": "vllm_npu.model_executor.ascend_model_loader",
      }
      def get_model(model_config: ModelConfig, device_config: DeviceConfig, **kwargs) -> torch.nn.Module:
          model_loader_module = DEVICE_TO_MODEL_LOADER_MAP[device_config.device_type]
          imported_model_loader = importlib.import_module(f"{model_loader_module}")
          get_model_fn = imported_model_loader.get_model
          return get_model_fn(model_config, device_config, **kwargs)
      
    3. ./vllm_npu/vllm_npu/model_executor/__init__.py: function replacements for the model_executor module.
      # Copyright (c) Huawei Technologies Co., Ltd. 2024-2024. All rights reserved.
      import vllm_npu.model_executor.ascend_model_loader
      import vllm_npu.model_executor.models.ascend.mindie_llm_wrapper
      from vllm_npu.model_executor.utils import DEVICE_TO_MODEL_LOADER_MAP, get_model
      from vllm.model_executor import utils
      from vllm import model_executor
      model_executor.get_model = get_model
      utils.DEVICE_TO_MODEL_LOADER_MAP = DEVICE_TO_MODEL_LOADER_MAP
      
    4. ./vllm_npu/vllm_npu/model_executor/layers/__init__.py: placeholder file.
      # Copyright (c) Huawei Technologies Co., Ltd. 2024-2024. All rights reserved.
      
    5. ./vllm_npu/vllm_npu/model_executor/layers/sampler.py: implementation of the post-processing (sampling) stage.
      # Copyright (c) Huawei Technologies Co., Ltd. 2024-2024. All rights reserved.
      # Part of codes in this file was copied from project [vLLM Team][vllm]
      from random import randint
      from typing import Dict, List, Optional, Tuple
      import numpy as np
      import torch
      import torch.nn as nn
      from mindie_llm.text_generator.adapter.generator_torch import GeneratorTorch
      from mindie_llm.text_generator.utils.sampling_metadata import SamplingData, SamplingParam
      from vllm.model_executor.layers.sampler import _apply_logits_processors, _build_sampler_output, _get_logprobs
      from vllm.model_executor.sampling_metadata import SamplingMetadata
      from vllm.sampling_params import SamplingParams, SamplingType
      from vllm.sequence import SamplerOutput
      _SAMPLING_EPS = 1e-5
      def _to_tensor(data, dtype=None):
          if dtype:
              return torch.tensor(data, dtype=dtype, device=torch.device("npu"))
          else:
              return torch.tensor(data, device=torch.device("npu"))
      class Sampler(nn.Module):
          """Adapts the vLLM sample function to the backend sample function in MindIE."""
          def __init__(self, atb_model: GeneratorTorch) -> None:
              super().__init__()
              self.atb_model = atb_model
          def forward(self, logits: torch.Tensor, sampling_metadata: SamplingMetadata) -> Optional[SamplerOutput]:
              _, vocab_size = logits.shape
              if not sampling_metadata.perform_sampling:
                  return None
              # Apply logits processors (if any).
              logits = _apply_logits_processors(logits, sampling_metadata)
              sampling_data, sampling_param = self.get_sample_data_and_param(sampling_metadata, vocab_size)
              logprobs = torch.log_softmax(logits, dim=-1, dtype=torch.float)
              next_tokens, _ = self.atb_model.sample(logits, sampling_data, sampling_param)
              # Sample the next tokens.
              sample_results = self.get_sample_results(sampling_metadata, next_tokens)
              # Get the logprobs query results.
              prompt_logprobs, sample_logprobs = _get_logprobs(logprobs, sampling_metadata, sample_results)
              return _build_sampler_output(sample_results, sampling_metadata, prompt_logprobs, sample_logprobs)
          def get_sample_data_and_param(
              self,
              sampling_metadata: "SamplingMetadata",
              vocab_size: int,
          ) -> Tuple[SamplingData, SamplingParam]:
              """Get sample data and parameter from sampling metadata.
              Args:
                  sampling_metadata (SamplingMetadata): Sampling metadata.
                  vocab_size (int): Vocabulary size.
              Returns:
                  tuple[SamplingData, SamplingParam]: Sample data and parameter.
              """
              all_input_tokens: List[List[int]] = []
              prompt_tokens: List[List[int]] = []
              output_tokens: List[List[int]] = []
              top_ks: List[int] = []
              temperatures: List[float] = []
              top_ps: List[float] = []
              min_ps: List[float] = []
              presence_penalties: List[float] = []
              frequency_penalties: List[float] = []
              repetition_penalties: List[float] = []
              seeds: List[int] = []
              do_penalties = False
              do_top_p_top_k = False
              do_min_p = False
              for i, seq_group in enumerate(sampling_metadata.seq_groups):
                  seq_ids, sampling_params = seq_group
                  temperature = sampling_params.temperature
                  p = sampling_params.presence_penalty
                  f = sampling_params.frequency_penalty
                  r = sampling_params.repetition_penalty
                  top_p = sampling_params.top_p
                  min_p = sampling_params.min_p
                  # k should not be greater than the vocab size.
                  top_k = min(sampling_params.top_k, vocab_size)
                  top_k = vocab_size if top_k == -1 else top_k
                  is_greedy = sampling_params.sampling_type == SamplingType.GREEDY
                  seed = sampling_params.seed or (
                      0 if is_greedy else randint(torch.iinfo(torch.long).min, torch.iinfo(torch.long).max)
                  )
                  if temperature < _SAMPLING_EPS:
                      # NOTE: Zero temperature means deterministic sampling
                      # (i.e., greedy sampling or beam search).
                      # Set the temperature to 1 to avoid division by zero.
                      temperature = 1.0
                  if not do_top_p_top_k and (top_p < 1.0 - _SAMPLING_EPS or top_k != vocab_size):
                      do_top_p_top_k = True
                  if not do_min_p and min_p > _SAMPLING_EPS:
                      do_min_p = True
                  if not do_penalties and any(
                      [abs(p) >= _SAMPLING_EPS, abs(f) >= _SAMPLING_EPS, abs(r - 1.0) >= _SAMPLING_EPS]
                  ):
                      do_penalties = True
                  if i < sampling_metadata.num_prompts and sampling_params.prompt_logprobs is not None:
                      # For tokens in the prompt that we only need to get their logprobs
                      prompt_len = sampling_metadata.prompt_lens[i]
                      temperatures += [temperature] * (prompt_len - 1)
                      seeds += [seed] * (prompt_len - 1)
                      top_ps += [top_p] * (prompt_len - 1)
                      top_ks += [top_k] * (prompt_len - 1)
                      min_ps += [min_p] * (prompt_len - 1)
                      presence_penalties += [0] * (prompt_len - 1)
                      frequency_penalties += [0] * (prompt_len - 1)
                      repetition_penalties += [1] * (prompt_len - 1)
                      prompt_tokens.extend([] for _ in range(prompt_len - 1))
                      output_tokens.extend([] for _ in range(prompt_len - 1))
                      all_input_tokens.extend([] for _ in range(prompt_len - 1))
                  for seq_id in seq_ids:
                      seq_data = sampling_metadata.seq_data[seq_id]
                      prompt_tokens.append(seq_data.prompt_token_ids)
                      output_tokens.append(seq_data.output_token_ids)
                      all_input_tokens.append(seq_data.prompt_token_ids + seq_data.output_token_ids)
                  temperatures += [temperature] * len(seq_ids)
                  seeds += [seed] * len(seq_ids)
                  top_ps += [top_p] * len(seq_ids)
                  top_ks += [top_k] * len(seq_ids)
                  min_ps += [min_p] * len(seq_ids)
                  presence_penalties += [p] * len(seq_ids)
                  frequency_penalties += [f] * len(seq_ids)
                  repetition_penalties += [r] * len(seq_ids)
              max_tokens_len = max([len(tokens) for tokens in all_input_tokens], default=0)
              padded_all_input_tokens = [
                  tokens + [vocab_size] * (max_tokens_len - len(tokens))
                  for tokens in all_input_tokens
              ]
              output_max_len = max([len(tokens) for tokens in output_tokens], default=0)
              padded_output_tokens = [tokens + [vocab_size] * (output_max_len - len(tokens)) for tokens in output_tokens]
              # To use from_numpy or not to, this is a question.
              # Convert to tensor if not None
              all_input_ids = padded_all_input_tokens and _to_tensor(padded_all_input_tokens, torch.int32)
              output_ids = padded_output_tokens and _to_tensor(padded_output_tokens, torch.int32)
              sampling_data = SamplingData(all_input_ids=all_input_ids, output_ids=output_ids)
              repetition_penalties = np.array(repetition_penalties, dtype=np.float32)
              frequency_penalties = np.array(frequency_penalties, dtype=np.float32)
              presence_penalties = np.array(presence_penalties, dtype=np.float32)
              temperatures = np.array(temperatures, dtype=np.float32)
              top_ks = np.array(top_ks, dtype=np.int32)
              top_ps = np.array(top_ps, dtype=np.float32)
              seeds = np.array(seeds, dtype=np.int32)
              do_sample = np.array([True] * len(sampling_metadata.seq_groups))  # vLLM 0.3.3 does not have do_sample
              # Temporary solution, need to find out a better way to deal with this
              if is_greedy:
                  sampling_param = None
              else:
                  sampling_param = SamplingParam.from_numpy(
                      repetition_penalty=repetition_penalties,
                      frequency_penalty=frequency_penalties,
                      presence_penalty=presence_penalties,
                      temperature=temperatures,
                      top_k=top_ks,
                      top_p=top_ps,
                      seed=seeds,
                      do_sample=do_sample,
                      to_tensor=_to_tensor,
                  )
              return sampling_data, sampling_param
          def get_sample_results(
              self,
              sampling_metadata: SamplingMetadata,
              samples: np.ndarray,
          ) -> List[Tuple[List[int], List[int]]]:
              """Reconstructs the sample results from the backend samples.
              Args:
                  sampling_metadata (SamplingMetadata): Sampling metadata.
                  samples (np.array): Backend samples.
              Raises:
                  ValueError: Beam search not supported temporarily.
              Returns:
                  List[Tuple[List[int], List[int]]]: Sample results.
              """
              categorized_seq_group_ids = {t: [] for t in SamplingType}
              categorized_sample_indices = sampling_metadata.categorized_sample_indices
              for i, seq_group in enumerate(sampling_metadata.seq_groups):
                  _, sampling_params = seq_group
                  sampling_type = sampling_params.sampling_type
                  categorized_seq_group_ids[sampling_type].append(i)
              sample_results_dict: Dict[int, Tuple[List[int], List[int]]] = {}
              sample_metadata = {}
              # Counterintuitively, having two loops here is actually faster.
              # The first loop can run without waiting on GPU<->CPU sync.
              for sampling_type in SamplingType:
                  if len(categorized_sample_indices[sampling_type]) == 0:
                      continue
                  seq_group_ids = categorized_seq_group_ids[sampling_type]
                  seq_groups = [sampling_metadata.seq_groups[i] for i in seq_group_ids]
                  sample_metadata[sampling_type] = (seq_group_ids, seq_groups)
              for sampling_type in SamplingType:
                  if sampling_type not in sample_metadata:
                      continue
                  seq_group_ids, seq_groups = sample_metadata[sampling_type]
                  if sampling_type in (SamplingType.GREEDY, SamplingType.RANDOM, SamplingType.RANDOM_SEED):
                      sample_results = self.extract_next_tokens_and_parents(seq_groups, samples)
                  elif sampling_type == SamplingType.BEAM:
                      raise ValueError(f"Unsupported sampling type: beam search.")
                  sample_results_dict.update(zip(seq_group_ids, sample_results))
              sample_results = [sample_results_dict.get(i) for i in range(len(sampling_metadata.seq_groups))]
              return sample_results
          def extract_next_tokens_and_parents(
              self,
              selected_seq_groups: List[Tuple[List[int], SamplingParams]],
              samples: np.ndarray,
          ) -> List[Tuple[List[int], List[int]]]:
              samples = samples.tolist()
              sample_idx = 0
              results = []
              for seq_group in selected_seq_groups:
                  seq_ids, _ = seq_group
                  num_parent_seqs = len(seq_ids)
                  parent_ids = list(range(num_parent_seqs))
                  next_token_ids = [samples[sample_idx]]
                  results.append((next_token_ids, parent_ids))
                  sample_idx += num_parent_seqs
              return results
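      To make the tensor preparation in `get_sample_data_and_param` concrete, here is a toy illustration (hypothetical token ids) of how the ragged token lists are right-padded with `vocab_size`, i.e. one past the largest valid token id, before being wrapped into `SamplingData`:
      ```python
      # Toy illustration of the padding performed in get_sample_data_and_param.
      vocab_size = 10
      all_input_tokens = [[1, 2, 3], [4, 5], [6]]

      max_tokens_len = max((len(t) for t in all_input_tokens), default=0)   # 3
      padded = [t + [vocab_size] * (max_tokens_len - len(t)) for t in all_input_tokens]
      # padded == [[1, 2, 3], [4, 5, 10], [6, 10, 10]]
      # _to_tensor(padded, torch.int32) would then produce a dense (3, 3) NPU tensor.
      ```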
    6. ./vllm_npu/vllm_npu/model_executor/models/ascend/__init__.py: placeholder file.
      # Copyright (c) Huawei Technologies Co., Ltd. 2024-2024. All rights reserved.
      
    7. ./vllm_npu/vllm_npu/model_executor/models/ascend/mindie_llm_wrapper.py: implementation of the inference stage.
      # Copyright (c) Huawei Technologies Co., Ltd. 2024-2024. All rights reserved.
      import math
      from typing import List, Literal, Optional
      import torch
      from mindie_llm.text_generator.adapter.generator_torch import GeneratorTorch
      from torch import nn
      from vllm.lora.request import LoRARequest
      from vllm.model_executor import SamplingMetadata
      from vllm.model_executor.input_metadata import InputMetadata
      from vllm.sequence import SamplerOutput
      from vllm.worker.cache_engine import KVCache
      from vllm_npu.model_executor.layers.sampler import Sampler
      DEVICE_TYPE = "npu"
      DEFAULT_FORMAT = "auto"
      class MindIELlmWrapper(nn.Module):
          def __init__(self, mindie_config):
              super(MindIELlmWrapper, self).__init__()
              self.mindie_config = mindie_config
              self.mindie_model = None
              self.sampler = None
          def forward(
              self,
              input_ids: torch.Tensor,
              positions: torch.Tensor,
              kv_caches: List[KVCache],
              input_metadata: InputMetadata,
              lora_requests: List[LoRARequest],
          ) -> torch.Tensor:
              if kv_caches[0][0] is None:
                  kv_caches, block_tables, slots = self.create_dummy_kv_cache(input_metadata, input_ids)
              else:
                  if input_metadata.is_prompt:
                      block_tables = torch.tensor([0], dtype=torch.int32, device="npu")
                  else:
                      block_tables = input_metadata.block_tables
                  slots = input_metadata.slot_mapping
              if input_metadata.is_prompt:
                  input_lengths = input_metadata.prompt_lens.to(torch.int32)
                  max_seq_len = int(input_metadata.prompt_lens.max())
                  lm_head_indices = (input_metadata.prompt_lens.cumsum(dim=-1) - 1).to(torch.int64)
              else:
                  input_lengths = input_metadata.context_lens
                  max_seq_len = input_metadata.max_context_len
                  lm_head_indices = None
              adapter_ids = [lora_request.lora_name if lora_request else None for lora_request in lora_requests]
              logits = self.mindie_model.forward_tensor(
                  input_ids,
                  positions,
                  input_metadata.is_prompt,
                  kv_caches,
                  block_tables,
                  slots,
                  input_lengths,
                  max_seq_len,
                  lm_head_indices,
                  adapter_ids=adapter_ids,  # batch_size len
              )
              return logits
          def sample(
              self,
              hidden_states: torch.Tensor,
              sampling_metadata: SamplingMetadata,
          ) -> Optional[SamplerOutput]:
              # hidden_states is logits
              next_tokens = self.sampler(hidden_states, sampling_metadata)
              return next_tokens
          def load_weights(self, load_format: Literal["auto", "safetensors", "pt"] = DEFAULT_FORMAT):
              """Load model weights.
              Args:
                  load_format (Literal[auto, safetensors, pt], optional): The format of weights. Defaults to "auto".
              Raises:
                  ValueError: Format not supported.
              """
              if load_format not in ["auto", "safetensors", "pt"]:
                  raise ValueError("load-format support [safetensors, pt]")
              self.weight_dtype = torch.get_default_dtype()
              torch.set_default_dtype(torch.float32)
              # Replacing the deprecated method with GeneratorTorch
              self.mindie_model = GeneratorTorch(self.mindie_config)
              self.sampler = Sampler(self.mindie_model)
              self.sampler.logits_as_hidden_states = True
              torch.set_default_dtype(self.weight_dtype)
          # when warmup, create dummy kvcache, block_tables, slot_mapping
          def create_dummy_kv_cache(self, input_metadata, input_ids):
              dummy_block_size = 128
              max_s = max(input_metadata.prompt_lens)
              max_need_block = math.ceil(max_s / dummy_block_size)
              batch_size = len(input_metadata.prompt_lens)
              self.dummy_block_num = max_need_block * batch_size
              kv_cache = [
                  (
                      torch.empty(
                          (
                              self.dummy_block_num,
                              dummy_block_size,
                              self.mindie_model.model_info.num_kv_heads,
                              self.mindie_model.model_info.head_size,
                          ),
                          dtype=self.weight_dtype,
                          device=DEVICE_TYPE,
                      ),
                      torch.empty(
                          (
                              self.dummy_block_num,
                              dummy_block_size,
                              self.mindie_model.model_info.num_kv_heads,
                              self.mindie_model.model_info.head_size,
                          ),
                          dtype=self.weight_dtype,
                          device=DEVICE_TYPE,
                      ),
                  )
                  for _ in range(self.mindie_model.model_info.num_layers)
              ]
              block_tables = torch.zeros(batch_size, max_need_block, dtype=int, device=DEVICE_TYPE)
              slot = [i for i in range(dummy_block_size)]
              slots = []
              warm_up_len = len(input_ids)
              while warm_up_len > 0:
                  if warm_up_len > dummy_block_size:
                      slots.extend(slot)
                      warm_up_len -= dummy_block_size
                  else:
                      slots.extend(slot[:warm_up_len])
                      warm_up_len = 0
              slots = torch.tensor(slots, dtype=torch.long, device=DEVICE_TYPE)
              return kv_cache, block_tables, slots
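      A quick walk-through of the shapes produced by `create_dummy_kv_cache`, using made-up configuration values for illustration:
      ```python
      import math

      # Assumed example values; real ones come from input_metadata and model_info.
      prompt_lens = [3, 5]                 # two warm-up sequences
      dummy_block_size = 128
      num_kv_heads, head_size, num_layers = 8, 128, 2

      max_s = max(prompt_lens)                               # 5
      max_need_block = math.ceil(max_s / dummy_block_size)   # 1
      batch_size = len(prompt_lens)                          # 2
      dummy_block_num = max_need_block * batch_size          # 2

      # Each of the num_layers entries in kv_cache is a (key, value) pair of tensors
      # shaped (dummy_block_num, dummy_block_size, num_kv_heads, head_size),
      # i.e. (2, 128, 8, 128), and block_tables is a zero tensor of shape
      # (batch_size, max_need_block) == (2, 1).
      ```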
    8. ./vllm_npu/vllm_npu/model_executor/models/__init__.py: placeholder file.
      # Copyright (c) Huawei Technologies Co., Ltd. 2024-2024. All rights reserved.
      
  8. ./vllm_npu/vllm_npu/worker
    1. ./vllm_npu/vllm_npu/worker/ascend_worker.py: worker for the Ascend environment.
      # Copyright (c) Huawei Technologies Co., Ltd. 2024-2024. All rights reserved.
      # Part of codes in this file was copied from project [vLLM Team][vllm]
      """A Ascend worker class."""
      import gc
      from typing import Dict, List, Optional, Set, Tuple
      import torch
      import torch.distributed
      from mindie_llm.modeling.backend_type import BackendType
      from vllm.config import (
          CacheConfig,
          DeviceConfig,
          LoRAConfig,
          ModelConfig,
          ParallelConfig,
          SchedulerConfig,
      )
      from vllm.lora.request import LoRARequest
      from vllm.model_executor import set_random_seed
      from vllm.model_executor.parallel_utils.communication_op import broadcast_tensor_dict
      from vllm.model_executor.parallel_utils.custom_all_reduce import init_custom_ar
      from vllm.model_executor.parallel_utils.parallel_state import ensure_model_parallel_initialized
      from vllm.sequence import SamplerOutput, SequenceGroupMetadata
      from vllm.worker.cache_engine import CacheEngine
      from vllm.worker.model_runner import ModelRunner
      class Worker:
          """A worker class that executes the model on a group of NPUs."""
          def __init__(
              self,
              model_config: ModelConfig,
              parallel_config: ParallelConfig,
              scheduler_config: SchedulerConfig,
              device_config: DeviceConfig,
              local_rank: int,
              rank: int,
              distributed_init_method: str,
              lora_config: Optional[LoRAConfig] = None,
              kv_cache_dtype: Optional[str] = "auto",
              is_driver_worker: bool = False,
          ) -> None:
              self.model_config = model_config
              self.parallel_config = parallel_config
              self.scheduler_config = scheduler_config
              self.device_config = device_config
              self.local_rank = local_rank
              self.rank = rank
              self.distributed_init_method = distributed_init_method
              self.lora_config = lora_config
              self.is_driver_worker = is_driver_worker
              if self.is_driver_worker and self.rank != 0:
                  raise ValueError("The driver worker must have rank 0.")
              self.device = None
              self.model_runner = ModelRunner(
                  model_config,
                  parallel_config,
                  scheduler_config,
                  device_config,
                  self.lora_config,
                  kv_cache_dtype,
                  is_driver_worker,
              )
              # Uninitialized cache engine. Will be initialized by
              # self.init_cache_engine().
              self.cache_config = None
              self.cache_engine = None
              self.cache_events = None
              self.gpu_cache = None
          def init_model(self, cupy_port: Optional[int] = None) -> None:
              self.device = torch.device(f"npu:{self.local_rank}")
              torch.npu.set_device(self.device)
              # Initialize the distributed environment.
              init_distributed_environment(self.parallel_config, self.rank, self.distributed_init_method)
              # Initialize the model.
              set_random_seed(self.model_config.seed)
          def load_model(self):
              # Add an attribute dynamically, not graceful, but useful
              self.model_config.mindie_config = {
                  "backend_type": BackendType.ATB,
                  "model_id": self.model_config.model,
                  "rank": self.rank,
                  "local_rank": self.local_rank,
                  "world_size": self.parallel_config.world_size,
                  "npu_device_id": self.local_rank,
                  "trust_remote_code": self.model_config.trust_remote_code,
              }
              self.model_runner.load_model()
              del self.model_config.mindie_config
          @torch.inference_mode()
          def profile_num_available_blocks(
              self, block_size: int, gpu_memory_utilization: float, cpu_swap_space: int, cache_dtype: str
          ) -> Tuple[int, int]:
              """Profiles the peak memory usage of the model and returns the maximum
              number of GPU and CPU cache blocks that can be allocated.
              Args:
                  block_size: The size of the cache block.
                  gpu_memory_utilization: The fraction of the total GPU memory to use.
                  cpu_swap_space: The size of the CPU swap space in bytes.
              """
              # Profile the memory usage of the model and get the maximum number of
              # cache blocks that can be allocated with the remaining free memory.
              torch.npu.empty_cache()
              torch.npu.reset_peak_memory_stats()
              # Execute a forward pass with dummy inputs to profile the memory usage
              # of the model.
              self.model_runner.profile_run()
              # Calculate the number of blocks that can be allocated with the
              # profiled peak memory.
              torch.npu.synchronize()
              peak_memory = torch.npu.max_memory_allocated()
              total_gpu_memory = torch.npu.get_device_properties(self.rank).total_memory
              cache_block_size = CacheEngine.get_cache_block_size(
                  block_size, cache_dtype, self.model_config, self.parallel_config
              )
              num_gpu_blocks = (
                  int((total_gpu_memory * gpu_memory_utilization - peak_memory) // cache_block_size)
              )
              num_cpu_blocks = int(cpu_swap_space // cache_block_size)
              num_gpu_blocks = max(num_gpu_blocks, 0)
              num_cpu_blocks = max(num_cpu_blocks, 0)
              gc.collect()
              torch.npu.empty_cache()
              return num_gpu_blocks, num_cpu_blocks
          def init_cache_engine(self, cache_config: CacheConfig) -> None:
              self.cache_config = cache_config
              self.cache_engine = CacheEngine(self.cache_config, self.model_config, self.parallel_config)
              self.cache_events = self.cache_engine.events
              self.gpu_cache = self.cache_engine.gpu_cache
              self.model_runner.set_block_size(self.cache_engine.block_size)
          def warm_up_model(self) -> None:
              pass
          def cache_swap(
              self,
              blocks_to_swap_in: Dict[int, int],
              blocks_to_swap_out: Dict[int, int],
              blocks_to_copy: Dict[int, List[int]],
          ) -> None:
              # Issue cache operations.
              issued_cache_op = False
              if blocks_to_swap_in:
                  self.cache_engine.swap_in(blocks_to_swap_in)
                  issued_cache_op = True
              if blocks_to_swap_out:
                  self.cache_engine.swap_out(blocks_to_swap_out)
                  issued_cache_op = True
              if blocks_to_copy:
                  self.cache_engine.copy(blocks_to_copy)
                  issued_cache_op = True
              cache_events = self.cache_events if issued_cache_op else None
              if cache_events is not None:
                  for event in cache_events:
                      event.wait()
          @torch.inference_mode()
          def execute_model(
              self,
              seq_group_metadata_list: Optional[List[SequenceGroupMetadata]] = None,
              blocks_to_swap_in: Optional[Dict[int, int]] = None,
              blocks_to_swap_out: Optional[Dict[int, int]] = None,
              blocks_to_copy: Optional[Dict[int, List[int]]] = None,
          ) -> Optional[SamplerOutput]:
              if self.is_driver_worker:
                  if seq_group_metadata_list is None:
                      raise ValueError("seq_group_metadata_list must be provided for the driver worker.")
                  num_seq_groups = len(seq_group_metadata_list)
                  if any(x is None for x in [blocks_to_swap_in, blocks_to_swap_out, blocks_to_copy]):
                      raise ValueError(
                          "blocks_to_swap_in, blocks_to_swap_out, and blocks_to_copy must be provided for the driver worker."
                      )
                  data = {
                      "num_seq_groups": num_seq_groups,
                      "blocks_to_swap_in": blocks_to_swap_in,
                      "blocks_to_swap_out": blocks_to_swap_out,
                      "blocks_to_copy": blocks_to_copy,
                  }
                  broadcast_tensor_dict(data, src=0)
              else:
                  data = broadcast_tensor_dict(src=0)
                  num_seq_groups = data["num_seq_groups"]
                  blocks_to_swap_in = data["blocks_to_swap_in"]
                  blocks_to_swap_out = data["blocks_to_swap_out"]
                  blocks_to_copy = data["blocks_to_copy"]
              self.cache_swap(blocks_to_swap_in, blocks_to_swap_out, blocks_to_copy)
              # If there is no input, we don't need to execute the model.
              if num_seq_groups == 0:
                  return {}
              output = self.model_runner.execute_model(seq_group_metadata_list, self.gpu_cache)
              return output
          def add_lora(self, lora_request: LoRARequest) -> bool:
              return self.model_runner.add_lora(lora_request)
          def remove_lora(self, lora_id: int) -> bool:
              return self.model_runner.remove_lora(lora_id)
          def list_loras(self) -> Set[int]:
              return self.model_runner.list_loras()
      def init_distributed_environment(
          parallel_config: ParallelConfig,
          rank: int,
          distributed_init_method: Optional[str] = None,
      ) -> None:
          """Initialize the distributed environment."""
          if torch.distributed.is_initialized():
              torch_world_size = torch.distributed.get_world_size()
              if torch_world_size != parallel_config.world_size:
                  raise RuntimeError(
                      "torch.distributed is already initialized but the torch world "
                      "size does not match parallel_config.world_size "
                      f"({torch_world_size} vs. {parallel_config.world_size})."
                  )
          elif not distributed_init_method:
              raise ValueError(
                  "distributed_init_method must be set if torch.distributed "
                  f"is not already initialized: {distributed_init_method}"
              )
          else:
              torch.distributed.init_process_group(
                  backend="nccl",
                  world_size=parallel_config.world_size,
                  rank=rank,
                  init_method=distributed_init_method,
              )
          ensure_model_parallel_initialized(parallel_config.tensor_parallel_size, parallel_config.pipeline_parallel_size)
          # Initialize a custom fast all-reduce implementation.
          if not parallel_config.disable_custom_all_reduce:
              init_custom_ar()
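      The block-count arithmetic in `profile_num_available_blocks` reduces to the small calculation below; all byte counts are made up for illustration only:
      ```python
      # Illustrative only: real values come from torch.npu memory statistics and
      # CacheEngine.get_cache_block_size.
      total_gpu_memory = 64 * 1024**3        # total device memory in bytes (assumed)
      gpu_memory_utilization = 0.9
      peak_memory = 30 * 1024**3             # profiled peak usage (assumed)
      cache_block_size = 2 * 1024**2         # bytes per KV cache block (assumed)
      cpu_swap_space = 4 * 1024**3

      num_gpu_blocks = max(int((total_gpu_memory * gpu_memory_utilization - peak_memory)
                               // cache_block_size), 0)
      num_cpu_blocks = max(int(cpu_swap_space // cache_block_size), 0)
      print(num_gpu_blocks, num_cpu_blocks)  # 14131 2048 for these example numbers
      ```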
    2. ./vllm_npu/vllm_npu/worker/cache_engine.py: changes the data dimensions of the KV blocks.
      # Copyright (c) Huawei Technologies Co., Ltd. 2024-2024. All rights reserved.
      from typing import Tuple
      def get_key_block_shape(self) -> Tuple[int, int, int]:
          return (
              self.block_size,
              self.num_heads,
              self.head_size,
          )
      def get_value_block_shape(self) -> Tuple[int, int, int]:
          return (
              self.block_size,
              self.num_heads,
              self.head_size,
          )
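      In stock vLLM 0.3.3 the key block is stored as `(num_heads, head_size // x, block_size, x)` and the value block as `(num_heads, head_size, block_size)`; the overrides above flatten both to `(block_size, num_heads, head_size)`, presumably the layout expected by the MindIE backend. A small hypothetical shape check:
      ```python
      # Hypothetical shape check for the patched CacheEngine methods above.
      block_size, num_heads, head_size = 128, 8, 128      # assumed example values

      key_block_shape = (block_size, num_heads, head_size)     # (128, 8, 128)
      value_block_shape = (block_size, num_heads, head_size)   # (128, 8, 128)

      # CacheEngine then allocates per-layer key/value tensors of shape
      # (num_blocks, *key_block_shape), e.g. (num_blocks, 128, 8, 128).
      ```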
      
    3. ./vllm_npu/vllm_npu/worker/__init__.py: function replacements for the worker module.
      # Copyright (c) Huawei Technologies Co., Ltd. 2024-2024. All rights reserved.
      import vllm_npu.worker.ascend_worker
      from vllm_npu.worker.cache_engine import get_key_block_shape, get_value_block_shape
      from vllm_npu.worker.model_runner import (
          _make_tensor_with_pad,
          _prepare_decode,
          _prepare_prompt,
          execute_model,
          load_model,
          profile_run,
      )
      from vllm.worker import cache_engine, model_runner
      cache_engine.CacheEngine.get_key_block_shape = get_key_block_shape
      cache_engine.CacheEngine.get_value_block_shape = get_value_block_shape
      model_runner._make_tensor_with_pad = _make_tensor_with_pad
      model_runner.ModelRunner._prepare_decode = _prepare_decode
      model_runner.ModelRunner._prepare_prompt = _prepare_prompt
      model_runner.ModelRunner.execute_model = execute_model
      model_runner.ModelRunner.load_model = load_model
      model_runner.ModelRunner.profile_run = profile_run
      
    4. ./vllm_npu/vllm_npu/worker/model_runner.py: model_runner for the Ascend environment.
      # Copyright (c) Huawei Technologies Co., Ltd. 2024-2024. All rights reserved.
      # Part of codes in this file was copied from project [vLLM Team][vllm]
      from collections import namedtuple
      from typing import List, Optional, Set, Tuple, Union, NamedTuple
      import torch
      from vllm.lora.request import LoRARequest
      from vllm.model_executor import InputMetadata, get_model
      from vllm.sampling_params import SamplingParams
      from vllm.sequence import SamplerOutput, SequenceData, SequenceGroupMetadata
      from vllm.worker.model_runner import _BATCH_SIZES_TO_CAPTURE, _PAD_SLOT_ID, _pad_to_max
      class PreparedPromptData(NamedTuple):
          tokens: torch.Tensor
          positions: torch.Tensor
          metadata: InputMetadata
          prompt_lens: List[int]
          subquery_lens: List[int]
          lora_index_mapping: List[int]
          lora_prompt_mapping: List[int]
          requests: List[LoRARequest]
      class PreparedDecodeData(NamedTuple):
          tokens: torch.Tensor
          positions: torch.Tensor
          metadata: InputMetadata
          index_mapping: List[int]
          prompt_mapping: List[int]
          requests: List[LoRARequest]
      def load_model(self) -> None:
          self.model = get_model(
              self.model_config,
              self.device_config,
              lora_config=self.lora_config,
              parallel_config=self.parallel_config,
              scheduler_config=self.scheduler_config,
          )
      def _prepare_prompt(
          self,
          seq_group_metadata_list: List[SequenceGroupMetadata],
      ) -> PreparedPromptData:
          if len(seq_group_metadata_list) == 0:
              raise ValueError("seq_group_metadata_list is empty")
          input_tokens: List[List[int]] = []
          input_positions: List[List[int]] = []
          slot_mapping: List[List[int]] = []
          lora_index_mapping: List[int] = []
          lora_prompt_mapping: List[int] = []
          lora_requests: List[LoRARequest] = []
          prompt_lens: List[int] = []
          context_lens: List[int] = []
          subquery_lens: List[int] = []
          prefix_block_tables: List[List[int]] = []
          for seq_group_metadata in seq_group_metadata_list:
              if not seq_group_metadata.is_prompt:
                  raise ValueError("seq_group_metadata.is_prompt is False")
              seq_ids = list(seq_group_metadata.seq_data.keys())
              if len(seq_ids) != 1:
                  raise ValueError("Expected only one sequence ID, but found multiple")
              seq_id = seq_ids[0]
              seq_data = seq_group_metadata.seq_data[seq_id]
              prompt_tokens = seq_data.get_token_ids()
              prompt_len = len(prompt_tokens)
              prompt_lens.append(prompt_len)
              prefix_len = 0
              prefix = seq_group_metadata.prefix
              if prefix is not None and prefix.computed:
                  prefix_len = prefix.get_length()
                  prompt_tokens = prompt_tokens[prefix_len:]
                  prefix_block_tables.append(prefix.get_block_numbers())
              else:
                  prefix_block_tables.append([])
              # actual prompt lens
              context_lens.append(prefix_len)
              subquery_lens.append(prompt_len - prefix_len)
              input_tokens.append(prompt_tokens)
              # NOTE(woosuk): Here we assume that the first token in the prompt
              # is always the first token in the sequence.
              input_positions.append(list(range(prefix_len, prefix_len + len(prompt_tokens))))
              lora_id = seq_group_metadata.lora_int_id
              lora_requests.append(seq_group_metadata.lora_request)
              lora_index_mapping.append([lora_id] * (prompt_len - prefix_len))
              lora_prompt_mapping.extend(
                  [lora_id] * (prompt_len - prefix_len if seq_group_metadata.sampling_params.prompt_logprobs else 1)
              )
              if seq_group_metadata.block_tables is None:
                  # During memory profiling, the block tables are not initialized
                  # yet. In this case, we just use a dummy slot mapping.
                  slot_mapping.append([_PAD_SLOT_ID] * prompt_len)
                  continue
              # Compute the slot mapping.
              slot_mapping.append([])
              block_table = seq_group_metadata.block_tables[seq_id]
              # Mask the [0, start_idx) tokens of the prompt with _PAD_SLOT_ID,
              # where start_idx is max(0, prompt_len - sliding_window).
              # For example, if the prompt len is 10, sliding window is 8, and
              # block size is 4, the first two tokens are masked and the slot
              # mapping will be [-1, -1, 2, 3, 4, 5, 6, 7, 0, 1].
              start_idx = 0
              if self.sliding_window is not None:
                  if prefix_len != 0:
                      raise ValueError("Prefix caching is currently not supported with sliding window attention")
                  start_idx = max(0, prompt_len - self.sliding_window)
              for i in range(prefix_len, prompt_len):
                  if i < start_idx:
                      slot_mapping[-1].append(_PAD_SLOT_ID)
                      continue
                  block_number = block_table[i // self.block_size]
                  block_offset = i % self.block_size
                  slot = block_number * self.block_size + block_offset
                  slot_mapping[-1].append(slot)
          max_prompt_len = max(subquery_lens)
          input_tokens = _make_tensor_with_pad(input_tokens, max_prompt_len, pad=0, dtype=torch.long, device=self.device)
          input_positions = _make_tensor_with_pad(
              input_positions, max_prompt_len, pad=0, dtype=torch.long, device=self.device
          )
          slot_mapping = _make_tensor_with_pad(
              slot_mapping, max_prompt_len, pad=_PAD_SLOT_ID, dtype=torch.long, device=self.device
          )
          lora_index_mapping = [_pad_to_max(mapping, max_prompt_len, pad=0) for mapping in lora_index_mapping]
          context_lens_tensor = torch.tensor(context_lens, dtype=torch.int, device=self.device)
          # Prepare prefix block tables
          max_prompt_block_table_len = max(len(t) for t in prefix_block_tables)
          block_tables = _make_tensor_with_pad(
              prefix_block_tables, max_len=max_prompt_block_table_len, pad=0, dtype=torch.int, device=self.device
          )
          start_loc_tensor = torch.arange(
              0, len(prompt_lens) * max_prompt_len, max_prompt_len, dtype=torch.long, device=self.device
          )
          prompt_lens_tensor = torch.tensor(prompt_lens, dtype=torch.long, device=self.device)
          input_metadata = InputMetadata(
              is_prompt=True,
              slot_mapping=slot_mapping,
              prompt_lens=prompt_lens_tensor,
              max_seq_len=max_prompt_len,
              start_loc=start_loc_tensor,
              max_context_len=None,
              context_lens=context_lens_tensor,
              block_tables=block_tables,
              use_cuda_graph=False,
              kv_cache_dtype=self.kv_cache_dtype,
          )
          return PreparedPromptData(
              input_tokens,
              input_positions,
              input_metadata,
              prompt_lens,
              subquery_lens,
              lora_index_mapping,
              lora_prompt_mapping,
              lora_requests,
          )
      def _prepare_decode(self, seq_group_metadata_list: List[SequenceGroupMetadata]) -> PreparedDecodeData:
          if not seq_group_metadata_list:
              raise ValueError("seq_group_metadata_list is empty")
          input_tokens: List[List[int]] = []
          input_positions: List[List[int]] = []
          slot_mapping: List[List[int]] = []
          context_lens: List[int] = []
          block_tables: List[List[int]] = []
          lora_index_mapping: List[int] = []
          lora_prompt_mapping: List[int] = []
          lora_requests: List[LoRARequest] = []
          max_num_blocks_per_seq = 0
          for seq_group_metadata in seq_group_metadata_list:
              if seq_group_metadata.is_prompt:
                  raise ValueError("seq_group_metadata.is_prompt is True")
              seq_ids = list(seq_group_metadata.seq_data.keys())
              lora_id = seq_group_metadata.lora_int_id
              lora_requests.append(seq_group_metadata.lora_request)
              for seq_id in seq_ids:
                  seq_data = seq_group_metadata.seq_data[seq_id]
                  generation_token = seq_data.get_last_token_id()
                  input_tokens.append([generation_token])
                  seq_len = seq_data.get_len()
                  position = seq_len - 1
                  input_positions.append([position])
                  context_len = seq_len if self.sliding_window is None else min(seq_len, self.sliding_window)
                  context_lens.append(context_len)
                  block_table = seq_group_metadata.block_tables[seq_id]
                  max_num_blocks_per_seq = max(max_num_blocks_per_seq, len(block_table))
                  block_number = block_table[position // self.block_size]
                  block_offset = position % self.block_size
                  slot = block_number * self.block_size + block_offset
                  slot_mapping.append([slot])
                  lora_index_mapping.append([lora_id])
                  lora_prompt_mapping.append(lora_id)
                  if self.sliding_window is not None:
                      sliding_window_blocks = self.sliding_window // self.block_size
                      block_table = block_table[-sliding_window_blocks:]
                  block_tables.append(block_table)
          batch_size = len(input_tokens)
          max_context_len = max(context_lens)
          use_captured_graph = (
              not self.model_config.enforce_eager
              and batch_size <= _BATCH_SIZES_TO_CAPTURE[-1]
              and max_context_len <= self.max_context_len_to_capture
          )
          input_tokens = _make_tensor_with_pad(input_tokens, max_len=1, pad=0, dtype=torch.long, device=self.device)
          input_positions = _make_tensor_with_pad(input_positions, max_len=1, pad=0, dtype=torch.long, device=self.device)
          slot_mapping = _make_tensor_with_pad(
              slot_mapping, max_len=1, pad=_PAD_SLOT_ID, dtype=torch.long, device=self.device
          )
          context_lens = torch.tensor(context_lens, dtype=torch.int, device=self.device)
          if use_captured_graph:
              # The shape of graph_block_tables is
              # [max batch size, max context len // block size].
              input_block_tables = self.graph_block_tables[:batch_size]
              for i, block_table in enumerate(block_tables):
                  if block_table:
                      input_block_tables[i, : len(block_table)] = block_table
              block_tables = torch.tensor(input_block_tables, device=self.device)
          else:
              padded_block_tables = [_pad_to_max(block_table, max_num_blocks_per_seq, 0) for block_table in block_tables]
              block_tables = torch.tensor(padded_block_tables, dtype=torch.int, device="npu")
          lora_index_mapping = [_pad_to_max(mapping, 1, pad=0) for mapping in lora_index_mapping]
          input_metadata = InputMetadata(
              is_prompt=False,
              slot_mapping=slot_mapping,
              prompt_lens=None,
              max_seq_len=None,
              start_loc=None,
              max_context_len=max_context_len,
              context_lens=context_lens,
              block_tables=block_tables,
              use_cuda_graph=use_captured_graph,
              kv_cache_dtype=self.kv_cache_dtype,
          )
          return PreparedDecodeData(
              input_tokens, input_positions, input_metadata, lora_index_mapping, lora_prompt_mapping, lora_requests
          )
      @torch.inference_mode()
      def execute_model(
          self,
          seq_group_meta_list: Optional[List[SequenceGroupMetadata]],
          kv_caches: List[Tuple[torch.Tensor, torch.Tensor]],
      ) -> Optional[SamplerOutput]:
          (tokens, positions, input_meta, sampling_meta, lora_requests, _) = self.prepare_input_tensors(seq_group_meta_list)
          hidden_states = self.model(
              input_ids=tokens,
              positions=positions,
              kv_caches=kv_caches,
              input_metadata=input_meta,
              lora_requests=lora_requests,
          )
          # Sample the next token.
          output = self.model.sample(hidden_states=hidden_states, sampling_metadata=sampling_meta)
          return output
      @torch.inference_mode()
      def profile_run(self) -> None:
          # Enable top-k sampling to reflect the accurate memory usage.
          vocab_size = self.model_config.get_vocab_size()
          sampling_params = SamplingParams(top_p=0.99, top_k=vocab_size - 1)
          max_num_batched_tokens = self.scheduler_config.max_num_batched_tokens
          max_num_seqs = self.scheduler_config.max_num_seqs
          # Profile memory usage with max_num_sequences sequences and the total
          # number of tokens equal to max_num_batched_tokens.
          seqs: List[SequenceGroupMetadata] = []
          for group_id in range(max_num_seqs):
              seq_len = max_num_batched_tokens // max_num_seqs + (group_id < max_num_batched_tokens % max_num_seqs)
              seq_data = SequenceData([0] * seq_len)
              seq = SequenceGroupMetadata(
                  request_id=str(group_id),
                  is_prompt=True,
                  seq_data={group_id: seq_data},
                  sampling_params=sampling_params,
                  block_tables=None,
                  lora_request=None,  # Warmup for LoRA has been done in the MindIE backend
              )
              seqs.append(seq)
          # Run the model with the dummy inputs.
          num_layers = self.model_config.get_num_layers(self.parallel_config)
          kv_caches = [(None, None)] * num_layers
          self.execute_model(seqs, kv_caches)
          torch.npu.synchronize()
      def _make_tensor_with_pad(
          x: List[List[int]], max_len: int, pad: int, dtype: torch.dtype, device: Optional[Union[str, torch.device]]
      ) -> torch.Tensor:
          unpad_x = [i for l in x for i in l]
          return torch.tensor(unpad_x, dtype=dtype, device=device)
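      Note that this override of `_make_tensor_with_pad` ignores `max_len` and `pad`: instead of right-padding each row into a rectangular batch, it flattens the nested lists into a single 1-D tensor. A toy demonstration with hypothetical token ids:
      ```python
      import torch

      def _make_tensor_with_pad_demo(x, dtype=torch.long):
          # Mirrors the patched helper: flatten instead of padding to a rectangle.
          return torch.tensor([i for row in x for i in row], dtype=dtype)

      tokens = [[11, 12, 13], [21], [31, 32]]
      print(_make_tensor_with_pad_demo(tokens))
      # tensor([11, 12, 13, 21, 31, 32]); stock vLLM would instead return a padded
      # (3, max_len) tensor.
      ```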
  9. Other files
    1. ./install.sh: installation script.
      #!/bin/bash
      if [ -d "./vllm" ]; then
          echo "./vllm directory already exists!"
          exit 1
      fi
      git clone -b v0.3.3 https://github.com/vllm-project/vllm.git vllm
      yes | cp -r cover/* vllm/
      cd vllm
      pip install -r requirements.txt
      python3 setup.py install
      cd ../vllm_npu
      pip install -r requirements.txt
      python3 setup.py install
    2. ./README.md: introduction document.
      # Vllm-MindIE
      #### Introduction
      Patch that adapts the Ascend inference engine (MindIE) to the stable v0.3.3 release of the open-source vLLM framework.
      #### Software Architecture
      Software architecture description.
      #### Installation
      Once the Ascend inference base environment is installed, run `install.sh` to install vLLM together with the Ascend patch:
      ```sh
      bash install.sh
      ```
      #### Usage
      Startup demos for vLLM offline mode and online serving are provided for reference.
      *   Offline mode:
          ```sh
          cd examples
          bash test_offline.sh
          ```
      *   Online serving:
          ```sh
          cd examples
          bash start_server.sh
          ```
    3. ./vllm_npu/requirements.txt: external dependencies.
      numpy
      decorator
      attrs
      psutil
      absl-py
      cloudpickle
      scipy
      tornado
      transformers
      accelerate
      pandas
      ray == 2.9.3
    4. ./vllm_npu/setup.py: installation configuration file.
      import io
      import os
      import re
      from typing import List
      import setuptools
      ROOT_DIR = os.path.dirname(__file__)
      def get_path(*filepath) -> str:
          return os.path.join(ROOT_DIR, *filepath)
      def find_version(filepath: str):
          """Extract version information from the given filepath.
          """
          with open(filepath) as fp:
              version_match = re.search(r"^__version__ = ['\"]([^'\"]*)['\"]",
                                        fp.read(), re.M)
              if version_match:
                  return version_match.group(1)
              raise RuntimeError("Unable to find version string.")
      def read_readme() -> str:
          """Read the README file."""
          return io.open(get_path("README.md"), "r", encoding="utf-8").read()
      def get_requirements() -> List[str]:
          """Get Python package dependencies from requirements.txt."""
          with open(get_path("requirements.txt")) as f:
              requirements = f.read().strip().split("\n")
          return requirements
      setuptools.setup(
          name="vllm_npu",
          version=find_version(get_path("vllm_npu", "__init__.py")),
          author="Huawei",
          license="Apache 2.0",
          description=("A high-throughput and memory-efficient inference and "
                       "serving engine for LLMs"),
          long_description=read_readme(),
          long_description_content_type="text/markdown",
          url="",
          project_urls={
              "Homepage": "",
              "Documentation": "",
          },
          classifiers=[
              "Programming Language :: Python :: 3.8",
              "Programming Language :: Python :: 3.9",
              "Programming Language :: Python :: 3.10",
              "Programming Language :: Python :: 3.11",
              "Topic :: Scientific/Engineering :: Artificial Intelligence",
          ],
          packages=setuptools.find_packages(exclude=("benchmarks", "examples", "tests")),
          python_requires=">=3.8",
          install_requires=get_requirements(),
      )
      
    5. ./vllm_npu/vllm_npu/config.py: configuration file; a short derivation example follows the listing.
      # Copyright (c) Huawei Technologies Co., Ltd. 2024-2024. All rights reserved.
      # Part of codes in this file was copied from project [vLLM Team][vllm]
      from typing import Optional
      from sys import maxsize
      import torch
      from transformers import PretrainedConfig
      from vllm.logger import init_logger
      from vllm.utils import is_neuron
      logger = init_logger(__name__)
      class DeviceConfig:
          def __init__(self, device: str = "auto") -> None:
              if device == "auto":
                  # Automated device type detection
                  if getattr(torch.version, "cann", None) is not None:
                      self.device_type = "npu"
                  elif torch.cuda.is_available():
                      self.device_type = "cuda"
                  elif is_neuron():
                      self.device_type = "neuron"
                  else:
                      raise RuntimeError("No supported device detected.")
              else:
                  # Device type is assigned explicitly
                  self.device_type = device
              # Some device types require processing inputs on CPU
              if self.device_type in ["neuron"]:
                  self.device = torch.device("cpu")
              else:
                  # Set device with device type
                  self.device = torch.device(self.device_type)
          @property
          def is_neuron(self):
              return self.device_type == "neuron"
      def _get_and_verify_max_len(
          hf_config: PretrainedConfig,
          max_model_len: Optional[int],
      ) -> int:
          """Get and verify the model's maximum length."""
          derived_max_model_len = maxsize
          possible_keys = [
              # OPT
              "max_position_embeddings",
              # GPT-2
              "n_positions",
              # MPT
              "max_seq_len",
              # ChatGLM2
              "seq_length",
              # Command-R
              "model_max_length",
              # Others
              "max_sequence_length",
              "max_seq_length",
              "seq_len",
          ]
          for key in possible_keys:
              max_len_key = getattr(hf_config, key, None)
              if max_len_key is not None:
                  derived_max_model_len = min(derived_max_model_len, max_len_key)
          if derived_max_model_len == maxsize:
              if max_model_len is not None:
                  # If max_model_len is specified, we use it.
                  return max_model_len
              default_max_len = 2048
              logger.warning(
                  "The model's config.json does not contain any of the following "
                  "keys to determine the original maximum length of the model: "
                  f"{possible_keys}. Assuming the model's maximum length is "
                  f"{default_max_len}."
              )
              derived_max_model_len = default_max_len
          rope_scaling = getattr(hf_config, "rope_scaling", None)
          if rope_scaling is not None:
              if "type" in rope_scaling:
                  rope_type = rope_scaling["type"]
              elif "rope_type" in rope_scaling:
                  rope_type = rope_scaling["rope_type"]
              else:
                  raise ValueError("rope_scaling must have a 'type' or 'rope_type' key.")
              # The correct name is "longrope"; "su" is kept here for
              # backward compatibility.
              if rope_type not in ("su", "longrope", "llama3"):
                  if "factor" not in rope_scaling:
                      raise ValueError("rope_scaling must have a 'factor' key.")
                  scaling_factor = rope_scaling["factor"]
                  if rope_type == "yarn":
                      derived_max_model_len = rope_scaling["original_max_position_embeddings"]
                  derived_max_model_len *= scaling_factor
          if max_model_len is None:
              max_model_len = derived_max_model_len
          elif max_model_len > derived_max_model_len:
              raise ValueError(
                  f"User-specified max_model_len ({max_model_len}) is greater than "
                  f"the derived max_model_len ({max_len_key}={derived_max_model_len}"
                  " in model's config.json). This may lead to incorrect model "
                  "outputs or CUDA errors. Make sure the value is correct and "
                  "within the model context size."
              )
          return int(max_model_len)
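      As a quick illustration of the derivation logic, the minimal sketch below (assuming the patch is installed on an Ascend host, since importing `vllm_npu` pulls in `torch_npu`) uses a hypothetical HF config that only sets `max_position_embeddings=4096` plus a non-YaRN `rope_scaling` factor of 2.0, giving a derived maximum length of 8192.
      from types import SimpleNamespace

      from vllm_npu.config import _get_and_verify_max_len

      # Hypothetical HF config object; only the attributes read by the function matter.
      fake_hf_config = SimpleNamespace(
          max_position_embeddings=4096,
          rope_scaling={"type": "linear", "factor": 2.0},
      )

      # 4096 (from max_position_embeddings) * 2.0 (rope scaling factor) -> 8192
      print(_get_and_verify_max_len(fake_hf_config, max_model_len=None))  # 8192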
    6. ./vllm_npu/vllm_npu/__init__.py: global function replacement (the patches are applied at import time); a smoke-test sketch follows the listing.
      # Copyright (c) Huawei Technologies Co., Ltd. 2024-2024. All rights reserved.
      import torch
      import torch_npu
      from torch_npu.contrib import transfer_to_npu
      from vllm_npu.npu_adaptor import (
          BlockDiagonalCausalMask,
          LowerTriangularMaskWithTensorBias,
          cache_ops,
          context_attention_fwd,
          cuda_utils,
          ops,
      )
      import vllm_npu.core
      import vllm_npu.engine
      import vllm_npu.model_executor
      import vllm_npu.worker
      from vllm_npu.config import DeviceConfig, _get_and_verify_max_len
      from vllm_npu.sampling_params import _verify_args
      from vllm_npu.utils import get_ip
      from vllm import config, sampling_params, utils
      config.DeviceConfig = DeviceConfig
      config._get_and_verify_max_len = _get_and_verify_max_len
      sampling_params.SamplingParams._verify_args = _verify_args
      utils.get_ip = get_ip
      __version__ = "0.3.3"
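      A minimal smoke test of the patching (assuming an Ascend host with `torch_npu` and the patched vLLM stack installed): after importing `vllm_npu`, the attributes on the original vLLM modules point to the NPU implementations.
      import vllm_npu   # importing the package applies all the patches above

      import vllm.config
      import vllm.utils

      print(vllm.config.DeviceConfig is vllm_npu.DeviceConfig)  # True
      print(vllm.utils.get_ip is vllm_npu.get_ip)               # True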
    7. ./vllm_npu/vllm_npu/npu_adaptor.py: masks (stubs out) the modules and functions that would otherwise break on NPU; a standalone sketch of the technique follows the listing.
      # Copyright (c) Huawei Technologies Co., Ltd. 2024-2024. All rights reserved.
      import importlib
      import sys
      def replace_modules(old_module_name, new_module_name):
          if old_module_name in sys.modules:
              del sys.modules[old_module_name]
          sys.modules[old_module_name] = importlib.import_module(new_module_name)
      _default_ops = (
          'xformers',
          'xformers.ops',
          'vllm._C',
          'xformers.ops.fmha.attn_bias',
          'vllm.model_executor.layers.triton_kernel.prefix_prefill'
      )
      for _ops in _default_ops:
          replace_modules(_ops, 'vllm_npu')
      # dummy class to avoid import error.
      class ops:
          pass
      class cuda_utils:
          pass
      class cache_ops:
          pass
      class BlockDiagonalCausalMask:
          pass
      class LowerTriangularMaskWithTensorBias:
          pass
      def context_attention_fwd():
          pass
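      The `replace_modules` helper above simply rebinds an entry in `sys.modules`, so any later `import` of a masked name resolves to `vllm_npu` instead of the CUDA/xformers dependency. A self-contained sketch of the same technique, using a hypothetical module name redirected to the stdlib `math` module:
      import importlib
      import sys

      def replace_modules(old_module_name, new_module_name):
          # Drop any cached copy, then alias the old name to the replacement module.
          if old_module_name in sys.modules:
              del sys.modules[old_module_name]
          sys.modules[old_module_name] = importlib.import_module(new_module_name)

      # Redirect the hypothetical name "cuda_only_kernels" to the stdlib "math" module.
      replace_modules("cuda_only_kernels", "math")

      import cuda_only_kernels            # resolves to math, no ImportError
      print(cuda_only_kernels.sqrt(4.0))  # 2.0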
      
    8. ./vllm_npu/vllm_npu/sampling_params.py: sampling parameter post-processing; a short behavioral sketch follows the listing.
      # Copyright (c) Huawei Technologies Co., Ltd. 2024-2024. All rights reserved.
      # Part of codes in this file was copied from project [vLLM Team][vllm]
      from vllm.logger import init_logger
      logger = init_logger(__name__)
      def _verify_args(self) -> None:
          if self.n < 1:
              raise ValueError(f"n must be at least 1, got {self.n}.")
          if self.best_of < self.n:
              raise ValueError(f"best_of must be greater than or equal to n, " f"got n={self.n} and best_of={self.best_of}.")
          if not -2.0 <= self.presence_penalty <= 2.0:
              raise ValueError("presence_penalty must be in [-2, 2], got " f"{self.presence_penalty}.")
          if not -2.0 <= self.frequency_penalty <= 2.0:
              raise ValueError("frequency_penalty must be in [-2, 2], got " f"{self.frequency_penalty}.")
          if not 0.0 < self.repetition_penalty <= 2.0:
              raise ValueError("repetition_penalty must be in (0, 2], got " f"{self.repetition_penalty}.")
          if self.temperature < 0.0:
              raise ValueError(f"temperature must be non-negative, got {self.temperature}.")
          if not 0.0 < self.top_p <= 1.0:
              raise ValueError(f"top_p must be in (0, 1], got {self.top_p}.")
          if self.top_k < -1 or self.top_k == 0:
              raise ValueError(f"top_k must be -1 (disable), or at least 1, " f"got {self.top_k}.")
          if not 0.0 <= self.min_p <= 1.0:
              raise ValueError("min_p must be in [0, 1], got " f"{self.min_p}.")
          if self.max_tokens is not None and self.max_tokens < 1:
              raise ValueError(f"max_tokens must be at least 1, got {self.max_tokens}.")
          if self.logprobs is not None and self.logprobs < 0:
              raise ValueError(f"logprobs must be non-negative, got {self.logprobs}.")
          if self.prompt_logprobs is not None and self.prompt_logprobs < 0:
              raise ValueError(f"prompt_logprobs must be non-negative, got " f"{self.prompt_logprobs}.")
          # Currently the MindIE backend does not support the parameters below:
          # Beam search: best_of, use_beam_search, length_penalty, early_stopping
          # Tokens: stop, stop_token_ids, include_stop_str_in_output, skip_special_tokens, spaces_between_special_tokens
          # Misc: min_p, ignore_eos, prompt_logprobs
          if self.stop or self.stop_token_ids:
              logger.warning("MindIE backend currently does not support the stop token feature, resetting params to default")
              self.stop, self.stop_token_ids = [], []
          if self.use_beam_search:
              self._verify_beam_search()
              logger.warning("MindIE backend currently does not supported beam search, setting params to default")
              self.use_beam_search = False
              self.early_stopping = False
              self.length_penalty = 1.0
              self.best_of = 1
          if self.prompt_logprobs:
              logger.warning("MindIE backend currently does not supported logprobs per prompt, setting params to default")
              self.prompt_logprobs = None
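      A minimal behavioral sketch (assuming the patched stack is installed on an Ascend host): unsupported options are reset with a warning when a `SamplingParams` object is constructed, because `SamplingParams.__init__` runs the patched `_verify_args`.
      import vllm_npu                      # patches SamplingParams._verify_args
      from vllm import SamplingParams

      params = SamplingParams(temperature=0.8, top_p=0.95, stop=["</s>"])
      # The MindIE backend does not support stop strings, so the patched
      # _verify_args logs a warning and clears them:
      print(params.stop)  # []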
    9. ./vllm_npu/vllm_npu/utils.py: utility functions; a note on the UDP-connect trick follows the listing.
      # Copyright (c) Huawei Technologies Co., Ltd. 2024-2024. All rights reserved.
      import socket
      def get_ip() -> str:
          try:
              with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as ipv4_socket:
                  ipv4_socket.connect(("localhost", 80))
                  return ipv4_socket.getsockname()[0]
          except OSError:
              with socket.socket(socket.AF_INET6, socket.SOCK_DGRAM) as ipv6_socket:
                  ipv6_socket.connect(("localhost", 80))
                  return ipv6_socket.getsockname()[0]
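      `get_ip` relies on the classic UDP-connect trick: connecting a datagram socket sends no packets, but it makes the kernel select a local address for that destination, which `getsockname()` then reports. Because the destination here is `localhost`, the returned address is the loopback address. A standalone sketch of the trick on a typical Linux host:
      import socket

      # Connecting a UDP socket sends no traffic; it only selects a route/source address.
      with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as s:
          s.connect(("localhost", 80))
          print(s.getsockname()[0])   # 127.0.0.1 on a typical host

      # Pointing the socket at an external address instead would yield the address
      # of the outbound network interface rather than the loopback address.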