vLLM 0.4.2 Reference Adaptation Code

The directory structure of the adaptation code repository is as follows:

├── cover
│   ├── requirements-ascend.txt
│   ├── setup.py
│   └── vllm
│       └── __init__.py
├── examples
│   ├── start_server.sh
│   ├── test_offline.py
│   └── test_offline.sh
├── install.sh
└── vllm_npu
    ├── requirements.txt
    ├── setup.py
    └── vllm_npu
        ├── __init__.py
        ├── attention
        │   ├── __init__.py
        │   ├── backends.py
        │   └── selector.py
        ├── config.py
        ├── core
        │   └── __init__.py
        ├── engine
        │   ├── __init__.py
        │   ├── ascend_engine.py
        │   └── async_ascend_engine.py
        ├── executor
        │   ├── __init__.py
        │   ├── ascend_executor.py
        │   ├── ascend_ray_executor.py
        │   └── ray_utils.py
        ├── model_executor
        │   ├── __init__.py
        │   ├── ascend_model_loader.py
        │   ├── layers
        │   │   ├── __init__.py
        │   │   └── ascend_sampler.py
        │   └── models
        │       ├── __init__.py
        │       └── ascend
        │           ├── __init__.py
        │           └── mindie_llm_wrapper.py
        ├── npu_adaptor.py
        ├── usage
        │   ├── __init__.py
        │   └── usage_lib.py
        ├── utils.py
        └── worker
            ├── __init__.py
            ├── ascend_model_runner.py
            ├── ascend_worker.py
            └── cache_engine.py

The repository consists of the following four parts:

  1. The cover directory contains the modifications made to the vLLM framework source code.
  2. The examples directory contains sample code for offline and online (serving) usage.
  3. The vllm_npu directory contains the source code of the patch package.
  4. install.sh is the one-click installation script. After all code files have been restored, running it installs the Ascend-adapted vLLM framework in a single step: it automatically pulls the vLLM source, installs the native framework, and applies the adaptation patch (a minimal usage sketch follows this list).
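
A minimal offline-inference sketch (hypothetical, not the repository's examples/test_offline.py): once install.sh has installed the patched framework, the standard vLLM Python API is used unchanged, because importing vllm pulls in vllm_npu and applies the NPU patches. The model path and prompts below are placeholders.

    from vllm import LLM, SamplingParams

    # Placeholder inputs; replace with a local model path and your own prompts.
    prompts = ["Hello, my name is"]
    sampling_params = SamplingParams(temperature=0.8, top_p=0.95)

    # Importing vllm already executed "import vllm_npu" (see cover/vllm/__init__.py),
    # so the Ascend executor, worker and sampler patches are active at this point.
    llm = LLM(model="/path/to/model")
    outputs = llm.generate(prompts, sampling_params)
    for output in outputs:
        print(output.outputs[0].text)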

Contents of each file in the code repository:

  • cover
    • requirements-ascend.txt
      cmake >= 3.21
      ninja  # For faster builds.
      psutil
      sentencepiece  # Required for LLaMA tokenizer.
      numpy
      requests
      py-cpuinfo
      transformers >= 4.40.0  # Required for StarCoder2 & Llava, Llama 3.
      tokenizers >= 0.19.1  # Required for Llama 3.
      fastapi
      openai
      uvicorn[standard]
      pydantic >= 2.0  # Required for OpenAI server.
      prometheus_client >= 0.18.0
      prometheus-fastapi-instrumentator >= 7.0.0
      tiktoken == 0.6.0  # Required for DBRX tokenizer
      lm-format-enforcer == 0.10.1
      typing_extensions
      filelock >= 3.10.4 # filelock starts to support `mode` argument from 3.10.4
      ray == 2.9.3
      pynvml == 11.5.0
      outlines == 0.0.34
    • setup.py
      import importlib.util
      import io
      import logging
      import os
      import re
      import subprocess
      import sys
      from shutil import which
      from typing import Dict, List
      import torch
      from packaging.version import Version, parse
      from setuptools import Extension, find_packages, setup
      from setuptools.command.build_ext import build_ext
      from torch.utils.cpp_extension import CUDA_HOME, BuildExtension
      def load_module_from_path(module_name, path):
          spec = importlib.util.spec_from_file_location(module_name, path)
          module = importlib.util.module_from_spec(spec)
          sys.modules[module_name] = module
          spec.loader.exec_module(module)
          return module
      ROOT_DIR = os.path.dirname(__file__)
      logger = logging.getLogger(__name__)
      # cannot import envs directly because it depends on vllm,
      #  which is not installed yet
      envs = load_module_from_path('envs', os.path.join(ROOT_DIR, 'vllm', 'envs.py'))
      VLLM_TARGET_DEVICE = 'npu'
      # vLLM only supports Linux platform
      assert sys.platform.startswith(
          "linux"), "vLLM only supports Linux platform (including WSL)."
      MAIN_CUDA_VERSION = "12.1"
      def is_sccache_available() -> bool:
          return which("sccache") is not None
      def is_ccache_available() -> bool:
          return which("ccache") is not None
      def is_ninja_available() -> bool:
          return which("ninja") is not None
      def remove_prefix(text, prefix):
          if text.startswith(prefix):
              return text[len(prefix):]
          return text
      class CMakeExtension(Extension):
          def __init__(self, name: str, cmake_lists_dir: str = '.', **kwa) -> None:
              super().__init__(name, sources=[], **kwa)
              self.cmake_lists_dir = os.path.abspath(cmake_lists_dir)
      
      def _is_cuda() -> bool:
          return VLLM_TARGET_DEVICE == "cuda" \
                  and torch.version.cuda is not None \
                  and not _is_neuron()
      def _is_hip() -> bool:
          return (VLLM_TARGET_DEVICE == "cuda"
                  or VLLM_TARGET_DEVICE == "rocm") and torch.version.hip is not None
      def _is_neuron() -> bool:
          torch_neuronx_installed = True
          try:
              subprocess.run(["neuron-ls"], capture_output=True, check=True)
          except (FileNotFoundError, PermissionError, subprocess.CalledProcessError):
              torch_neuronx_installed = False
          return torch_neuronx_installed or envs.VLLM_BUILD_WITH_NEURON
      def _is_cpu() -> bool:
          return VLLM_TARGET_DEVICE == "cpu"
      def _is_npu() -> bool:
          return VLLM_TARGET_DEVICE == "npu"
      def _install_punica() -> bool:
          return envs.VLLM_INSTALL_PUNICA_KERNELS
      def get_hipcc_rocm_version():
          # Run the hipcc --version command
          result = subprocess.run(['hipcc', '--version'],
                                  stdout=subprocess.PIPE,
                                  stderr=subprocess.STDOUT,
                                  text=True)
          # Check if the command was executed successfully
          if result.returncode != 0:
              print("Error running 'hipcc --version'")
              return None
          # Extract the version using a regular expression
          match = re.search(r'HIP version: (\S+)', result.stdout)
          if match:
              # Return the version string
              return match.group(1)
          else:
              print("Could not find HIP version in the output")
              return None
      def get_neuronxcc_version():
          import sysconfig
          site_dir = sysconfig.get_paths()["purelib"]
          version_file = os.path.join(site_dir, "neuronxcc", "version",
                                      "__init__.py")
          # Check if the command was executed successfully
          with open(version_file, "rt") as fp:
              content = fp.read()
          # Extract the version using a regular expression
          match = re.search(r"__version__ = '(\S+)'", content)
          if match:
              # Return the version string
              return match.group(1)
          else:
              raise RuntimeError("Could not find HIP version in the output")
      def get_nvcc_cuda_version() -> Version:
          """Get the CUDA version from nvcc.
          Adapted from https://github.com/NVIDIA/apex/blob/8b7a1ff183741dd8f9b87e7bafd04cfde99cea28/setup.py
          """
          assert CUDA_HOME is not None, "CUDA_HOME is not set"
          nvcc_output = subprocess.check_output([CUDA_HOME + "/bin/nvcc", "-V"],
                                                universal_newlines=True)
          output = nvcc_output.split()
          release_idx = output.index("release") + 1
          nvcc_cuda_version = parse(output[release_idx].split(",")[0])
          return nvcc_cuda_version
      def get_path(*filepath) -> str:
          return os.path.join(ROOT_DIR, *filepath)
      def find_version(filepath: str) -> str:
          """Extract version information from the given filepath.
          Adapted from https://github.com/ray-project/ray/blob/0b190ee1160eeca9796bc091e07eaebf4c85b511/python/setup.py
          """
          with open(filepath) as fp:
              version_match = re.search(r"^__version__ = ['\"]([^'\"]*)['\"]",
                                        fp.read(), re.M)
              if version_match:
                  return version_match.group(1)
              raise RuntimeError("Unable to find version string.")
      def get_vllm_version() -> str:
          version = find_version(get_path("vllm", "__init__.py"))
          # return version
          if _is_cuda():
              cuda_version = str(get_nvcc_cuda_version())
              if cuda_version != MAIN_CUDA_VERSION:
                  cuda_version_str = cuda_version.replace(".", "")[:3]
                  version += f"+cu{cuda_version_str}"
          elif _is_hip():
              # Get the HIP version
              hipcc_version = get_hipcc_rocm_version()
              if hipcc_version != MAIN_CUDA_VERSION:
                  rocm_version_str = hipcc_version.replace(".", "")[:3]
                  version += f"+rocm{rocm_version_str}"
          elif _is_neuron():
              # Get the Neuron version
              neuron_version = str(get_neuronxcc_version())
              if neuron_version != MAIN_CUDA_VERSION:
                  neuron_version_str = neuron_version.replace(".", "")[:3]
                  version += f"+neuron{neuron_version_str}"
          elif _is_npu():
              version += "+npu"
          elif _is_cpu():
              version += "+cpu"
          else:
              raise RuntimeError("Unknown runtime environment")
          return version
      def read_readme() -> str:
          """Read the README file if present."""
          p = get_path("README.md")
          if os.path.isfile(p):
              return io.open(get_path("README.md"), "r", encoding="utf-8").read()
          else:
              return ""
      def get_requirements() -> List[str]:
          """Get Python package dependencies from requirements.txt."""
          def _read_requirements(filename: str) -> List[str]:
              with open(get_path(filename)) as f:
                  requirements = f.read().strip().split("\n")
              resolved_requirements = []
              for line in requirements:
                  if line.startswith("-r "):
                      resolved_requirements += _read_requirements(line.split()[1])
                  else:
                      resolved_requirements.append(line)
              return resolved_requirements
          if _is_cuda():
              requirements = _read_requirements("requirements-cuda.txt")
              cuda_major, cuda_minor = torch.version.cuda.split(".")
              modified_requirements = []
              for req in requirements:
                  if "vllm-nccl-cu12" in req:
                      req = req.replace("vllm-nccl-cu12",
                                        f"vllm-nccl-cu{cuda_major}")
                  elif ("vllm-flash-attn" in req
                        and not (cuda_major == "12" and cuda_minor == "1")):
                      # vllm-flash-attn is built only for CUDA 12.1.
                      # Skip for other versions.
                      continue
                  modified_requirements.append(req)
              requirements = modified_requirements
          elif _is_hip():
              requirements = _read_requirements("requirements-rocm.txt")
          elif _is_neuron():
              requirements = _read_requirements("requirements-neuron.txt")
          elif _is_npu():
              requirements = _read_requirements("requirements-ascend.txt")
          elif _is_cpu():
              requirements = _read_requirements("requirements-cpu.txt")
          else:
              raise ValueError(
                  "Unsupported platform, please use CUDA, ROCm, Neuron, NPU or CPU.")
          return requirements
      ext_modules = []
      if _is_cuda():
          ext_modules.append(CMakeExtension(name="vllm._moe_C"))
      """
      if not _is_neuron():
          ext_modules.append(CMakeExtension(name="vllm._C"))
          if _install_punica():
              ext_modules.append(CMakeExtension(name="vllm._punica_C"))
      """
      package_data = {
          "vllm": ["py.typed", "model_executor/layers/fused_moe/configs/*.json"]
      }
      if envs.VLLM_USE_PRECOMPILED:
          ext_modules = []
          package_data["vllm"].append("*.so")
      setup(
          name="vllm",
          version=get_vllm_version(),
          author="vLLM Team",
          license="Apache 2.0",
          description=("A high-throughput and memory-efficient inference and "
                       "serving engine for LLMs"),
          long_description=read_readme(),
          long_description_content_type="text/markdown",
          url="https://github.com/vllm-project/vllm",
          project_urls={
              "Homepage": "https://github.com/vllm-project/vllm",
              "Documentation": "https://vllm.readthedocs.io/en/latest/",
          },
          classifiers=[
              "Programming Language :: Python :: 3.8",
              "Programming Language :: Python :: 3.9",
              "Programming Language :: Python :: 3.10",
              "Programming Language :: Python :: 3.11",
              "License :: OSI Approved :: Apache Software License",
              "Topic :: Scientific/Engineering :: Artificial Intelligence",
          ],
          packages=find_packages(exclude=("benchmarks", "csrc", "docs", "examples",
                                          "tests*")),
          python_requires=">=3.8",
          install_requires=get_requirements(),
          ext_modules=ext_modules,
          extras_require={
              "tensorizer": ["tensorizer==2.9.0"],
          },
          cmdclass={"build_ext": BuildExtension} if not _is_neuron() else {},
          package_data=package_data,
      )
    • vllm
      • __init__.py
      """vLLM: a high-throughput and memory-efficient inference engine for LLMs"""
      import vllm_npu
      from vllm.engine.arg_utils import AsyncEngineArgs, EngineArgs
      from vllm.engine.async_llm_engine import AsyncLLMEngine
      from vllm.engine.llm_engine import LLMEngine
      from vllm.entrypoints.llm import LLM
      from vllm.executor.ray_utils import initialize_ray_cluster
      from vllm.model_executor.models import ModelRegistry
      from vllm.outputs import CompletionOutput, RequestOutput
      from vllm.sampling_params import SamplingParams
      __version__ = "0.4.2"
      __all__ = [
          "LLM",
          "ModelRegistry",
          "SamplingParams",
          "RequestOutput",
          "CompletionOutput",
          "LLMEngine",
          "EngineArgs",
          "AsyncLLMEngine",
          "AsyncEngineArgs",
          "initialize_ray_cluster",
      ]
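  Note: compared with the native vLLM 0.4.2 package init, the functional change above is the added "import vllm_npu" at the top, which loads the patch package and lets it monkey-patch the framework. Below is a quick, illustrative way to check that the patches are active after installation (it assumes the vllm_npu package imports its submodules on import, as the patch mechanism requires; the expected outputs are shown as comments and are not guaranteed):

      import vllm  # executes "import vllm_npu" and applies the patches

      import vllm.attention.selector as selector
      from vllm.engine.llm_engine import LLMEngine

      # The attention backend selector should now resolve to the Ascend implementation.
      print(selector.get_attn_backend.__module__)
      # expected: vllm_npu.attention.selector

      # LLMEngine.from_engine_args should now be the Ascend-aware factory method.
      print(LLMEngine.from_engine_args.__func__.__module__)
      # expected: vllm_npu.engine.ascend_engine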
  • vllm_npu
    • requirements.txt
      numpy
      decorator
      attrs
      psutil
      absl-py
      cloudpickle
      scipy
      tornado
      transformers
      accelerate
      pandas
    • setup.py
      import io
      import os
      import re
      from typing import List
      import setuptools
      ROOT_DIR = os.path.dirname(__file__)
      def get_path(*filepath) -> str:
          return os.path.join(ROOT_DIR, *filepath)
      def find_version(filepath: str):
          """Extract version information from the given filepath.
          """
          with open(filepath) as fp:
              version_match = re.search(r"^__version__ = ['\"]([^'\"]*)['\"]",
                                        fp.read(), re.M)
              if version_match:
                  return version_match.group(1)
              raise RuntimeError("Unable to find version string.")
      def read_readme() -> str:
          """Read the README file."""
          return io.open(get_path("README.md"), "r", encoding="utf-8").read()
      def get_requirements() -> List[str]:
          """Get Python package dependencies from requirements.txt."""
          with open(get_path("requirements.txt")) as f:
              requirements = f.read().strip().split("\n")
          return requirements
      setuptools.setup(
          name="vllm_npu",
          version=find_version(get_path("vllm_npu", "__init__.py")),
          author="Huawei",
          license="Apache 2.0",
          description=("A high-throughput and memory-efficient inference and "
                       "serving engine for LLMs"),
          long_description=read_readme(),
          long_description_content_type="text/markdown",
          url="",
          project_urls={
              "Homepage": "",
              "Documentation": "",
          },
          classifiers=[
              "Programming Language :: Python :: 3.8",
              "Programming Language :: Python :: 3.9",
              "Programming Language :: Python :: 3.10",
              "Programming Language :: Python :: 3.11",
              "Topic :: Scientific/Engineering :: Artificial Intelligence",
          ],
          packages=setuptools.find_packages(exclude=("benchmarks", "examples", "tests")),
          python_requires=">=3.8",
          install_requires=get_requirements(),
      )
    • vllm_npu
      • attention
        • __init__.py
          from vllm_npu.attention.selector import get_attn_backend
          import vllm.attention.selector as selector
          import vllm.worker.model_runner as mr
          import vllm.worker.cache_engine as ce
          selector.get_attn_backend = get_attn_backend
          mr.get_attn_backend = get_attn_backend
          ce.get_attn_backend = get_attn_backend
        • backends.py
          # Part of codes in this file was copied from project [vLLM Team][vllm]
          from dataclasses import dataclass
          from typing import List, Optional, Tuple
          from vllm.attention.backends.abstract import (AttentionBackend,
                                                        AttentionMetadataPerStage)
          import torch
          def get_kv_cache_shape(
                  num_blocks: int,
                  block_size: int,
                  num_kv_heads: int,
                  head_size: int,
              ) -> Tuple[int, ...]:
                  return (2, num_blocks, block_size, num_kv_heads, head_size)
          class AscendAttentionBackend(AttentionBackend):
              @staticmethod
              def get_name() -> str:
                  return "ascend-attn-backend"
              @staticmethod
              def get_impl_cls():
                  return None
              @staticmethod
              def make_metadata(*args, **kwargs) -> "AttentionMetadata":
                  return AttentionMetadata(*args, **kwargs)
              @staticmethod
              def get_kv_cache_shape(
                  num_blocks: int,
                  block_size: int,
                  num_kv_heads: int,
                  head_size: int,
              ) -> Tuple[int, ...]:
                  return get_kv_cache_shape(num_blocks, block_size, num_kv_heads, head_size)
              @staticmethod
              def swap_blocks(
                  src_kv_cache: torch.Tensor,
                  dst_kv_cache: torch.Tensor,
                  src_to_dst: torch.Tensor,
              ) -> None:
                  pass
              @staticmethod
              def copy_blocks(
                  kv_caches: List[torch.Tensor],
                  src_to_dists: torch.Tensor,
              ) -> None:
                  pass
          @dataclass
          class AttentionMetadata(AttentionMetadataPerStage):
              """Metadata for AscendAttentionBackend.
              """
              # Currently, input sequences can only contain all prompts
              # or all decoding. True if all sequences are prompts.
              is_prompt: bool
              # (batch_size,). The sequence length per sequence. Sequence length means
              # the computed tokens + new tokens. None if it is a decoding.
              seq_lens: Optional[List[int]]
              # seq_lens stored as a tensor.
              seq_lens_tensor: Optional[torch.Tensor]
              # Maximum query length in the batch.
              max_query_len: Optional[int]
              # Maximum sequence length in the batch.
              max_seq_len: Optional[int]
              # (batch_size + 1,). The cumulative subquery lengths of the sequences in
              # the batch, used to index into subquery. E.g., if the subquery length
              # is [4, 6], it is [0, 4, 10].
              subquery_start_loc: Optional[torch.Tensor]
              # (batch_size + 1,). The cumulative sequence lengths of the sequences in
              # the batch, used to index into sequence. E.g., if the sequence length is
              # [4, 6], it is [0, 4, 10].
              seq_start_loc: Optional[torch.Tensor]
              # (batch_size,) A tensor of context lengths (tokens that are computed
              # so far).
              context_lens_tensor: Optional[torch.Tensor]
              block_tables: Optional[torch.Tensor]
              # Whether or not if cuda graph is enabled.
              use_cuda_graph: bool
        • selector.py
        # Part of codes in this file was copied from project [vLLM Team][vllm]
        from functools import lru_cache
        from typing import Type
        import torch
        from vllm.attention.backends.abstract import AttentionBackend
        from vllm.logger import init_logger
        logger = init_logger(__name__)
        @lru_cache(maxsize=None)
        def get_attn_backend(dtype: torch.dtype) -> Type[AttentionBackend]:
            logger.info("Using Ascend backend.")
            from vllm_npu.attention.backends import AscendAttentionBackend
            return AscendAttentionBackend
      • core
        • __init__.py: empty file
      • engine
        • __init__.py
          from vllm.engine.llm_engine import LLMEngine
          from vllm.engine.async_llm_engine import AsyncLLMEngine
          from vllm_npu.engine.ascend_engine import from_engine_args
          from vllm_npu.engine.async_ascend_engine import from_engine_args_async
          LLMEngine.from_engine_args = from_engine_args
          AsyncLLMEngine.from_engine_args = from_engine_args_async
        • ascend_engine.py
          from vllm.engine.arg_utils import EngineArgs
          from vllm.usage.usage_lib import UsageContext
          from vllm_npu.executor.ray_utils import initialize_ray_cluster
          @classmethod
          def from_engine_args(
              cls,
              engine_args: EngineArgs,
              usage_context: UsageContext = UsageContext.ENGINE_CONTEXT,
          ) -> "LLMEngine":
              """Creates an LLM engine from the engine arguments."""
              # Create the engine configs.
              engine_config = engine_args.create_engine_config()
              # Initialize the cluster and specify the executor class.
              if engine_config.device_config.device_type == "neuron":
                  from vllm.executor.neuron_executor import NeuronExecutor
                  executor_class = NeuronExecutor
              elif engine_config.device_config.device_type == "cpu":
                  from vllm.executor.cpu_executor import CPUExecutor
                  executor_class = CPUExecutor
              elif engine_config.device_config.device_type == "npu":
                  if engine_config.parallel_config.worker_use_ray:
                      initialize_ray_cluster(engine_config.parallel_config)
                      from vllm_npu.executor.ascend_ray_executor import RayAscendExecutor
                      executor_class = RayAscendExecutor
                  else:
                      from vllm_npu.executor.ascend_executor import AscendExecutor
                      executor_class = AscendExecutor
              elif engine_config.parallel_config.worker_use_ray:
                  initialize_ray_cluster(engine_config.parallel_config)
                  from vllm.executor.ray_gpu_executor import RayGPUExecutor
                  executor_class = RayGPUExecutor
              else:
                  if engine_config.parallel_config.world_size != 1:
                      raise ValueError("Ray is required if parallel_config.world_size > 1.")
                  from vllm.executor.gpu_executor import GPUExecutor
                  executor_class = GPUExecutor
              # Create the LLM engine.
              engine = cls(
                  **engine_config.to_dict(),
                  executor_class=executor_class,
                  log_stats=not engine_args.disable_log_stats,
                  usage_context=usage_context,
              )
              return engine
        • async_ascend_engine.py
          from vllm.engine.arg_utils import AsyncEngineArgs
          from vllm.usage.usage_lib import UsageContext
          from vllm_npu.executor.ray_utils import initialize_ray_cluster
          @classmethod
          def from_engine_args_async(
              cls,
              engine_args: AsyncEngineArgs,
              start_engine_loop: bool = True,
              usage_context: UsageContext = UsageContext.ENGINE_CONTEXT,
          ) -> "AsyncLLMEngine":
              """Creates an async LLM engine from the engine arguments."""
              # Create the engine configs.
              engine_config = engine_args.create_engine_config()
              if engine_config.device_config.device_type == "neuron":
                  from vllm.executor.neuron_executor import NeuronExecutorAsync
                  executor_class = NeuronExecutorAsync
              elif engine_config.device_config.device_type == "cpu":
                  if engine_config.parallel_config.worker_use_ray:
                      raise RuntimeError("Ray is not supported with the CPU backend.")
                  from vllm.executor.cpu_executor import CPUExecutorAsync
                  executor_class = CPUExecutorAsync
              elif engine_config.device_config.device_type == "npu":
                  if engine_config.parallel_config.worker_use_ray:
                      initialize_ray_cluster(engine_config.parallel_config)
                      from vllm_npu.executor.ascend_ray_executor import RayAscendExecutorAsync
                      executor_class = RayAscendExecutorAsync
                  else:
                      from vllm_npu.executor.ascend_executor import AscendExecutorAsync
                      executor_class = AscendExecutorAsync
              elif engine_config.parallel_config.worker_use_ray:
                  initialize_ray_cluster(engine_config.parallel_config)
                  from vllm.executor.ray_gpu_executor import RayGPUExecutorAsync
                  executor_class = RayGPUExecutorAsync
              else:
                  if engine_config.parallel_config.world_size != 1:
                      raise RuntimeError("Ray is required if parallel_config.world_size > 1.")
                  from vllm.executor.gpu_executor import GPUExecutorAsync
                  executor_class = GPUExecutorAsync
              # Create the async LLM engine.
              engine = cls(
                  engine_config.parallel_config.worker_use_ray,
                  engine_args.engine_use_ray,
                  **engine_config.to_dict(),
                  executor_class=executor_class,
                  log_requests=not engine_args.disable_log_requests,
                  log_stats=not engine_args.disable_log_stats,
                  max_log_len=engine_args.max_log_len,
                  start_engine_loop=start_engine_loop,
                  usage_context=usage_context,
              )
              return engine
      • executor
        • __init__.py
          from vllm_npu.executor.ray_utils import initialize_ray_cluster
          from vllm.executor import ray_utils
          ray_utils.initialize_ray_cluster = initialize_ray_cluster
        • ascend_executor.py
          # Part of codes in this file was copied from project [vLLM Team][vllm]
          from typing import List, Set, Tuple
          from vllm.executor.executor_base import ExecutorAsyncBase, ExecutorBase
          from vllm.logger import init_logger
          from vllm.lora.request import LoRARequest
          from vllm.sequence import ExecuteModelRequest, SamplerOutput
          from vllm.utils import (get_distributed_init_method, get_ip, get_open_port,
                                  make_async)
          from vllm_npu.worker.ascend_worker import AscendWorker
          logger = init_logger(__name__)
          class AscendExecutor(ExecutorBase):
              def _init_executor(self) -> None:
                  assert (not self.speculative_config
                      ), "Speculative decoding is not yet supported for Ascend backend."
                  # Instantiate the worker and load the model to the device.
                  self._init_worker()
              def _init_worker(self):
                  distributed_init_method = get_distributed_init_method(
                          get_ip(), get_open_port())
                  self.driver_worker = AscendWorker(
                      self.model_config,
                      self.parallel_config,
                      self.scheduler_config,
                      self.device_config,
                      self.cache_config,
                      self.load_config,
                      local_rank=0,
                      rank=0,
                      distributed_init_method=distributed_init_method,
                      lora_config=self.lora_config,
                      is_driver_worker=True,
                  )
                  self.driver_worker.init_device()
                  self.driver_worker.load_model()
              def determine_num_available_blocks(self) -> Tuple[int, int]:
                  """Determine the number of available KV blocks by invoking the
                  underlying worker.
                  """
                  return self.driver_worker.determine_num_available_blocks()
              def initialize_cache(self, num_gpu_blocks: int,
                                   num_cpu_blocks: int) -> None:
                  """Initialize the KV cache by invoking the underlying worker.
                  """
                  self.driver_worker.initialize_cache(num_gpu_blocks, num_cpu_blocks)
              def execute_model(
                      self,
                      execute_model_req: ExecuteModelRequest) -> List[SamplerOutput]:
                  output = self.driver_worker.execute_model(execute_model_req)
                  return output
              def add_lora(self, lora_request: LoRARequest) -> bool:
                  return self.driver_worker.add_lora(lora_request)
              def remove_lora(self, lora_id: int) -> bool:
                  return self.driver_worker.remove_lora(lora_id)
              def list_loras(self) -> Set[int]:
                  return self.driver_worker.list_loras()
              def check_health(self) -> None:
                  # AscendExecutor will always be healthy as long as
                  # it's running.
                  return
          class AscendExecutorAsync(AscendExecutor, ExecutorAsyncBase):
              async def execute_model_async(
                  self,
                  execute_model_req: ExecuteModelRequest,
              ) -> List[SamplerOutput]:
                  output = await make_async(
                      self.driver_worker.execute_model
                  )(execute_model_req=execute_model_req,)
                  return output
              # async def check_health_async(self) -> None:
              #     # AscendExecutor will always be healthy as long as
              #     # it's running.
              #     return
        • ascend_ray_executor.py
          import asyncio
          import os
          import pickle
          from collections import defaultdict
          from itertools import islice, repeat
          from typing import TYPE_CHECKING, Any, Dict, List, Optional, Tuple
          import vllm.envs as envs
          from vllm.executor.distributed_gpu_executor import (  # yapf: disable
              DistributedGPUExecutor, DistributedGPUExecutorAsync)
          from vllm.executor.ray_utils import RayWorkerWrapper, ray
          from vllm.logger import init_logger
          from vllm.sequence import ExecuteModelRequest, SamplerOutput
          from vllm.utils import (get_distributed_init_method, get_ip, get_open_port,
                                  get_vllm_instance_id, make_async)
          if ray is not None:
              from ray.util.scheduling_strategies import PlacementGroupSchedulingStrategy
          if TYPE_CHECKING:
              from ray.util.placement_group import PlacementGroup
          logger = init_logger(__name__)
          USE_RAY_COMPILED_DAG = envs.VLLM_USE_RAY_COMPILED_DAG
          class RayAscendExecutor(DistributedGPUExecutor):
              def _init_executor(self) -> None:
                  assert (not self.speculative_config
                          ), "Speculative decoding not yet supported for RayNPU backend."
                  assert self.parallel_config.worker_use_ray
                  placement_group = self.parallel_config.placement_group
                  # Disable Ray usage stats collection.
                  ray_usage = os.environ.get("RAY_USAGE_STATS_ENABLED", "0")
                  if ray_usage != "1":
                      os.environ["RAY_USAGE_STATS_ENABLED"] = "0"
                  # Create the parallel GPU workers.
                  self._init_workers_ray(placement_group)
                  self.forward_dag = None
                  if USE_RAY_COMPILED_DAG:
                      self.forward_dag = self._compiled_ray_dag()
              def _init_workers_ray(self, placement_group: "PlacementGroup",
                                    **ray_remote_kwargs):
                  if self.parallel_config.tensor_parallel_size == 1:
                      # For single GPU case, we use a ray worker with constrained memory.
                      num_gpus = self.cache_config.gpu_memory_utilization
                  else:
                      # Otherwise, the ray workers are allocated with a full GPU.
                      num_gpus = 1
                  # The driver dummy worker does not actually use any resources.
                  # It holds the resource for the driver worker.
                  self.driver_dummy_worker: Optional[RayWorkerWrapper] = None
                  # The remaining workers are the actual ray actors.
                  self.workers: List[RayWorkerWrapper] = []
                  if self.parallel_config.ray_workers_use_nsight:
                      ray_remote_kwargs = self._configure_ray_workers_use_nsight(
                          ray_remote_kwargs)
                  # Create the workers.
                  driver_ip = get_ip()
                  for bundle_id, bundle in enumerate(placement_group.bundle_specs):
                      if not bundle.get("GPU", 0):
                          continue
                      scheduling_strategy = PlacementGroupSchedulingStrategy(
                          placement_group=placement_group,
                          placement_group_capture_child_tasks=True,
                          placement_group_bundle_index=bundle_id,
                      )
                      worker = ray.remote(
                          num_cpus=0,
                          num_gpus=num_gpus,
                          scheduling_strategy=scheduling_strategy,
                          **ray_remote_kwargs,
                      )(RayWorkerWrapper).remote(
                          worker_module_name="vllm_npu.worker.ascend_worker",
                          worker_class_name="AscendWorker",
                          trust_remote_code=self.model_config.trust_remote_code,
                      )
                      worker_ip = ray.get(worker.get_node_ip.remote())
                      if worker_ip == driver_ip and self.driver_dummy_worker is None:
                          # If the worker is on the same node as the driver, we use it
                          # as the resource holder for the driver process.
                          self.driver_dummy_worker = worker
                          self.driver_worker = RayWorkerWrapper(
                              worker_module_name="vllm_npu.worker.ascend_worker",
                              worker_class_name="AscendWorker",
                              trust_remote_code=self.model_config.trust_remote_code,
                          )
                      else:
                          # Else, added to the list of workers.
                          self.workers.append(worker)
                  if self.driver_dummy_worker is None:
                      raise ValueError(
                          "Ray does not allocate any GPUs on the driver node. Consider "
                          "adjusting the Ray placement group or running the driver on a "
                          "GPU node.")
                  # Get the set of GPU IDs used on each node.
                  worker_node_and_gpu_ids = self._run_workers("get_node_and_gpu_ids",
                                                              use_dummy_driver=True)
                  node_workers = defaultdict(list)
                  node_gpus = defaultdict(list)
                  for i, (node_id, gpu_ids) in enumerate(worker_node_and_gpu_ids):
                      node_workers[node_id].append(i)
                      node_gpus[node_id].extend(gpu_ids)
                  for node_id, gpu_ids in node_gpus.items():
                      node_gpus[node_id] = sorted(gpu_ids)
                  VLLM_INSTANCE_ID = get_vllm_instance_id()
                  # Set environment variables for the driver and workers.
                  all_args_to_update_environment_variables = [({
                      "CUDA_VISIBLE_DEVICES":
                      ",".join(map(str, node_gpus[node_id])),
                      "VLLM_INSTANCE_ID":
                      VLLM_INSTANCE_ID,
                      "VLLM_TRACE_FUNCTION":
                      str(envs.VLLM_TRACE_FUNCTION),
                  }, ) for (node_id, _) in worker_node_and_gpu_ids]
                  self._run_workers("update_environment_variables",
                                    all_args=all_args_to_update_environment_variables)
                  distributed_init_method = get_distributed_init_method(
                      driver_ip, get_open_port())
                  # Initialize the actual workers inside worker wrapper.
                  init_worker_all_kwargs = [
                      self._get_worker_kwargs(
                          local_rank=node_workers[node_id].index(rank),
                          rank=rank,
                          distributed_init_method=distributed_init_method,
                      ) for rank, (node_id, _) in enumerate(worker_node_and_gpu_ids)
                  ]
                  self._run_workers("init_worker", all_kwargs=init_worker_all_kwargs)
                  self._run_workers("init_device")
                  self._run_workers("load_model",
                                    max_concurrent_workers=self.parallel_config.
                                    max_parallel_loading_workers)
              def execute_model(
                      self,
                      execute_model_req: ExecuteModelRequest) -> List[SamplerOutput]:
                  all_outputs = self._run_workers(
                      "execute_model",
                      driver_kwargs={"execute_model_req": execute_model_req},
                      use_ray_compiled_dag=USE_RAY_COMPILED_DAG)
                  # Only the driver worker returns the sampling results.
                  return all_outputs[0]
              def _run_workers(
                  self,
                  method: str,
                  *args,
                  driver_args: Optional[Tuple[Any, ...]] = None,
                  driver_kwargs: Optional[Dict[str, Any]] = None,
                  all_args: Optional[List[Tuple[Any, ...]]] = None,
                  all_kwargs: Optional[List[Dict[str, Any]]] = None,
                  use_dummy_driver: bool = False,
                  max_concurrent_workers: Optional[int] = None,
                  use_ray_compiled_dag: bool = False,
                  **kwargs,
              ) -> Any:
                  """Runs the given method on all workers. Can be used in the following
                  ways:
                  - args/kwargs: All workers share the same args/kwargs
                  - args/kwargs and driver_args/driver_kwargs: Driver worker has
                    different args
                  - all_args/all_kwargs: args/kwargs for each worker are specified
                    individually
                  """
                  if max_concurrent_workers:
                      raise NotImplementedError(
                          "max_concurrent_workers is not supported yet.")
                  if driver_args is None:
                      driver_args = args if all_args is None else all_args[0]
                  if driver_kwargs is None:
                      driver_kwargs = kwargs if all_kwargs is None else all_kwargs[0]
                  count = len(self.workers)
                  all_worker_args = repeat(args, count) if all_args is None \
                      else islice(all_args, 1, None)
                  all_worker_kwargs = repeat(kwargs, count) if all_kwargs is None \
                      else islice(all_kwargs, 1, None)
                  if use_ray_compiled_dag:
                      assert self.forward_dag is not None
                      output_channels = self.forward_dag.execute(1)
                  else:
                      # Start the ray workers first.
                      ray_worker_outputs = [
                          worker.execute_method.remote(method, *worker_args,
                                                       **worker_kwargs)
                          for (worker, worker_args, worker_kwargs
                               ) in zip(self.workers, all_worker_args, all_worker_kwargs)
                      ]
                  # Start the driver worker after all the ray workers.
                  if not use_dummy_driver:
                      driver_worker_output = self.driver_worker.execute_method(
                          method, *driver_args, **driver_kwargs)
                  else:
                      assert self.driver_dummy_worker is not None
                      driver_worker_output = ray.get(
                          self.driver_dummy_worker.execute_method.remote(
                              method, *driver_args, **driver_kwargs))
                  # Get the results of the ray workers.
                  if self.workers:
                      if use_ray_compiled_dag:
                          try:
                              ray_worker_outputs = [
                                  pickle.loads(chan.begin_read())
                                  for chan in output_channels
                              ]
                          finally:
                              # Has to call end_read in order to reuse the DAG.
                              for chan in output_channels:
                                  chan.end_read()
                      else:
                          ray_worker_outputs = ray.get(ray_worker_outputs)
                  return [driver_worker_output] + ray_worker_outputs
              def _compiled_ray_dag(self):
                  import pkg_resources
                  required_version = "2.9"
                  current_version = pkg_resources.get_distribution("ray").version
                  if current_version < required_version:
                      raise ValueError(f"Ray version {required_version} or greater is "
                                       f"required, but found {current_version}")
                  from ray.dag import InputNode, MultiOutputNode
                  assert self.parallel_config.worker_use_ray
                  with InputNode() as input_data:
                      forward_dag = MultiOutputNode([
                          worker.execute_model_compiled_dag_remote.
                          bind(  # type: ignore[attr-defined]
                              input_data) for worker in self.workers
                      ])
                  return forward_dag.experimental_compile()
              def check_health(self) -> None:
                  """Raises an error if engine is unhealthy."""
                  self._check_if_any_actor_is_dead()
              def _check_if_any_actor_is_dead(self):
                  if not self.workers:
                      return
                  dead_actors = []
                  for actor in self.workers:
                      actor_state = ray.state.actors(actor._ray_actor_id.hex())  # pylint: disable=protected-access
                      if actor_state["State"] == "DEAD":
                          dead_actors.append(actor)
                  if dead_actors:
                      raise RuntimeError("At least one Worker is dead. "
                                         f"Dead Workers: {dead_actors}. ")
          class RayAscendExecutorAsync(RayAscendExecutor, DistributedGPUExecutorAsync):
              def __init__(self, *args, **kwargs):
                  super().__init__(*args, **kwargs)
                  self.driver_executor = make_async(self.driver_worker.execute_method)
              async def _run_workers_async(
                  self,
                  method: str,
                  *args,
                  driver_args: Optional[Tuple[Any, ...]] = None,
                  driver_kwargs: Optional[Dict[str, Any]] = None,
                  **kwargs,
              ) -> Any:
                  """Runs the given method on all workers."""
                  coros = []
                  if driver_args is None:
                      driver_args = args
                  if driver_kwargs is None:
                      driver_kwargs = kwargs
                  coros.append(
                      self.driver_executor(method, *driver_args, **driver_kwargs))
                  # Run the ray workers asynchronously.
                  for worker in self.workers:
                      coros.append(worker.execute_method.remote(method, *args, **kwargs))
                  all_outputs = await asyncio.gather(*coros)
                  return all_outputs
        • ray_utils.py
          from typing import Optional, Tuple, TYPE_CHECKING
          from vllm.config import ParallelConfig
          from vllm.utils import is_hip
          from vllm.logger import init_logger
          logger = init_logger(__name__)
          try:
              import ray
          except ImportError as e:
              logger.warning(f"Failed to import Ray with {e!r}. "
                             "For distributed inference, please install Ray with "
                             "`pip install ray`.")
              ray = None
          if TYPE_CHECKING:
              from ray.util.placement_group import PlacementGroup
          def initialize_ray_cluster(
              parallel_config: ParallelConfig,
              ray_address: Optional[str] = None,
          ):
              """Initialize the distributed cluster with Ray.
              it will connect to the Ray cluster and create a placement group
              for the workers, which includes the specification of the resources
              for each distributed worker.
              Args:
                  parallel_config: The configurations for parallel execution.
                  ray_address: The address of the Ray cluster. If None, uses
                      the default Ray cluster address.
              """
              if ray is None:
                  raise ImportError(
                      "Ray is not installed. Please install Ray to use multi-node "
                      "serving.")
              # Connect to a ray cluster.
              if is_hip():
                  ray.init(address=ray_address,
                           ignore_reinit_error=True,
                           num_gpus=parallel_config.world_size)
              else:
                  """start adapt"""
                  # Without setting num_gpus, ray.init() tries to detect the number of
                  # GPUs, but in an Ascend environment this detection may fail, so the
                  # value needs to be set manually.
                  ray.init(address=ray_address, ignore_reinit_error=True, 
                           num_gpus=parallel_config.world_size)
                  """end adapt"""
              if parallel_config.placement_group:
                  # Placement group is already set.
                  return
              # Create placement group for worker processes
              current_placement_group = ray.util.get_current_placement_group()
              if current_placement_group:
                  # We are in a placement group
                  bundles = current_placement_group.bundle_specs
                  # Verify that we can use the placement group.
                  gpu_bundles = 0
                  for bundle in bundles:
                      bundle_gpus = bundle.get("GPU", 0)
                      if bundle_gpus > 1:
                          raise ValueError(
                              "Placement group bundle cannot have more than 1 GPU.")
                      if bundle_gpus:
                          gpu_bundles += 1
                  if parallel_config.world_size > gpu_bundles:
                      raise ValueError(
                          "The number of required GPUs exceeds the total number of "
                          "available GPUs in the placement group.")
              else:
                  num_gpus_in_cluster = ray.cluster_resources().get("GPU", 0)
                  if parallel_config.world_size > num_gpus_in_cluster:
                      raise ValueError(
                          "The number of required GPUs exceeds the total number of "
                          "available GPUs in the cluster.")
                  # Create a new placement group
                  placement_group_specs = ([{"GPU": 1}] * parallel_config.world_size)
                  current_placement_group = ray.util.placement_group(
                      placement_group_specs)
                  # Wait until PG is ready - this will block until all
                  # requested resources are available, and will timeout
                  # if they cannot be provisioned.
                  ray.get(current_placement_group.ready(), timeout=1800)
              # Set the placement group in the parallel config
              parallel_config.placement_group = current_placement_group
      • model_executor
        • __init__.py
          import vllm.model_executor.model_loader as vllm_model_loader
          import vllm_npu.model_executor.ascend_model_loader as ascend_model_loader
          vllm_model_loader.get_architecture_class_name = ascend_model_loader.get_architecture_class_name
        • ascend_model_loader.py
          # Part of code in this file was copied from project [vLLM Team][vllm] for adapting usage
          import contextlib
          import torch
          import torch.nn as nn
          from vllm.config import DeviceConfig, ModelConfig, LoadConfig
          from vllm.model_executor.model_loader.weight_utils import initialize_dummy_weights
          from vllm_npu.model_executor.models.ascend.mindie_llm_wrapper import MindIELlmWrapper
          def get_architecture_class_name(model_config: ModelConfig) -> str:
              architectures = getattr(model_config.hf_config, "architectures", [])
              if (model_config.quantization is not None
                      and model_config.quantization != "fp8"
                      and "MixtralForCausalLM" in architectures):
                  architectures = ["QuantMixtralForCausalLM"]
              return architectures[0]
              
          @contextlib.contextmanager
          def _set_default_torch_dtype(dtype: torch.dtype):
              """Sets the default torch dtype to the given dtype."""
              old_dtype = torch.get_default_dtype()
              torch.set_default_dtype(dtype)
              yield
              torch.set_default_dtype(old_dtype)
          def get_model(model_config: ModelConfig, device_config: DeviceConfig,
                        load_config: LoadConfig, 
                        mindie_model_config, **kwargs) -> nn.Module:
              lora_config = kwargs.get("lora_config", None)
              model_class = MindIELlmWrapper
              # Get the (maybe quantized) linear method.
              linear_method = None
              with _set_default_torch_dtype(model_config.dtype):
                  # Create a model instance.
                  # The weights will be initialized as empty tensors.
                  with torch.device(device_config.device):
                      if hasattr(model_class, "supported_lora_modules"):
                          model = model_class(mindie_model_config, linear_method,
                                              lora_config)
                      elif lora_config:
                          raise ValueError(
                              f"Model {model_class.__name__} does not support LoRA, "
                              "but LoRA is enabled. Support for this model may "
                              "be added in the future. If this is important to you, "
                              "please open an issue on github.")
                      else:
                          model = model_class(mindie_model_config, linear_method)
                  if load_config.load_format == "dummy":
                      initialize_dummy_weights(model)
                  else:
                      # Load the weights from the cached or downloaded files.
                      model.load_weights(model_config.model, load_config.download_dir,
                                         load_config.load_format, model_config.revision)
                  model = model.npu()
              return model.eval()
        • layers
          • __init__.py: empty file
          • ascend_sampler.py
          # Part of codes in this file was copied from project [vLLM Team][vllm]
          import random
          from typing import Dict, List, Optional, Tuple
          import torch
          import torch.nn as nn
          import numpy as np
          from vllm.sampling_params import SamplingType
          from vllm.sequence import SamplerOutput
          from vllm.model_executor.sampling_metadata import SamplingMetadata, SequenceGroupToSample
          from vllm.model_executor.layers.sampler import _get_logprobs, _build_sampler_output
          from mindie_llm.text_generator.utils.sampling_metadata import SamplingData, SamplingParam
          _SAMPLING_EPS = 1e-5
          SampleResultType = List[Tuple[List[int], List[int]]]
          def _to_tensor(data, dtype=None):
              if dtype:
                  return torch.tensor(data, dtype=dtype, device=torch.device("npu"))
              else:
                  return torch.tensor(data, device=torch.device("npu"))
          class AscendSampler(nn.Module):
              def __init__(self, mindie_model):
                  super().__init__()
                  self.mindie_model = mindie_model
                  self.include_gpu_probs_tensor = False
              def forward(
                  self,
                  logits: torch.Tensor,
                  sampling_metadata: SamplingMetadata,
              ) -> Optional[SamplerOutput]:
                  _, vocab_size = logits.shape
                  mindie_sampling_data, mindie_sampling_param = self.construct_data(sampling_metadata, vocab_size)
                  probs = torch.softmax(logits, dim=-1, dtype=torch.float)
                  logprobs = torch.log_softmax(logits, dim=-1, dtype=torch.float)
                  next_tokens = self.mindie_model.sample(
                      logits, 
                      sampling_data=mindie_sampling_data, 
                      sampling_param=mindie_sampling_param,
                  )
                  
                  sample_results, maybe_sampled_tokens_tensor = recover_data(
                      sampling_metadata=sampling_metadata, 
                      sampled_tokens=next_tokens, 
                      logprobs=logprobs, 
                      include_gpu_probs_tensor=self.include_gpu_probs_tensor,
                  )
                  if self.include_gpu_probs_tensor:
                      if maybe_sampled_tokens_tensor is None:
                          raise RuntimeError("maybe_sampled_tokens_tensor is None")
                      on_device_tensors = (probs, logprobs, maybe_sampled_tokens_tensor)
                  else:
                      on_device_tensors = None
                  # Get the logprobs query results.
                  prompt_logprobs, sample_logprobs = _get_logprobs(
                      logprobs, sampling_metadata, sample_results)
                  return _build_sampler_output(sample_results,
                                               sampling_metadata,
                                               prompt_logprobs,
                                               sample_logprobs,
                                               on_device_tensors=on_device_tensors)
              def construct_data(
                  self,
                  sampling_metadata: SamplingMetadata,
                  vocab_size: int,
              ) -> Tuple[SamplingData, SamplingParam]:
                  all_input_tokens: List[List[int]] = []
                  prompt_tokens: List[List[int]] = []
                  output_tokens: List[List[int]] = []
                  top_ks: List[int] = []
                  temperatures: List[float] = []
                  top_ps: List[float] = []
                  min_ps: List[float] = []
                  presence_penalties: List[float] = []
                  frequency_penalties: List[float] = []
                  repetition_penalties: List[float] = []
                  sampling_seeds: List[int] = []
                  sample_indices: List[int] = []
                  do_samples: List[bool] = []  # To Do
                  do_penalties = False
                  do_top_p_top_k = False
                  do_min_p = False
                  greedy_flag = False
                  
                  if sampling_metadata.seq_groups is None:
                      raise RuntimeError("sampling_metadata.seq_group is None, no data received.")
                  for seq_group in sampling_metadata.seq_groups:
                      do_samples.append(seq_group.do_sample)
                      seq_ids = seq_group.seq_ids
                      sampling_params = seq_group.sampling_params
                      temperature = sampling_params.temperature
                      p = sampling_params.presence_penalty
                      f = sampling_params.frequency_penalty
                      r = sampling_params.repetition_penalty
                      top_p = sampling_params.top_p
                      min_p = sampling_params.min_p
                      is_greedy = sampling_params.sampling_type == SamplingType.GREEDY
                      seed = sampling_params.seed
                      if seed is None:
                          if is_greedy:
                              seed = 0
                          else:
                              lo, hi = torch.iinfo(torch.long).min, torch.iinfo(torch.long).max
                              seed = random.randint(lo, hi)
                      if is_greedy:
                          greedy_flag = True
                      # k should not be greater than the vocab size.
                      top_k = min(sampling_params.top_k, vocab_size)
                      top_k = vocab_size if top_k == -1 else top_k
                      if temperature < _SAMPLING_EPS:
                          temperature = 1.0
                      if not do_top_p_top_k and (top_p < 1.0 - _SAMPLING_EPS
                                                 or top_k != vocab_size):
                          do_top_p_top_k = True
                      if not do_min_p and min_p > _SAMPLING_EPS:
                          do_min_p = True
                      if not do_penalties:
                          if abs(p) >= _SAMPLING_EPS:
                              do_penalties = True
                          elif abs(f) >= _SAMPLING_EPS:
                              do_penalties = True
                          elif abs(r - 1.0) >= _SAMPLING_EPS:
                              do_penalties = True
                      is_prompt = seq_group.is_prompt
                      if (seq_group.is_prompt
                              and sampling_params.prompt_logprobs is not None):
                          # For tokens in the prompt that we only need to get
                          # their logprobs
                          query_len = seq_group.query_len
                          if query_len is None:
                              raise RuntimeError("query_len is None")
                          prefill_len = len(seq_group.prompt_logprob_indices)
                          temperatures += [temperature] * prefill_len
                          sampling_seeds += [seed] * prefill_len
                          top_ps += [top_p] * prefill_len
                          top_ks += [top_k] * prefill_len
                          min_ps += [min_p] * prefill_len
                          presence_penalties += [0] * prefill_len
                          frequency_penalties += [0] * prefill_len
                          repetition_penalties += [1] * prefill_len
                          prompt_tokens.extend([] for _ in range(prefill_len))
                          output_tokens.extend([] for _ in range(prefill_len))
                          all_input_tokens.extend([] for _ in range(prefill_len))
                      if seq_group.do_sample:
                          sample_lens = len(seq_group.sample_indices)
                          if sample_lens != len(seq_ids):
                              raise ValueError("sample_lens != len(seq_ids)")
                          for seq_id in seq_ids:
                              seq_data = seq_group.seq_data[seq_id]
                              prompt_tokens.append(seq_data.prompt_token_ids)
                              output_tokens.append(seq_data.output_token_ids)
                              all_input_tokens.append(seq_data.prompt_token_ids + seq_data.output_token_ids)
                          temperatures += [temperature] * len(seq_ids)
                          sampling_seeds += [seed] * len(seq_ids)
                          top_ps += [top_p] * len(seq_ids)
                          top_ks += [top_k] * len(seq_ids)
                          min_ps += [min_p] * len(seq_ids)
                          presence_penalties += [p] * len(seq_ids)
                          frequency_penalties += [f] * len(seq_ids)
                          repetition_penalties += [r] * len(seq_ids)
                  repetition_penalties = np.array(repetition_penalties, dtype=np.float32)
                  frequency_penalties = np.array(frequency_penalties, dtype=np.float32)
                  presence_penalties = np.array(presence_penalties, dtype=np.float32)
                  temperatures = np.array(temperatures, dtype=np.float32)
                  top_ks = np.array(top_ks, dtype=np.int32)
                  top_ps = np.array(top_ps, dtype=np.float32)
                  sampling_seeds = np.array(sampling_seeds)
                  do_samples = np.array(do_samples)
                  max_tokens_len = max([len(tokens) for tokens in all_input_tokens], default=0)
                  padded_all_input_tokens = [
                      tokens + [vocab_size] * (max_tokens_len - len(tokens))
                      for tokens in all_input_tokens
                  ]
                  padded_all_input_tokens = np.array(padded_all_input_tokens, dtype=np.int32)
                  output_max_len = max([len(tokens) for tokens in output_tokens], default=0)
                  padded_output_tokens = [
                      tokens + [vocab_size] * (output_max_len - len(tokens))
                      for tokens in output_tokens
                  ]
                  padded_output_tokens = np.array(padded_output_tokens, dtype=np.int32)
                  all_input_ids_tensor = _to_tensor(
                      padded_all_input_tokens, 
                      torch.int32
                  ) if padded_all_input_tokens is not None else None
                  output_ids_tensor = _to_tensor(
                      padded_output_tokens, 
                      torch.int32
                  ) if padded_output_tokens is not None else None
                  mindie_sampling_data = SamplingData(
                      all_input_ids=all_input_ids_tensor, 
                      output_ids=output_ids_tensor
                  )
                  if greedy_flag:
                      mindie_sampling_param = None
                  else:
                      mindie_sampling_param = SamplingParam.from_numpy(
                          repetition_penalty=repetition_penalties,
                          frequency_penalty=frequency_penalties,
                          presence_penalty=presence_penalties,
                          temperature=temperatures,
                          top_k=top_ks,
                          top_p=top_ps,
                          seed=sampling_seeds,
                          do_sample=do_samples,
                          to_tensor=_to_tensor,
                      )
                  return (mindie_sampling_data, mindie_sampling_param)
          def recover_data(
              sampling_metadata: SamplingMetadata,
              sampled_tokens: np.ndarray,
              logprobs: torch.Tensor,
              include_gpu_probs_tensor: bool,
          ) -> Tuple[SampleResultType, Optional[torch.Tensor]]:
              categorized_seq_group_ids: Dict[SamplingType,
                                              List[int]] = {t: []
                                                            for t in SamplingType}
              categorized_sample_indices = sampling_metadata.categorized_sample_indices
              for i, seq_group in enumerate(sampling_metadata.seq_groups):
                  sampling_params = seq_group.sampling_params
                  sampling_type = sampling_params.sampling_type
                  categorized_seq_group_ids[sampling_type].append(i)
              sample_results_dict: Dict[int, Tuple[List[int], List[int]]] = {}
              sample_metadata = {}
              # Create output tensor for sampled token ids.
              if include_gpu_probs_tensor:
                  sampled_token_ids_tensor = torch.empty(logprobs.shape[0],
                                                         1,
                                                         dtype=torch.long,
                                                         device=logprobs.device)
              else:
                  sampled_token_ids_tensor = None
              for sampling_type in SamplingType:
                  sample_indices = categorized_sample_indices[sampling_type][:, 0]
                  num_tokens = len(sample_indices)
                  if num_tokens == 0:
                      continue
                  seq_group_id = categorized_seq_group_ids[sampling_type]
                  seq_groups = [sampling_metadata.seq_groups[i] for i in seq_group_id]
                  sample_metadata[sampling_type] = (seq_group_id, seq_groups)
              for sampling_type in SamplingType:
                  if sampling_type not in sample_metadata:
                      continue
                  (seq_group_id, seq_groups) = sample_metadata[sampling_type]
                  if sampling_type in (SamplingType.GREEDY, SamplingType.RANDOM, SamplingType.RANDOM_SEED):
                      sample_results = normal_wrap(seq_groups, sampled_tokens)
                  elif sampling_type == SamplingType.BEAM:
                      sample_results = beam_wrap(seq_groups, sampled_tokens)
                  sample_results_dict.update(zip(seq_group_id, sample_results))
              sample_results = [
                  sample_results_dict.get(i, ([], []))
                  for i in range(len(sampling_metadata.seq_groups))
              ]
              return sample_results, sampled_token_ids_tensor
          def normal_wrap(
              selected_seq_groups: List[SequenceGroupToSample],
              samples: np.ndarray,
          ):
              samples = samples.tolist()
              sample_idx = 0
              results: SampleResultType = []
              for seq_group in selected_seq_groups:
                  if not seq_group.do_sample:
                      results.append(([], []))
                      continue
                  seq_ids = seq_group.seq_ids
                  num_parent_seqs = len(seq_ids)
                  parent_ids = list(range(num_parent_seqs))
                  next_token_ids = [samples[sample_idx]]
                  results.append((next_token_ids, parent_ids))
                  sample_idx += num_parent_seqs
              return results
          def beam_wrap(
              selected_seq_groups: List[SequenceGroupToSample],
              samples: np.ndarray,
          ):
              raise ValueError(f"Unsupported sampling type: beam search")
        • models
        • __init__.py: empty file
        • ascend
        • __init__.py: empty file
        • mindie_llm_wrapper.py
          from typing import List, Optional
          import math
          import torch
          from torch import nn
          from vllm.model_executor import SamplingMetadata
          from vllm.sequence import SamplerOutput
          from vllm.attention import AttentionMetadata
          from mindie_llm.text_generator.adapter.generator_torch import GeneratorTorch
          from vllm_npu.worker.cache_engine import KVCache
          from vllm_npu.model_executor.layers.ascend_sampler import AscendSampler
          class MindIELlmWrapper(nn.Module):
              def __init__(self, mindie_model_config, linear_method=None, lora_config=None):
                  super(MindIELlmWrapper, self).__init__()
                  
                  self.mindie_model_config = mindie_model_config
                  self.rank = mindie_model_config['rank']
                  self.local_rank = mindie_model_config['local_rank']
                  self.npu_id = self.local_rank
                  self.world_size = mindie_model_config['world_size']
                  self.mindie_model = None
                  self.sampler = None
              def forward(
                      self,
                      input_ids: torch.Tensor,
                      positions: torch.Tensor,
                      kv_caches: List[KVCache],
                      attn_metadata: AttentionMetadata,
              ) -> torch.Tensor:
                  is_prompt = attn_metadata.num_prefill_tokens > 0
                  
                  if kv_caches[0][0] is None:
                      kv_caches, block_tables, slots = self.create_dummy_kv_cache(attn_metadata, input_ids)
                  else:
                      if is_prompt:
                          block_tables = torch.tensor([0], dtype=torch.int32, device="npu")
                      else:
                          block_tables = attn_metadata.decode_metadata.block_tables
                      slots = attn_metadata.slot_mapping
                  if is_prompt:
                      input_lengths = attn_metadata.prefill_metadata.seq_lens_tensor.to(torch.int32)
                      max_seq_len = int(attn_metadata.prefill_metadata.seq_lens_tensor.max())
                      lm_head_indices = (attn_metadata.prefill_metadata.seq_lens_tensor.cumsum(dim=-1) - 1).to(torch.int64)
                  else:
                      input_lengths = attn_metadata.decode_metadata.seq_lens_tensor
                      max_seq_len = attn_metadata.decode_metadata.max_seq_len
                      lm_head_indices = None
                  
                  logits = self.mindie_model.forward_tensor(input_ids, positions, is_prompt, kv_caches, block_tables, slots,
                                          input_lengths, max_seq_len, lm_head_indices)
                  return logits
              def compute_logits(self, hidden_states: torch.Tensor,
                                 sampling_metadata: SamplingMetadata) -> torch.Tensor:
                  return hidden_states
              def sample(
                  self,
                  logits: torch.Tensor,
                  sampling_metadata: SamplingMetadata,
              ) -> Optional[SamplerOutput]:
                  # hidden_states is logits
                  next_tokens = self.sampler(logits, sampling_metadata)
                  return next_tokens
              def load_weights(self,
                               model_name_or_path: str,
                               cache_dir: Optional[str] = None,
                               load_format: str = "auto",
                               revision: Optional[str] = None):
                  if load_format not in ['auto', 'safetensors', 'pt']:
                      raise ValueError("load_format must be one of ['auto', 'safetensors', 'pt']")
                  self.weight_dtype = torch.get_default_dtype()
                  torch.set_default_dtype(torch.float32)
                  self.mindie_model = GeneratorTorch(self.mindie_model_config)
                  self.sampler = AscendSampler(self.mindie_model)
                  torch.set_default_dtype(self.weight_dtype)
              # when warmup, create dummy kvcache, block_tables, slot_mapping
              def create_dummy_kv_cache(self, attn_metadata, input_ids):
                  dummy_block_num = 1
                  dummy_block_size = 128
                  model_runner = self.mindie_model.model_wrapper.model_runner
                  kv_cache = [
                      (
                          torch.empty(
                              (dummy_block_num, dummy_block_size, model_runner.num_kv_heads, model_runner.head_size),
                              dtype=self.weight_dtype,
                              device="npu",
                          ),
                          torch.empty(
                              (dummy_block_num, dummy_block_size, model_runner.num_kv_heads, model_runner.head_size),
                              dtype=self.weight_dtype,
                              device="npu",
                          ),
                      )
                      for _ in range(model_runner.num_layers)
                  ]
                  max_s = max(attn_metadata.prefill_metadata.seq_lens_tensor)
                  max_need_block = math.ceil(max_s / dummy_block_size)
                  batch_size = len(attn_metadata.prefill_metadata.seq_lens_tensor)
                  block_tables = torch.zeros(batch_size, max_need_block, dtype=int, device="npu")
                  slot = [i for i in range(dummy_block_size)]
                  slots = []
                  warm_up_len = len(input_ids)
                  while warm_up_len > 0:
                      if warm_up_len > dummy_block_size:
                          slots.extend(slot)
                          warm_up_len -= dummy_block_size
                      else:
                          slots.extend(slot[:warm_up_len])
                          warm_up_len = 0
                  slots = torch.tensor(slots, dtype=torch.long, device="npu")
                  return kv_cache, block_tables, slots
      • usage
        • __init__.py
          import types
          from vllm.engine.llm_engine import usage_message
          import vllm_npu.usage.usage_lib as vllm_npu_usage_lib
          usage_message._report_usage_once = types.MethodType(vllm_npu_usage_lib._report_usage_once, usage_message)
        • usage_lib.py
          # Part of code in this file was copied from project [vLLM Team][vllm] for adapting usage
          import platform
          from typing import Any, Dict
          import cpuinfo
          import psutil
          import torch
          import vllm.envs as envs
          from vllm.usage.usage_lib import UsageContext, _detect_cloud_provider, _get_current_timestamp_ns
          def _report_usage_once(self, model_architecture: str,
                                     usage_context: UsageContext,
                                     extra_kvs: Dict[str, Any]) -> None:
              # Platform information
              if torch.npu.is_available():
                  device_property = torch.npu.get_device_properties()
                  self.gpu_count = torch.npu.device_count()
                  self.gpu_type = device_property.name
                  self.gpu_memory_per_device = device_property.total_memory
              self.provider = _detect_cloud_provider()
              self.architecture = platform.machine()
              self.platform = platform.platform()
              self.total_memory = psutil.virtual_memory().total
              info = cpuinfo.get_cpu_info()
              self.num_cpu = info.get("count", None)
              self.cpu_type = info.get("brand_raw", "")
              self.cpu_family_model_stepping = ",".join([
                  str(info.get("family", "")),
                  str(info.get("model", "")),
                  str(info.get("stepping", ""))
              ])
              # vLLM information
              import vllm  # delayed import to prevent circular import
              self.context = usage_context.value
              self.vllm_version = vllm.__version__
              self.model_architecture = model_architecture
              # Metadata
              self.log_time = _get_current_timestamp_ns()
              self.source = envs.VLLM_USAGE_SOURCE
              data = vars(self)
              if data["_report_usage_once"] is not None:
                  del data["_report_usage_once"]
              if extra_kvs:
                  data.update(extra_kvs)
              self._write_to_file(data)
              self._send_to_server(data)
      • worker
        • __init__.py
          from vllm_npu.worker.cache_engine import _allocate_kv_cache
          from vllm.worker import cache_engine
          cache_engine.CacheEngine._allocate_kv_cache = _allocate_kv_cache
        • ascend_model_runner.py
          # Part of codes in this file was copied from project [vLLM Team][vllm]
          import torch
          from typing import List, Optional, Tuple
          from vllm.logger import init_logger
          from vllm.sequence import (SamplerOutput, SequenceGroupMetadata)
          from vllm.sampling_params import SamplingParams
          from vllm.worker.model_runner import _prepare_fake_inputs, ModelRunner
          from vllm.utils import is_hip
          from vllm.lora.request import LoRARequest
          from vllm.config import (DeviceConfig, LoadConfig, LoRAConfig, ModelConfig,
                                   ParallelConfig, SchedulerConfig, VisionLanguageConfig)
          from vllm_npu.model_executor.ascend_model_loader import get_model
          logger = init_logger(__name__)
          LORA_WARMUP_RANK = 8
          class AscendModelRunner(ModelRunner):
              def __init__(
                  self,
                  model_config: ModelConfig,
                  parallel_config: ParallelConfig,
                  scheduler_config: SchedulerConfig,
                  device_config: DeviceConfig,
                  load_config: LoadConfig,
                  lora_config: Optional[LoRAConfig],
                  mindie_model_config,
                  kv_cache_dtype: Optional[str] = "auto",
                  is_driver_worker: bool = False,
                  vision_language_config: Optional[VisionLanguageConfig] = None,
              ):
                  super(AscendModelRunner, self).__init__(
                      model_config,
                      parallel_config,
                      scheduler_config,
                      device_config,
                      load_config,
                      lora_config,
                      kv_cache_dtype,
                      is_driver_worker,
                      vision_language_config
                  )
                  self.mindie_model_config = mindie_model_config
              
              def load_model(self) -> None:
                  self.model = get_model(
                      model_config=self.model_config,
                      device_config=self.device_config,
                      load_config=self.load_config,
                      mindie_model_config=self.mindie_model_config,
                  )
                  if self.kv_cache_dtype == "fp8" and is_hip():
                      # Currently scaled KV cache is only enabled on ROCm
                      if self.model_config.quantization_param_path is not None:
                          if callable(getattr(self.model, "load_kv_cache_scales", None)):
                              self.model.load_kv_cache_scales(
                                  self.model_config.quantization_param_path)
                          else:
                              raise RuntimeError(
                                  "Using FP8 KV cache and scaling factors provided but "
                                  f"model {self.model.__class__} does not support "
                                  "loading scaling factors.")
                      else:
                          logger.warning(
                              "Using FP8 KV cache but no scaling factors "
                              "provided. Defaulting to scaling factors of 1.0. "
                              "This may lead to less accurate results!")
                  elif self.model_config.quantization_param_path is not None:
                      logger.warning("KV cache scaling factors provided, "
                                     "but the KV cache data type is not FP8. "
                                     "KV cache scaling factors will not be used.")
              @torch.inference_mode()
              def execute_model(
                  self,
                  seq_group_metadata_list: List[SequenceGroupMetadata],
                  kv_caches: List[Tuple[torch.Tensor, torch.Tensor]],
              ) -> Optional[SamplerOutput]:
                  (input_tokens, input_positions, attn_metadata, sampling_metadata,
                      lora_requests, lora_mapping, multi_modal_input
                      ) = self.prepare_input_tensors(seq_group_metadata_list)
                  if self.lora_config:
                      self.set_active_loras(lora_requests, lora_mapping)
                  # Currently cuda graph is only supported by the decode phase.
                  prefill_meta = attn_metadata.prefill_metadata
                  decode_meta = attn_metadata.decode_metadata
                  model_executable = self.model
                  execute_model_kwargs = {
                      "input_ids": input_tokens,
                      "positions": input_positions,
                      "kv_caches": kv_caches,
                      "attn_metadata": attn_metadata,
                  }
                  hidden_states = model_executable(**execute_model_kwargs)
                  # Only perform sampling in the driver worker.
                  if not self.is_driver_worker:
                      return None
                  # Sample the next token.
                  output = self.model.sample(
                      logits=hidden_states,
                      sampling_metadata=sampling_metadata,
                  )
                  return output
              @torch.inference_mode()
              def profile_run(self) -> None:
                  # Enable top-k sampling to reflect the accurate memory usage.
                  sampling_params = SamplingParams(top_p=0.99, top_k=self.vocab_size - 1)
                  max_num_batched_tokens = self.scheduler_config.max_num_batched_tokens
                  max_num_seqs = self.scheduler_config.max_num_seqs
                  
                  dummy_lora_requests = []
                  dummy_lora_requests_per_seq = []
                  if self.lora_config:
                      assert self.lora_manager is not None
                      with self.lora_manager.dummy_lora_cache():
                          for idx in range(self.lora_config.max_loras):
                              lora_id = idx + 1
                              dummy_lora_request = LoRARequest(
                                  lora_name=f"warmup_{lora_id}",
                                  lora_int_id=lora_id,
                                  lora_local_path="/not/a/real/path",
                              )
                              self.lora_manager.add_dummy_lora(dummy_lora_request,
                                                                  rank=LORA_WARMUP_RANK)
                              dummy_lora_requests.append(dummy_lora_request)
                          dummy_lora_requests_per_seq = [
                              dummy_lora_requests[idx % len(dummy_lora_requests)]
                              for idx in range(max_num_seqs)
                          ]
                  seqs: List[SequenceGroupMetadata] = []
                  
                  if self.vision_language_config:
                      max_num_seqs = min(
                          max_num_seqs,
                          int(max_num_batched_tokens /
                              self.vision_language_config.image_feature_size))
                  for group_id in range(max_num_seqs):
                      seq_len = (max_num_batched_tokens // max_num_seqs +
                                  (group_id < max_num_batched_tokens % max_num_seqs))
                      seq_data, fake_multi_modal_input = _prepare_fake_inputs(
                          seq_len, self.vision_language_config)
                      seq = SequenceGroupMetadata(
                          request_id=str(group_id),
                          is_prompt=True,
                          seq_data={group_id: seq_data},
                          sampling_params=sampling_params,
                          block_tables=None,
                          lora_request=dummy_lora_requests_per_seq[group_id]
                          if dummy_lora_requests_per_seq else None,
                          multi_modal_data=fake_multi_modal_input,
                      )
                      seqs.append(seq)
                  # Run the model with the dummy inputs.
                  num_layers = self.model_config.get_num_layers(self.parallel_config)
                  kv_caches = [(None, None)] * num_layers
                  self.execute_model(seqs, kv_caches)
                  torch.npu.synchronize()
                  return
        • ascend_worker.py
          # Part of codes in this file was copied from project [vLLM Team][vllm]
          """A Ascend worker class."""
          import gc
          from typing import Dict, List, Tuple, Set, Optional, Any
          import torch
          import torch.distributed
          from vllm.config import (CacheConfig, DeviceConfig, ModelConfig, LoadConfig,
                                   ParallelConfig, SchedulerConfig, LoRAConfig, VisionLanguageConfig)
          from vllm.model_executor import set_random_seed
          from vllm.distributed import (broadcast_tensor_dict,
                                        ensure_model_parallel_initialized,
                                        init_distributed_environment)
          from vllm.sequence import SamplerOutput, ExecuteModelRequest
          from vllm.worker.cache_engine import CacheEngine
          from vllm.lora.request import LoRARequest
          from vllm.worker.worker_base import WorkerBase
          from vllm.worker.worker import raise_if_cache_size_invalid
          from vllm_npu.worker.ascend_model_runner import AscendModelRunner
          class AscendWorker(WorkerBase):
              """A worker class that executes the model on a group of Ascend NPUs.
              """
              def __init__(
                  self,
                  model_config: ModelConfig,
                  parallel_config: ParallelConfig,
                  scheduler_config: SchedulerConfig,
                  device_config: DeviceConfig,
                  cache_config: CacheConfig,
                  load_config: LoadConfig,
                  local_rank: int,
                  rank: int,
                  distributed_init_method: str,
                  lora_config: Optional[LoRAConfig] = None,
                  vision_language_config: Optional[VisionLanguageConfig] = None,
                  is_driver_worker: bool = False,
              ) -> None:
                  self.model_config = model_config
                  self.parallel_config = parallel_config
                  self.scheduler_config = scheduler_config
                  self.device_config = device_config
                  self.cache_config = cache_config
                  self.local_rank = local_rank
                  self.rank = rank
                  self.distributed_init_method = distributed_init_method
                  self.lora_config = lora_config
                  self.load_config = load_config
                  self.is_driver_worker = is_driver_worker
                  if self.is_driver_worker:
                      assert self.rank == 0, "The driver worker must have rank 0."
                  if self.model_config.trust_remote_code:
                      # note: lazy import to avoid importing torch before initializing
                      from vllm.utils import init_cached_hf_modules
                      init_cached_hf_modules()
                  self.vision_language_config = vision_language_config
                  mindie_model_config = {
                      'backend_type': 'atb',
                      'model_id': model_config.model,
                      'rank': rank,
                      'local_rank': local_rank,
                      'world_size': parallel_config.world_size,
                      'npu_device_id': local_rank,
                  }
                  self.model_runner = AscendModelRunner(
                      model_config,
                      parallel_config,
                      scheduler_config,
                      device_config,
                      load_config=load_config,
                      lora_config=self.lora_config,
                      kv_cache_dtype=self.cache_config.cache_dtype,
                      is_driver_worker=is_driver_worker,
                      mindie_model_config=mindie_model_config)
                  # Uninitialized cache engine. Will be initialized by
                  # self.initialize_cache().
                  self.cache_engine: CacheEngine
                  self.gpu_cache: List[torch.Tensor]
              def init_device(self) -> None:
                  self.device = torch.device(f"npu:{self.local_rank}")
                  torch.npu.set_device(self.device)
                  # Initialize the distributed environment.
                  init_worker_distributed_environment(self.parallel_config, self.rank,
                                                      self.distributed_init_method,
                                                      self.local_rank)
                  # Initialize the model.
                  set_random_seed(self.model_config.seed)
              def load_model(self):
                  self.model_runner.load_model()
              @torch.inference_mode()
              def determine_num_available_blocks(self) -> Tuple[int, int]:
                  """Profiles the peak memory usage of the model and returns the maximum
                  number of NPU and CPU cache blocks that can be allocated.
                  """
                  # Profile the memory usage of the model and get the maximum number of
                  # cache blocks that can be allocated with the remaining free memory.
                  torch.npu.empty_cache()
                  torch.npu.reset_peak_memory_stats()
                  # Execute a forward pass with dummy inputs to profile the memory usage
                  # of the model.
                  self.model_runner.profile_run()
                  block_size = self.cache_config.block_size
                  dummy_block_size = 128
                  dummy_num_blocks = dummy_block_size // block_size
                  # Calculate the number of blocks that can be allocated with the
                  # profiled peak memory.
                  torch.npu.synchronize()
                  peak_memory = torch.npu.max_memory_allocated()
                  total_gpu_memory = torch.npu.get_device_properties(self.rank).total_memory
                  cache_block_size = CacheEngine.get_cache_block_size(
                      self.cache_config, self.model_config, self.parallel_config)
                  num_gpu_blocks = int(
                      (total_gpu_memory * self.cache_config.gpu_memory_utilization - peak_memory) //
                      cache_block_size) + dummy_num_blocks
                  num_cpu_blocks = int(self.cache_config.swap_space_bytes // cache_block_size)
                  num_gpu_blocks = max(num_gpu_blocks, 0)
                  num_cpu_blocks = max(num_cpu_blocks, 0)
                  if self.model_runner.lora_manager:
                      self.model_runner.remove_all_loras()
                  gc.collect()
                  torch.npu.empty_cache()
                  return num_gpu_blocks, num_cpu_blocks
              def initialize_cache(self, num_gpu_blocks: int,
                                   num_cpu_blocks: int) -> None:
                  raise_if_cache_size_invalid(num_gpu_blocks,
                                              self.cache_config.block_size,
                                              self.model_config.max_model_len)
                  self.cache_config.num_gpu_blocks = num_gpu_blocks
                  self.cache_config.num_cpu_blocks = num_cpu_blocks
                  
                  self._init_cache_engine()
                  self._warm_up_model()
              def _init_cache_engine(self):
                  assert self.cache_config.num_gpu_blocks is not None
                  self.cache_engine = CacheEngine(self.cache_config, self.model_config,
                                                  self.parallel_config)
                  self.gpu_cache = self.cache_engine.gpu_cache
                  self.model_runner.set_block_size(self.cache_engine.block_size)
              def _warm_up_model(self) -> None:
                  pass
              def cache_swap(
                  self,
                  blocks_to_swap_in: Dict[int, int],
                  blocks_to_swap_out: Dict[int, int],
                  blocks_to_copy: Dict[int, List[int]],
              ) -> None:
                  if blocks_to_swap_in:
                      self.cache_engine.swap_in(blocks_to_swap_in)
                  if blocks_to_swap_out:
                      self.cache_engine.swap_out(blocks_to_swap_out)
                  if blocks_to_copy:
                      self.cache_engine.copy(blocks_to_copy)
              @torch.inference_mode()
              def execute_model(
                  self,
                  execute_model_req: Optional[ExecuteModelRequest] = None
              ) -> List[SamplerOutput]:
                  if execute_model_req is None:
                      seq_group_metadata_list = None
                  else:
                      seq_group_metadata_list = execute_model_req.seq_group_metadata_list
                  if self.is_driver_worker:
                      assert seq_group_metadata_list is not None
                      assert execute_model_req is not None
                      num_seq_groups = len(seq_group_metadata_list)
                      blocks_to_swap_in = execute_model_req.blocks_to_swap_in
                      blocks_to_swap_out = execute_model_req.blocks_to_swap_out
                      blocks_to_copy = execute_model_req.blocks_to_copy
                      data: Dict[str, Any] = {
                          "num_seq_groups": num_seq_groups,
                          "blocks_to_swap_in": blocks_to_swap_in,
                          "blocks_to_swap_out": blocks_to_swap_out,
                          "blocks_to_copy": blocks_to_copy,
                      }
                      broadcast_tensor_dict(data, src=0)
                  else:
                      data = broadcast_tensor_dict(src=0)
                      num_seq_groups = data["num_seq_groups"]
                      blocks_to_swap_in = data["blocks_to_swap_in"]
                      blocks_to_swap_out = data["blocks_to_swap_out"]
                      blocks_to_copy = data["blocks_to_copy"]
                  self.cache_swap(blocks_to_swap_in, blocks_to_swap_out, blocks_to_copy)
                  # If there is no input, we don't need to execute the model.
                  if num_seq_groups == 0:
                      return []
                  output = self.model_runner.execute_model(seq_group_metadata_list,
                                                           self.gpu_cache)
                  # Worker only supports single-step execution. Wrap the output in a list
                  # to conform to interface.
                  return [output]
              def add_lora(self, lora_request: LoRARequest) -> bool:
                  return self.model_runner.add_lora(lora_request)
              def remove_lora(self, lora_id: int) -> bool:
                  return self.model_runner.remove_lora(lora_id)
              def list_loras(self) -> Set[int]:
                  return self.model_runner.list_loras()
              def get_cache_block_size_bytes(self) -> int:
                  """Get the size of the KV cache block size in bytes.
                  """
                  return CacheEngine.get_cache_block_size(self.cache_config,
                                                          self.model_config,
                                                          self.parallel_config)
          def init_worker_distributed_environment(
              parallel_config: ParallelConfig,
              rank: int,
              distributed_init_method: Optional[str] = None,
              local_rank: int = -1,
          ) -> None:
              """Initialize the distributed environment."""
              init_distributed_environment(parallel_config.world_size, rank,
                                           distributed_init_method, local_rank)
              ensure_model_parallel_initialized(parallel_config.tensor_parallel_size,
                                                parallel_config.pipeline_parallel_size)
          def _check_if_gpu_supports_dtype(torch_dtype: torch.dtype):
              # Check if the GPU supports the dtype.
              if torch_dtype == torch.bfloat16:
                  compute_capability = torch.cuda.get_device_capability()
                  if compute_capability[0] < 8:
                      gpu_name = torch.cuda.get_device_name()
                      raise ValueError(
                          "Bfloat16 is only supported on GPUs with compute capability "
                          f"of at least 8.0. Your {gpu_name} GPU has compute capability "
                          f"{compute_capability[0]}.{compute_capability[1]}. "
                          "You can use float16 instead by explicitly setting the"
                          "`dtype` flag in CLI, for example: --dtype=half.")
        • cache_engine.py
          from typing import Tuple, List
          import torch
          KVCache = Tuple[torch.Tensor, torch.Tensor]
          def _allocate_kv_cache(
              self,
              num_blocks: int,
              device: str,
          ) -> List[KVCache]:
              """Allocates KV cache on the specified device."""
              kv_cache: List[KVCache] = []
              key_block_shape = (self.block_size, self.num_heads, self.head_size)
              value_block_shape = (self.block_size, self.num_heads, self.head_size)
              for _ in range(self.num_layers):
                  key_blocks = torch.empty(
                      size=(num_blocks, *key_block_shape),
                      dtype=self.dtype,
                      device=device,
                  )
                  value_blocks = torch.empty(
                      size=(num_blocks, *value_block_shape),
                      dtype=self.dtype,
                      device=device,
                  )
                  kv_cache.append((key_blocks, value_blocks))
              return kv_cache
      • config.py
        import warnings
        import torch
        class DeviceConfig:
            def __init__(self, device: str = "auto") -> None:
                if device == "auto":
                    # Automated device type detection
                    if getattr(torch.version, "cann", None) is not None:
                        self.device_type = "npu" 
                    else:
                        warnings.warn(
                            "Failed to detect cann in your environment. "
                            "Please check whether you have installed cann correctly. "
                            "Now the device type for processing input is set to cpu."
                        )
                        self.device_type = "cpu"
                else:
                    # Device type is assigned explicitly
                    self.device_type = device
                self.device = torch.device(self.device_type)
      • utils.py
        # Part of codes in this file was copied from project [vLLM Team][vllm]
        import socket
        import warnings
        from functools import lru_cache
        import vllm.envs as envs
        @lru_cache(maxsize=None)
        def is_ascend() -> bool:
            try:
                import torch_npu
            except ImportError:
                torch_npu = None
            return torch_npu is not None
        def get_ip() -> str:
            host_ip = envs.VLLM_HOST_IP
            if host_ip:
                return host_ip
            # IP is not set, try to get it from the network interface
            # try ipv4
            s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
            try:
                s.connect(("localhost", 80))  # Doesn't need to be reachable
                socket_name = s.getsockname()[0]
                s.close()
                return socket_name
            except Exception:
                warnings.warn("Encounted with connection errors. Using 0.0.0.0 by default.")
                s.close()
            # try ipv6
            try:
                s = socket.socket(socket.AF_INET6, socket.SOCK_DGRAM)
                s.connect(("localhost", 80))  # Doesn't need to be reachable
                socket_name = s.getsockname()[0]
                s.close()
                return socket_name
            except Exception:
                warnings.warn("Encounted with connection errors. Using 0.0.0.0 by default.")
                s.close()
            s.close()
            warnings.warn(
                "Failed to get the IP address, using 0.0.0.0 by default."
                "The value can be set by the environment variable"
                " VLLM_HOST_IP or HOST_IP.",
                stacklevel=2)
            return "0.0.0.0"
      • npu_adaptor.py
        import importlib
        import sys
        def replace_modules(old_module_name, new_module_name):
            if old_module_name in sys.modules:
                del sys.modules[old_module_name]
            sys.modules[old_module_name] = importlib.import_module(new_module_name)
        _default_ops = (
            'xformers',
            'xformers.ops',
            'vllm._C',
            'xformers.ops.fmha.attn_bias',
            'vllm.model_executor.layers.ops.sample'
        )
        for _ops in _default_ops:
            replace_modules(_ops, 'vllm_npu')
        # dummy class to avoid import error.
        class ops:
            pass
        class cuda_utils:
            pass
        class cache_ops:
            pass
        class BlockDiagonalCausalMask:
            pass
        class LowerTriangularMaskWithTensorBias:
            pass
        def context_attention_fwd():
            pass
        def get_num_triton_sampler_splits():
            pass
        def sample():
            pass
      • __init__.py
        import torch
        import torch_npu
        from torch_npu.contrib import transfer_to_npu
        from vllm_npu.npu_adaptor import (BlockDiagonalCausalMask,
                                          LowerTriangularMaskWithTensorBias,
                                          cache_ops, cuda_utils, ops,
                                          context_attention_fwd,
                                          get_num_triton_sampler_splits, sample)
        import vllm_npu.core
        import vllm_npu.engine
        import vllm_npu.worker
        import vllm_npu.model_executor
        import vllm_npu.executor
        import vllm_npu.attention
        import vllm_npu.usage
        from vllm_npu.utils import get_ip
        from vllm_npu.config import DeviceConfig
        import vllm.utils as utils
        import vllm.executor.ray_utils as ray_utils
        import vllm.config as vconfig
        import vllm.engine.arg_utils as varg_utils
        utils.get_ip = get_ip
        ray_utils.get_ip = get_ip
        vconfig.DeviceConfig = DeviceConfig
        varg_utils.DeviceConfig = DeviceConfig
        __version__ = "0.4.2"
  • examples
    • start_server.sh
      #!/bin/bash
      # ATB acceleration library environment variables
      export ATB_LAYER_INTERNAL_TENSOR_REUSE=1
      export ATB_WORKSPACE_MEM_ALLOC_GLOBAL=1
      export ATB_OPERATION_EXECUTE_ASYNC=1
      export TASK_QUEUE_ENABLE=1
      export ATB_CONTENT_NCHW_TO_ND=1
      export ATB_CONTEXT_WORKSPACE_RING=1
      export HCCL_BUFFSIZE=120
      export LCCL_DETERMINISTIC=1
      export HCCL_DETERMINISTIC=true
      export ATB_OPSRUNNER_KERNEL_CACHE_LOCAL_COUNT=8
      export ATB_OPSRUNNER_KERNEL_CACHE_GLOABL_COUNT=16
      export ATB_LAUNCH_KERNEL_WITH_TILING=0
      export VLLM_NO_USAGE_STATS=1 # disable vLLM usage stats reporting to avoid errors
      python -m vllm.entrypoints.openai.api_server --model=facebook/opt-125m --trust-remote-code --enforce-eager --worker-use-ray
    • test_offline.py
      from vllm import LLM, SamplingParams
      import json
      import argparse
      parser = argparse.ArgumentParser()
      parser.add_argument('--model_path', type=str, default="facebook/opt-125m")
      # input prompts for test
      prompts = [
          "Hello, my name is",
          "The president of the United States is",
          "The capital of France is",
          "The future of AI is",
      ]
      sampling_params = SamplingParams(max_tokens=512, temperature=0)
      args = parser.parse_args()
      model_path = args.model_path
      llm = LLM(model=model_path,
                block_size=128,
                max_model_len=4096, # maximum model context length (prompt + generated tokens)
                tensor_parallel_size=8, # number of NPUs to be used
                max_num_seqs=256, # maximum number of sequences per batch
                enforce_eager=True, # disable CUDA graph mode
                trust_remote_code=True, # required if the model is a custom model not yet in the HuggingFace transformers library
      )
      outputs = llm.generate(prompts, sampling_params)
      for i, output in enumerate(outputs):
          prompt = output.prompt
          generated_text = output.outputs[0].text
          print(f"req_num: {i}\nPrompt: {prompt!r}\nGenerated text: {generated_text!r}")
    • test_offline.sh
      #!/bin/bash
      # ATB acceleration library environment variables
      export ATB_LAYER_INTERNAL_TENSOR_REUSE=1
      export ATB_WORKSPACE_MEM_ALLOC_GLOBAL=1
      export ATB_OPERATION_EXECUTE_ASYNC=1
      export TASK_QUEUE_ENABLE=1
      export ATB_CONTENT_NCHW_TO_ND=1
      export ATB_CONTEXT_WORKSPACE_RING=1
      export HCCL_BUFFSIZE=120
      export LCCL_DETERMINISTIC=1
      export HCCL_DETERMINISTIC=true
      export ATB_OPSRUNNER_KERNEL_CACHE_LOCAL_COUNT=8
      export ATB_OPSRUNNER_KERNEL_CACHE_GLOABL_COUNT=16
      export ATB_LAUNCH_KERNEL_WITH_TILING=0
      export VLLM_TARGET_DEVICE="npu" # set the target device to NPU
      export VLLM_NO_USAGE_STATS=1 # disable vLLM usage stats reporting to avoid errors
      python3 test_offline.py --model_path facebook/opt-125m
  • install.sh
    #!/bin/bash
    if [ -d "./vllm" ]; then
        echo "./vllm directory has already exist!"
        exit 1
    fi
    git clone -b v0.4.2 https://github.com/vllm-project/vllm.git vllm
    cp -r cover/* vllm/
    cd vllm
    pip install -r requirements-ascend.txt
    python3 setup.py install
    cd ../vllm_npu
    pip install -r requirements.txt
    python3 setup.py install

    After restoring all of the code above into the project directory structure exactly as shown, enter the project root directory and run the following script to complete the installation of the Ascend-adapted vLLM. Two optional post-installation sketches follow the command below.

    bash install.sh
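
    To verify that the patch package took effect after install.sh finishes, a quick import check can be run with python3. This is a minimal sketch under the assumption that the installation completed in the current Python environment and that CANN and torch_npu are available on the host; the file name verify_install.py is illustrative and not part of the repository.

    # verify_install.py -- illustrative name, not part of the repository
    import vllm      # the native vLLM 0.4.2 built from source by install.sh
    import vllm_npu  # importing the patch package applies the Ascend monkey patches to vllm

    print("vllm:", vllm.__version__)          # expected: 0.4.2
    print("vllm_npu:", vllm_npu.__version__)  # expected: 0.4.2, see vllm_npu/__init__.py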
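
    Once the installation has been verified, the online-serving example can be exercised end to end: start the server with examples/start_server.sh and send a request to its OpenAI-compatible endpoint. The sketch below assumes the default listen address (port 8000 on the local host), the facebook/opt-125m model configured in that script, and that the requests package is installed; the file name query_server.py is illustrative and not part of the repository.

    # query_server.py -- illustrative client, not part of the repository
    import requests

    resp = requests.post(
        "http://localhost:8000/v1/completions",   # default vLLM OpenAI-compatible endpoint
        json={
            "model": "facebook/opt-125m",          # must match the --model passed to start_server.sh
            "prompt": "The capital of France is",
            "max_tokens": 32,
            "temperature": 0,
        },
        timeout=60,
    )
    resp.raise_for_status()
    print(resp.json()["choices"][0]["text"])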