
vllm.v1.worker.cpu_model_runner

logger module-attribute

logger = init_logger(__name__)

CPUModelRunner

Bases: GPUModelRunner

Source code in vllm/v1/worker/cpu_model_runner.py
class CPUModelRunner(GPUModelRunner):
    def __init__(self, vllm_config: VllmConfig, device: torch.device):
        with _torch_cuda_wrapper():
            super().__init__(vllm_config, device)

        assert device == torch.device("cpu")
        assert self.speculative_config is None, "spec decode is not supported."

        self.use_cuda_graph = False
        self.cascade_attn_enabled = False

        self._postprocess_tensors()

    # Note: Remove this override once the new attention backend is finished.
    def _may_reorder_batch(self, scheduler_output: "SchedulerOutput") -> None:
        if len(self.kv_cache_config.kv_cache_groups) > 1:
            raise ValueError(
                "Multiple KVCacheGroups is not"
                "currently supported with CPU model runner."
            )
        super()._may_reorder_batch(scheduler_output)

    def _postprocess_tensors(self) -> None:
        # Note: replace device tensors with cpu tensors
        def replace_tensor(obj: Any, cpu_attr_name: str, device_attr_name) -> None:
            cpu_tensor = getattr(obj, cpu_attr_name, None)
            device_tensor = getattr(obj, device_attr_name, None)
            if cpu_tensor is not None and device_tensor is not None:
                assert isinstance(cpu_tensor, torch.Tensor)
                assert isinstance(device_tensor, torch.Tensor)
                setattr(obj, device_attr_name, cpu_tensor)

        for v in vars(self).values():
            if isinstance(v, CpuGpuBuffer):
                v.gpu = v.cpu

        for k, v in vars(self.input_batch).items():
            if k.endswith("_cpu_tensor") and isinstance(v, torch.Tensor):
                replace_tensor(self.input_batch, k, k[:-11])

        for block_table in self.input_batch.block_table.block_tables:
            for v in vars(block_table).values():
                if isinstance(v, CpuGpuBuffer):
                    v.gpu = v.cpu

    def load_model(self, eep_scale_up: bool = False) -> None:
        logger.info("Starting to load model %s...", self.model_config.model)
        self.model = get_model(vllm_config=self.vllm_config)

        if self.lora_config:
            self.model = self.load_lora_model(self.model, self.vllm_config, self.device)

    def get_model(self) -> nn.Module:
        return self.model

    def warming_up_model(self) -> None:
        logger.info("Warming up model for the compilation...")
        # Only generate graph for the generic shape
        with _set_global_compilation_settings(self.vllm_config):
            self._dummy_run(
                min(
                    max(16, self.max_num_reqs),
                    self.scheduler_config.max_num_batched_tokens,
                )
            )

        logger.info("Warming up done.")

    def _init_device_properties(self) -> None:
        pass

    def _sync_device(self) -> None:
        pass

    def _to_list(self, sampled_token_ids: torch.Tensor) -> list[list[int]]:
        return sampled_token_ids.tolist()

    def get_dp_padding(self, num_tokens: int) -> tuple[int, Optional[torch.Tensor]]:
        # Note: For CPU backend, dp padding is not required for now.
        return 0, None
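
The CPU runner keeps the GPUModelRunner control flow but disables CUDA graphs and cascade attention and aliases every device tensor to its CPU counterpart. A minimal sketch of how a worker might drive it; vllm_config is assumed to be a fully built VllmConfig targeting the CPU backend:

import torch

from vllm.v1.worker.cpu_model_runner import CPUModelRunner

# Assumption: vllm_config is a VllmConfig constructed elsewhere
# (e.g. from engine arguments) for the CPU backend.
runner = CPUModelRunner(vllm_config, torch.device("cpu"))

runner.load_model()        # load weights (and LoRA adapters, if configured)
runner.warming_up_model()  # warm up / compile the single generic batch shape
model = runner.get_model()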

cascade_attn_enabled instance-attribute

cascade_attn_enabled = False

use_cuda_graph instance-attribute

use_cuda_graph = False

__init__

__init__(vllm_config: VllmConfig, device: device)
Source code in vllm/v1/worker/cpu_model_runner.py
def __init__(self, vllm_config: VllmConfig, device: torch.device):
    with _torch_cuda_wrapper():
        super().__init__(vllm_config, device)

    assert device == torch.device("cpu")
    assert self.speculative_config is None, "spec decode is not supported."

    self.use_cuda_graph = False
    self.cascade_attn_enabled = False

    self._postprocess_tensors()

_init_device_properties

_init_device_properties() -> None
Source code in vllm/v1/worker/cpu_model_runner.py
def _init_device_properties(self) -> None:
    pass

_may_reorder_batch

_may_reorder_batch(
    scheduler_output: SchedulerOutput,
) -> None
Source code in vllm/v1/worker/cpu_model_runner.py
def _may_reorder_batch(self, scheduler_output: "SchedulerOutput") -> None:
    if len(self.kv_cache_config.kv_cache_groups) > 1:
        raise ValueError(
            "Multiple KVCacheGroups is not"
            "currently supported with CPU model runner."
        )
    super()._may_reorder_batch(scheduler_output)

_postprocess_tensors

_postprocess_tensors() -> None
Source code in vllm/v1/worker/cpu_model_runner.py
def _postprocess_tensors(self) -> None:
    # Note: replace device tensors with cpu tensors
    def replace_tensor(obj: Any, cpu_attr_name: str, device_attr_name) -> None:
        cpu_tensor = getattr(obj, cpu_attr_name, None)
        device_tensor = getattr(obj, device_attr_name, None)
        if cpu_tensor is not None and device_tensor is not None:
            assert isinstance(cpu_tensor, torch.Tensor)
            assert isinstance(device_tensor, torch.Tensor)
            setattr(obj, device_attr_name, cpu_tensor)

    for v in vars(self).values():
        if isinstance(v, CpuGpuBuffer):
            v.gpu = v.cpu

    for k, v in vars(self.input_batch).items():
        if k.endswith("_cpu_tensor") and isinstance(v, torch.Tensor):
            replace_tensor(self.input_batch, k, k[:-11])

    for block_table in self.input_batch.block_table.block_tables:
        for v in vars(block_table).values():
            if isinstance(v, CpuGpuBuffer):
                v.gpu = v.cpu
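
_postprocess_tensors relies on CpuGpuBuffer exposing paired cpu and gpu tensors; pointing the gpu attribute at the cpu tensor makes every later "device" access hit host memory. A simplified, self-contained illustration of the aliasing idea (the Buffer class below is a hypothetical stand-in, not vLLM's CpuGpuBuffer):

import torch

class Buffer:
    # Hypothetical stand-in for CpuGpuBuffer: a paired host/device tensor pair.
    def __init__(self, size: int) -> None:
        self.cpu = torch.zeros(size)
        self.gpu = torch.zeros(size)  # would live on the GPU on CUDA backends

buf = Buffer(4)
buf.gpu = buf.cpu                 # what _postprocess_tensors does per buffer
buf.gpu[0] = 1.0
assert buf.cpu[0].item() == 1.0   # "device" writes are now visible on the host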

_sync_device

_sync_device() -> None
Source code in vllm/v1/worker/cpu_model_runner.py
def _sync_device(self) -> None:
    pass

_to_list

_to_list(sampled_token_ids: Tensor) -> list[list[int]]
Source code in vllm/v1/worker/cpu_model_runner.py
def _to_list(self, sampled_token_ids: torch.Tensor) -> list[list[int]]:
    return sampled_token_ids.tolist()

get_dp_padding

get_dp_padding(
    num_tokens: int,
) -> tuple[int, Optional[Tensor]]
Source code in vllm/v1/worker/cpu_model_runner.py
def get_dp_padding(self, num_tokens: int) -> tuple[int, Optional[torch.Tensor]]:
    # Note: For CPU backend, dp padding is not required for now.
    return 0, None

get_model

get_model() -> Module
Source code in vllm/v1/worker/cpu_model_runner.py
def get_model(self) -> nn.Module:
    return self.model

load_model

load_model(eep_scale_up: bool = False) -> None
Source code in vllm/v1/worker/cpu_model_runner.py
def load_model(self, eep_scale_up: bool = False) -> None:
    logger.info("Starting to load model %s...", self.model_config.model)
    self.model = get_model(vllm_config=self.vllm_config)

    if self.lora_config:
        self.model = self.load_lora_model(self.model, self.vllm_config, self.device)

warming_up_model

warming_up_model() -> None
Source code in vllm/v1/worker/cpu_model_runner.py
def warming_up_model(self) -> None:
    logger.info("Warming up model for the compilation...")
    # Only generate graph for the generic shape
    with _set_global_compilation_settings(self.vllm_config):
        self._dummy_run(
            min(
                max(16, self.max_num_reqs),
                self.scheduler_config.max_num_batched_tokens,
            )
        )

    logger.info("Warming up done.")

_set_global_compilation_settings

_set_global_compilation_settings(config: VllmConfig)
Source code in vllm/v1/worker/cpu_model_runner.py
@contextmanager
def _set_global_compilation_settings(config: VllmConfig):
    import torch._inductor.config as torch_inductor_config

    inductor_config = config.compilation_config.inductor_compile_config
    # Note: The MKLDNN and CPPGEMM backends require freezing parameters.
    freezing_value = torch_inductor_config.freezing
    try:
        if inductor_config.get("max_autotune", False):
            torch_inductor_config.freezing = True
        yield
    finally:
        torch_inductor_config.freezing = freezing_value
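
In use, the context manager enables torch._inductor freezing only for the duration of the compile, and only when max_autotune is requested, then restores the previous value. A hedged usage sketch; vllm_config is assumed to request max_autotune in its inductor_compile_config:

import torch._inductor.config as torch_inductor_config

# Assumption: vllm_config.compilation_config.inductor_compile_config
# contains {"max_autotune": True}.
previous = torch_inductor_config.freezing
with _set_global_compilation_settings(vllm_config):
    assert torch_inductor_config.freezing is True    # enabled for the compile
assert torch_inductor_config.freezing == previous    # restored afterwards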

_torch_cuda_wrapper

_torch_cuda_wrapper()
Source code in vllm/v1/worker/cpu_model_runner.py
@contextmanager
def _torch_cuda_wrapper():
    class _EventPlaceholder:
        def __init__(self, *args, **kwargs) -> None:
            self.record = lambda: None
            self.synchronize = lambda: None

    class _StreamPlaceholder:
        def __init__(self, *args, **kwargs) -> None:
            pass

    cuda_event = torch.cuda.Event
    cuda_stream = torch.cuda.Stream
    try:
        torch.cuda.Event = _EventPlaceholder
        torch.cuda.Stream = _StreamPlaceholder
        yield
    finally:
        torch.cuda.Event = cuda_event
        torch.cuda.Stream = cuda_stream
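
Inside the wrapper, torch.cuda.Event and torch.cuda.Stream are temporarily replaced with no-op placeholders so that GPUModelRunner.__init__ can run on a CPU-only build; the originals are restored on exit. A short illustration of the effect:

import torch

with _torch_cuda_wrapper():
    event = torch.cuda.Event()  # placeholder object, safe without a CUDA runtime
    event.record()              # no-op
    event.synchronize()         # no-op
# outside the context the real torch.cuda.Event / torch.cuda.Stream are restored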