vllm.model_executor.models.nano_nemotron_vl

DEFAULT_NUM_TILES module-attribute

DEFAULT_NUM_TILES = 12

IMG_CONTEXT module-attribute

IMG_CONTEXT = '<image>'

IMG_END module-attribute

IMG_END = '</img>'

IMG_START module-attribute

IMG_START = '<img>'

MAX_FRAMES module-attribute

MAX_FRAMES = 16

NanoNemotronVLImageInputs module-attribute

NanoNemotronVLVideoInputs module-attribute

_I module-attribute

BaseNanoNemotronVLProcessingInfo

Bases: BaseProcessingInfo

Basic image-only ProcessingInfo for InternVL-style models.

Source code in vllm/model_executor/models/nano_nemotron_vl.py
class BaseNanoNemotronVLProcessingInfo(BaseProcessingInfo):
    """Basic image-only ProcessingInfo for InternVL-style models."""

    @abstractmethod
    def get_hf_processor(
        self,
        **kwargs: object,
    ) -> BaseNanoNemotronVLProcessor:
        raise NotImplementedError

    def get_supported_mm_limits(self) -> Mapping[str, Optional[int]]:
        return {"image": None}

    def get_num_image_tokens(
        self,
        *,
        image_width: int,
        image_height: int,
        max_num_tiles: int,
        processor: Optional[BaseNanoNemotronVLProcessor],
    ) -> int:
        if processor is None:
            processor = self.get_hf_processor()

        return processor.get_num_image_tokens(
            image_width=image_width,
            image_height=image_height,
            max_num_tiles=max_num_tiles,
        )

    def get_image_size_with_most_features(self, max_num_tiles: int) -> ImageSize:
        processor = self.get_hf_processor()

        base_size = processor.image_size
        target_ratios = get_internvl_target_ratios(1, max_num_tiles)

        largest_feature_size, largest_feature_pinpoint = 0, None
        for wr, hr in target_ratios:
            width, height = base_size * wr, base_size * hr

            feat_size = self.get_num_image_tokens(
                image_width=width,
                image_height=height,
                max_num_tiles=max_num_tiles,
                processor=processor,
            )
            if feat_size > largest_feature_size:
                largest_feature_size = feat_size
                largest_feature_pinpoint = ImageSize(width=width, height=height)

        if largest_feature_size == 0 or largest_feature_pinpoint is None:
            raise ValueError("Cannot have a largest feature size of 0!")

        return largest_feature_pinpoint

    def get_max_image_tokens(self) -> int:
        processor = self.get_hf_processor()
        # Use default max_num_tiles for max tokens calculation
        max_num_tiles = processor.max_num_tiles
        target_width, target_height = self.get_image_size_with_most_features(
            max_num_tiles
        )

        return self.get_num_image_tokens(
            image_width=target_width,
            image_height=target_height,
            max_num_tiles=max_num_tiles,
            processor=processor,
        )

get_hf_processor abstractmethod

get_hf_processor(
    **kwargs: object,
) -> BaseNanoNemotronVLProcessor
Source code in vllm/model_executor/models/nano_nemotron_vl.py
@abstractmethod
def get_hf_processor(
    self,
    **kwargs: object,
) -> BaseNanoNemotronVLProcessor:
    raise NotImplementedError

get_image_size_with_most_features

get_image_size_with_most_features(
    max_num_tiles: int,
) -> ImageSize
Source code in vllm/model_executor/models/nano_nemotron_vl.py
def get_image_size_with_most_features(self, max_num_tiles: int) -> ImageSize:
    processor = self.get_hf_processor()

    base_size = processor.image_size
    target_ratios = get_internvl_target_ratios(1, max_num_tiles)

    largest_feature_size, largest_feature_pinpoint = 0, None
    for wr, hr in target_ratios:
        width, height = base_size * wr, base_size * hr

        feat_size = self.get_num_image_tokens(
            image_width=width,
            image_height=height,
            max_num_tiles=max_num_tiles,
            processor=processor,
        )
        if feat_size > largest_feature_size:
            largest_feature_size = feat_size
            largest_feature_pinpoint = ImageSize(width=width, height=height)

    if largest_feature_size == 0 or largest_feature_pinpoint is None:
        raise ValueError("Cannot have a largest feature size of 0!")

    return largest_feature_pinpoint
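
The helper get_internvl_target_ratios used above is imported from vLLM's InternVL utilities. As a rough illustration of the kind of grid enumeration it performs (an assumed sketch, not the actual vLLM helper), listing every (width_ratio, height_ratio) pair whose tile count stays within the allowed range looks like this:

# Illustrative sketch only: enumerate tile grids (w, h) with min_num <= w*h <= max_num,
# assumed to approximate what get_internvl_target_ratios returns.
def enumerate_tile_grids(min_num: int, max_num: int) -> list[tuple[int, int]]:
    grids = {
        (w, h)
        for n in range(min_num, max_num + 1)
        for w in range(1, n + 1)
        for h in range(1, n + 1)
        if min_num <= w * h <= max_num
    }
    return sorted(grids, key=lambda wh: wh[0] * wh[1])

print(len(enumerate_tile_grids(1, 12)))  # 35 candidate grids for max_num_tiles=12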

get_max_image_tokens

get_max_image_tokens() -> int
Source code in vllm/model_executor/models/nano_nemotron_vl.py
def get_max_image_tokens(self) -> int:
    processor = self.get_hf_processor()
    # Use default max_num_tiles for max tokens calculation
    max_num_tiles = processor.max_num_tiles
    target_width, target_height = self.get_image_size_with_most_features(
        max_num_tiles
    )

    return self.get_num_image_tokens(
        image_width=target_width,
        image_height=target_height,
        max_num_tiles=max_num_tiles,
        processor=processor,
    )

get_num_image_tokens

get_num_image_tokens(
    *,
    image_width: int,
    image_height: int,
    max_num_tiles: int,
    processor: Optional[BaseNanoNemotronVLProcessor],
) -> int
Source code in vllm/model_executor/models/nano_nemotron_vl.py
def get_num_image_tokens(
    self,
    *,
    image_width: int,
    image_height: int,
    max_num_tiles: int,
    processor: Optional[BaseNanoNemotronVLProcessor],
) -> int:
    if processor is None:
        processor = self.get_hf_processor()

    return processor.get_num_image_tokens(
        image_width=image_width,
        image_height=image_height,
        max_num_tiles=max_num_tiles,
    )

get_supported_mm_limits

get_supported_mm_limits() -> Mapping[str, Optional[int]]
Source code in vllm/model_executor/models/nano_nemotron_vl.py
def get_supported_mm_limits(self) -> Mapping[str, Optional[int]]:
    return {"image": None}

BaseNanoNemotronVLProcessor

Bases: ABC

This model doesn't define its own HF processor, so we implement our own one here.

The code to insert image tokens is based on: https://huggingface.co/OpenGVLab/InternVL2-1B/blob/main/modeling_internvl_chat.py#L252

Source code in vllm/model_executor/models/nano_nemotron_vl.py
class BaseNanoNemotronVLProcessor(ABC):
    """
    This model doesn't define its own HF processor,
    so we implement our own one here.

    The code to insert image tokens is based on:
    https://huggingface.co/OpenGVLab/InternVL2-1B/blob/main/modeling_internvl_chat.py#L252
    """

    def __init__(
        self,
        config: PretrainedConfig,
        tokenizer: AnyTokenizer,
        *args,
        max_num_tiles: Optional[int] = None,
        **kwargs,
    ) -> None:
        super().__init__()

        self.config = config
        self.tokenizer = tokenizer

        self.max_num_tiles = max_num_tiles or DEFAULT_NUM_TILES
        image_size: int = config.force_image_size
        patch_size: int = config.patch_size

        self.num_image_token = int(
            (image_size // patch_size) ** 2 * (config.downsample_ratio**2)
        )
        self.image_size = image_size
        self.use_thumbnail: bool = config.use_thumbnail
        self.norm_mean = torch.Tensor(config.norm_mean).reshape(1, 3, 1, 1)
        self.norm_std = torch.Tensor(config.norm_std).reshape(1, 3, 1, 1)

    @property
    @abstractmethod
    def image_token_id(self) -> int:
        raise NotImplementedError

    @abstractmethod
    def get_image_repl(
        self,
        feature_size: int,
        num_patches: Optional[int],
    ) -> PromptUpdateDetails[str]:
        raise NotImplementedError

    def get_num_image_tokens(
        self,
        *,
        image_width: int,
        image_height: int,
        max_num_tiles: int,
    ) -> int:
        target_ratios = get_internvl_target_ratios(1, max_num_tiles)

        num_patches, _, _ = calculate_internvl_targets(
            orig_width=image_width,
            orig_height=image_height,
            target_ratios=target_ratios,
            image_size=self.image_size,
            use_thumbnail=self.use_thumbnail,
        )

        return num_patches * self.num_image_token

    def _images_to_pixel_values_lst(
        self,
        images: list[Image.Image],
        max_num_tiles: int,
    ) -> list[torch.Tensor]:
        return [
            image_to_pixel_values(
                image,
                input_size=self.image_size,
                max_num=max_num_tiles,
                use_thumbnail=self.use_thumbnail,
                idx=idx,
            )
            for idx, image in enumerate(images)
        ]

    def _preprocess_image(
        self,
        text: list[str],
        images: list[Image.Image],
        max_num_tiles: int,
    ) -> tuple[list[str], dict[str, torch.Tensor]]:
        if len(images) == 0:
            image_inputs = {}
        else:
            pixel_values_lst = self._images_to_pixel_values_lst(images, max_num_tiles)
            image_inputs = {
                "pixel_values_flat": torch.cat(pixel_values_lst),
                "image_num_patches": torch.tensor(
                    [len(item) for item in pixel_values_lst]
                ),
            }

            for pixel_values in pixel_values_lst:
                num_patches = pixel_values.shape[0]
                feature_size = num_patches * self.num_image_token
                image_repl = self.get_image_repl(feature_size, num_patches)
                text = [t.replace("<image>", image_repl.full, 1) for t in text]
        return text, image_inputs

    def _make_batch_input(self, input_item: Optional[Union[Any, list[Any]]] = None):
        if input_item is None:
            input_item = []
        if not isinstance(input_item, list):
            input_item = [input_item]
        return input_item

    def __call__(
        self,
        text: Optional[Union[str, list[str]]] = None,
        images: Optional[Union[Image.Image, list[Image.Image]]] = None,
        return_tensors: Optional[Union[str, TensorType]] = None,
        max_num_tiles: Optional[int] = None,
    ) -> BatchFeature:
        # Use default if not provided
        if max_num_tiles is None:
            max_num_tiles = self.max_num_tiles

        text, images = [self._make_batch_input(x) for x in (text, images)]

        text, image_inputs = self._preprocess_image(
            text=text,
            images=images,
            max_num_tiles=max_num_tiles,
        )

        text_inputs = self.tokenizer(text, add_special_tokens=False)

        combined_outputs = {**text_inputs, **image_inputs}

        return BatchFeature(combined_outputs, tensor_type=return_tensors)

config instance-attribute

config = config

image_size instance-attribute

image_size = image_size

image_token_id abstractmethod property

image_token_id: int

max_num_tiles instance-attribute

max_num_tiles = max_num_tiles or DEFAULT_NUM_TILES

norm_mean instance-attribute

norm_mean = reshape(1, 3, 1, 1)

norm_std instance-attribute

norm_std = reshape(1, 3, 1, 1)

num_image_token instance-attribute

num_image_token = int(
    (image_size // patch_size) ** 2 * downsample_ratio**2
)
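
A quick worked example of this formula (the config values below are illustrative, not the actual Nano Nemotron VL configuration):

# Hypothetical config values, used only to illustrate the formula above.
image_size = 512        # config.force_image_size (example)
patch_size = 16         # config.patch_size (example)
downsample_ratio = 0.5  # config.downsample_ratio (example)

num_image_token = int((image_size // patch_size) ** 2 * downsample_ratio**2)
print(num_image_token)  # (512 // 16)**2 * 0.5**2 = 1024 * 0.25 = 256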

tokenizer instance-attribute

tokenizer = tokenizer

use_thumbnail instance-attribute

use_thumbnail: bool = use_thumbnail

__call__

__call__(
    text: Optional[Union[str, list[str]]] = None,
    images: Optional[Union[Image, list[Image]]] = None,
    return_tensors: Optional[Union[str, TensorType]] = None,
    max_num_tiles: Optional[int] = None,
) -> BatchFeature
Source code in vllm/model_executor/models/nano_nemotron_vl.py
def __call__(
    self,
    text: Optional[Union[str, list[str]]] = None,
    images: Optional[Union[Image.Image, list[Image.Image]]] = None,
    return_tensors: Optional[Union[str, TensorType]] = None,
    max_num_tiles: Optional[int] = None,
) -> BatchFeature:
    # Use default if not provided
    if max_num_tiles is None:
        max_num_tiles = self.max_num_tiles

    text, images = [self._make_batch_input(x) for x in (text, images)]

    text, image_inputs = self._preprocess_image(
        text=text,
        images=images,
        max_num_tiles=max_num_tiles,
    )

    text_inputs = self.tokenizer(text, add_special_tokens=False)

    combined_outputs = {**text_inputs, **image_inputs}

    return BatchFeature(combined_outputs, tensor_type=return_tensors)

__init__

__init__(
    config: PretrainedConfig,
    tokenizer: AnyTokenizer,
    *args,
    max_num_tiles: Optional[int] = None,
    **kwargs,
) -> None
Source code in vllm/model_executor/models/nano_nemotron_vl.py
def __init__(
    self,
    config: PretrainedConfig,
    tokenizer: AnyTokenizer,
    *args,
    max_num_tiles: Optional[int] = None,
    **kwargs,
) -> None:
    super().__init__()

    self.config = config
    self.tokenizer = tokenizer

    self.max_num_tiles = max_num_tiles or DEFAULT_NUM_TILES
    image_size: int = config.force_image_size
    patch_size: int = config.patch_size

    self.num_image_token = int(
        (image_size // patch_size) ** 2 * (config.downsample_ratio**2)
    )
    self.image_size = image_size
    self.use_thumbnail: bool = config.use_thumbnail
    self.norm_mean = torch.Tensor(config.norm_mean).reshape(1, 3, 1, 1)
    self.norm_std = torch.Tensor(config.norm_std).reshape(1, 3, 1, 1)

_images_to_pixel_values_lst

_images_to_pixel_values_lst(
    images: list[Image], max_num_tiles: int
) -> list[Tensor]
Source code in vllm/model_executor/models/nano_nemotron_vl.py
def _images_to_pixel_values_lst(
    self,
    images: list[Image.Image],
    max_num_tiles: int,
) -> list[torch.Tensor]:
    return [
        image_to_pixel_values(
            image,
            input_size=self.image_size,
            max_num=max_num_tiles,
            use_thumbnail=self.use_thumbnail,
            idx=idx,
        )
        for idx, image in enumerate(images)
    ]

_make_batch_input

_make_batch_input(
    input_item: Optional[Union[Any, list[Any]]] = None,
)
Source code in vllm/model_executor/models/nano_nemotron_vl.py
def _make_batch_input(self, input_item: Optional[Union[Any, list[Any]]] = None):
    if input_item is None:
        input_item = []
    if not isinstance(input_item, list):
        input_item = [input_item]
    return input_item

_preprocess_image

_preprocess_image(
    text: list[str], images: list[Image], max_num_tiles: int
) -> tuple[list[str], dict[str, Tensor]]
Source code in vllm/model_executor/models/nano_nemotron_vl.py
def _preprocess_image(
    self,
    text: list[str],
    images: list[Image.Image],
    max_num_tiles: int,
) -> tuple[list[str], dict[str, torch.Tensor]]:
    if len(images) == 0:
        image_inputs = {}
    else:
        pixel_values_lst = self._images_to_pixel_values_lst(images, max_num_tiles)
        image_inputs = {
            "pixel_values_flat": torch.cat(pixel_values_lst),
            "image_num_patches": torch.tensor(
                [len(item) for item in pixel_values_lst]
            ),
        }

        for pixel_values in pixel_values_lst:
            num_patches = pixel_values.shape[0]
            feature_size = num_patches * self.num_image_token
            image_repl = self.get_image_repl(feature_size, num_patches)
            text = [t.replace("<image>", image_repl.full, 1) for t in text]
    return text, image_inputs

get_image_repl abstractmethod

get_image_repl(
    feature_size: int, num_patches: Optional[int]
) -> PromptUpdateDetails[str]
Source code in vllm/model_executor/models/nano_nemotron_vl.py
@abstractmethod
def get_image_repl(
    self,
    feature_size: int,
    num_patches: Optional[int],
) -> PromptUpdateDetails[str]:
    raise NotImplementedError

get_num_image_tokens

get_num_image_tokens(
    *,
    image_width: int,
    image_height: int,
    max_num_tiles: int,
) -> int
Source code in vllm/model_executor/models/nano_nemotron_vl.py
def get_num_image_tokens(
    self,
    *,
    image_width: int,
    image_height: int,
    max_num_tiles: int,
) -> int:
    target_ratios = get_internvl_target_ratios(1, max_num_tiles)

    num_patches, _, _ = calculate_internvl_targets(
        orig_width=image_width,
        orig_height=image_height,
        target_ratios=target_ratios,
        image_size=self.image_size,
        use_thumbnail=self.use_thumbnail,
    )

    return num_patches * self.num_image_token
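
calculate_internvl_targets above is another vLLM InternVL utility: it selects the tile grid that best matches the image's aspect ratio from target_ratios and returns the resulting patch count (plus an optional thumbnail tile). A minimal sketch of that selection step, adapted from the InternVL reference code linked in the class docstring and offered purely as an illustration:

# Illustrative sketch of InternVL-style grid selection (not the vLLM implementation):
# pick the (width_ratio, height_ratio) grid closest to the image's aspect ratio,
# preferring larger grids when the image area is big enough to justify them.
def pick_tile_grid(
    width: int,
    height: int,
    target_ratios: list[tuple[int, int]],
    image_size: int,
) -> tuple[int, int]:
    aspect_ratio = width / height
    best, best_diff = (1, 1), float("inf")
    for wr, hr in target_ratios:
        diff = abs(aspect_ratio - wr / hr)
        if diff < best_diff or (
            diff == best_diff
            and width * height > 0.5 * image_size * image_size * wr * hr
        ):
            best, best_diff = (wr, hr), diff
    return best

# num_patches would then be wr * hr, plus one extra tile if use_thumbnail is set.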

NanoNemotronBaseVLMultiModalProcessor

Bases: BaseMultiModalProcessor[_I]

Basic image-only MultiModalProcessor for InternVL-style models.

Source code in vllm/model_executor/models/nano_nemotron_vl.py
class NanoNemotronBaseVLMultiModalProcessor(BaseMultiModalProcessor[_I]):
    """Basic image-only MultiModalProcessor for InternVL-style models."""

    def _call_hf_processor(
        self,
        prompt: str,
        mm_data: Mapping[str, object],
        mm_kwargs: Mapping[str, object],
        tok_kwargs: Mapping[str, object],
    ) -> BatchFeature:
        processed_outputs = super()._call_hf_processor(
            prompt=prompt,
            mm_data=mm_data,
            mm_kwargs=mm_kwargs,
            tok_kwargs=tok_kwargs,
        )

        hf_processor = self.info.get_hf_processor(**mm_kwargs)
        image_token_id = hf_processor.image_token_id

        # Since there may be extra tokens in the feature placeholders,
        # we need to pass the image token ID to the model to select the
        # tokens to merge from the vision encoder outputs
        processed_outputs["image_token_id"] = torch.tensor(image_token_id)

        return processed_outputs

    def _get_mm_fields_config(
        self,
        hf_inputs: BatchFeature,
        hf_processor_mm_kwargs: Mapping[str, object],
    ) -> Mapping[str, MultiModalFieldConfig]:
        image_num_patches = hf_inputs.get("image_num_patches", torch.empty(0))
        num_images = len(image_num_patches)

        return dict(
            pixel_values_flat=MultiModalFieldConfig.flat_from_sizes(
                "image", image_num_patches
            ),
            image_num_patches=MultiModalFieldConfig.batched("image"),
            image_embeds=MultiModalFieldConfig.batched("image"),
            image_token_id=MultiModalFieldConfig.shared("image", num_images),
        )

    def _get_prompt_updates(
        self,
        mm_items: MultiModalDataItems,
        hf_processor_mm_kwargs: Mapping[str, object],
        out_mm_kwargs: MultiModalKwargs,
    ) -> Sequence[PromptUpdate]:
        hf_processor = self.info.get_hf_processor(**hf_processor_mm_kwargs)

        if "image_num_patches" in out_mm_kwargs:
            image_num_patches = out_mm_kwargs["image_num_patches"]
            assert isinstance(image_num_patches, torch.Tensor)
            image_num_patches = image_num_patches.tolist()
        elif "image_embeds" in out_mm_kwargs:
            # to compute num_patches (similar to Qwen2-VL)
            image_num_patches = [None] * len(out_mm_kwargs["image_embeds"])
        else:
            image_num_patches = []

        def get_replacement_custom(item_idx: int):
            images = mm_items.get_items(
                "image", (ImageEmbeddingItems, ImageProcessorItems)
            )

            if isinstance(images, ImageEmbeddingItems):
                feature_size = images.get_feature_size(item_idx)
            else:
                image_size = images.get_image_size(item_idx)
                # Extract max_num_tiles from kwargs, default to 12
                max_num_tiles = hf_processor_mm_kwargs.get(
                    "max_num_tiles", hf_processor.max_num_tiles
                )
                feature_size = self.info.get_num_image_tokens(
                    image_width=image_size.width,
                    image_height=image_size.height,
                    max_num_tiles=max_num_tiles,
                    processor=hf_processor,
                )

            num_patches = None
            local_image_num_patches = image_num_patches
            if isinstance(local_image_num_patches, torch.Tensor):
                local_image_num_patches = local_image_num_patches.tolist()
            if isinstance(local_image_num_patches, (list, tuple)) and item_idx < len(
                local_image_num_patches
            ):
                num_patches = int(local_image_num_patches[item_idx])

            return hf_processor.get_image_repl(feature_size, num_patches)

        return [
            PromptReplacement(
                modality="image",
                target="<image>",
                replacement=get_replacement_custom,
            )
        ]

_call_hf_processor

_call_hf_processor(
    prompt: str,
    mm_data: Mapping[str, object],
    mm_kwargs: Mapping[str, object],
    tok_kwargs: Mapping[str, object],
) -> BatchFeature
Source code in vllm/model_executor/models/nano_nemotron_vl.py
def _call_hf_processor(
    self,
    prompt: str,
    mm_data: Mapping[str, object],
    mm_kwargs: Mapping[str, object],
    tok_kwargs: Mapping[str, object],
) -> BatchFeature:
    processed_outputs = super()._call_hf_processor(
        prompt=prompt,
        mm_data=mm_data,
        mm_kwargs=mm_kwargs,
        tok_kwargs=tok_kwargs,
    )

    hf_processor = self.info.get_hf_processor(**mm_kwargs)
    image_token_id = hf_processor.image_token_id

    # Since there may be extra tokens in the feature placeholders,
    # we need to pass the image token ID to the model to select the
    # tokens to merge from the vision encoder outputs
    processed_outputs["image_token_id"] = torch.tensor(image_token_id)

    return processed_outputs

_get_mm_fields_config

_get_mm_fields_config(
    hf_inputs: BatchFeature,
    hf_processor_mm_kwargs: Mapping[str, object],
) -> Mapping[str, MultiModalFieldConfig]
Source code in vllm/model_executor/models/nano_nemotron_vl.py
def _get_mm_fields_config(
    self,
    hf_inputs: BatchFeature,
    hf_processor_mm_kwargs: Mapping[str, object],
) -> Mapping[str, MultiModalFieldConfig]:
    image_num_patches = hf_inputs.get("image_num_patches", torch.empty(0))
    num_images = len(image_num_patches)

    return dict(
        pixel_values_flat=MultiModalFieldConfig.flat_from_sizes(
            "image", image_num_patches
        ),
        image_num_patches=MultiModalFieldConfig.batched("image"),
        image_embeds=MultiModalFieldConfig.batched("image"),
        image_token_id=MultiModalFieldConfig.shared("image", num_images),
    )
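
The "flat" layout concatenates the tiles of every image in the request along dim 0, and image_num_patches records how many tiles belong to each image, which is what flat_from_sizes uses to slice the batch apart again. A small sketch of that bookkeeping with made-up shapes:

import torch

# Two example images contributing 3 and 5 tiles of 3x224x224 pixels (shapes are made up).
pixel_values_flat = torch.randn(3 + 5, 3, 224, 224)
image_num_patches = torch.tensor([3, 5])

# Recover the per-image chunks by splitting the flat tensor according to the sizes.
per_image = torch.split(pixel_values_flat, image_num_patches.tolist(), dim=0)
print([tuple(t.shape) for t in per_image])  # [(3, 3, 224, 224), (5, 3, 224, 224)]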

_get_prompt_updates

_get_prompt_updates(
    mm_items: MultiModalDataItems,
    hf_processor_mm_kwargs: Mapping[str, object],
    out_mm_kwargs: MultiModalKwargs,
) -> Sequence[PromptUpdate]
Source code in vllm/model_executor/models/nano_nemotron_vl.py
def _get_prompt_updates(
    self,
    mm_items: MultiModalDataItems,
    hf_processor_mm_kwargs: Mapping[str, object],
    out_mm_kwargs: MultiModalKwargs,
) -> Sequence[PromptUpdate]:
    hf_processor = self.info.get_hf_processor(**hf_processor_mm_kwargs)

    if "image_num_patches" in out_mm_kwargs:
        image_num_patches = out_mm_kwargs["image_num_patches"]
        assert isinstance(image_num_patches, torch.Tensor)
        image_num_patches = image_num_patches.tolist()
    elif "image_embeds" in out_mm_kwargs:
        # to compute num_patches (similar to Qwen2-VL)
        image_num_patches = [None] * len(out_mm_kwargs["image_embeds"])
    else:
        image_num_patches = []

    def get_replacement_custom(item_idx: int):
        images = mm_items.get_items(
            "image", (ImageEmbeddingItems, ImageProcessorItems)
        )

        if isinstance(images, ImageEmbeddingItems):
            feature_size = images.get_feature_size(item_idx)
        else:
            image_size = images.get_image_size(item_idx)
            # Extract max_num_tiles from kwargs, default to 12
            max_num_tiles = hf_processor_mm_kwargs.get(
                "max_num_tiles", hf_processor.max_num_tiles
            )
            feature_size = self.info.get_num_image_tokens(
                image_width=image_size.width,
                image_height=image_size.height,
                max_num_tiles=max_num_tiles,
                processor=hf_processor,
            )

        num_patches = None
        local_image_num_patches = image_num_patches
        if isinstance(local_image_num_patches, torch.Tensor):
            local_image_num_patches = local_image_num_patches.tolist()
        if isinstance(local_image_num_patches, (list, tuple)) and item_idx < len(
            local_image_num_patches
        ):
            num_patches = int(local_image_num_patches[item_idx])

        return hf_processor.get_image_repl(feature_size, num_patches)

    return [
        PromptReplacement(
            modality="image",
            target="<image>",
            replacement=get_replacement_custom,
        )
    ]

NanoNemotronVLDummyInputsBuilder

Bases: NanoNemotronVLDummyInputsBuilder[NanoNemotronVLProcessingInfo]

DummyInputsBuilder extended for video support

Source code in vllm/model_executor/models/nano_nemotron_vl.py
class NanoNemotronVLDummyInputsBuilder(
    NanoNemotronVLDummyInputsBuilder[NanoNemotronVLProcessingInfo]
):
    """DummyInputsBuilder extended for video support"""

    def get_dummy_text(self, mm_counts: Mapping[str, int]) -> str:
        num_videos = mm_counts.get("video", 0)

        return super().get_dummy_text(mm_counts) + "<video>" * num_videos

    def get_dummy_mm_data(
        self,
        seq_len: int,
        mm_counts: Mapping[str, int],
        mm_options: Optional[Mapping[str, BaseDummyOptions]] = None,
    ) -> MultiModalDataDict:
        dummy_image = super().get_dummy_mm_data(
            seq_len=seq_len, mm_counts=mm_counts, mm_options=mm_options
        )
        if self.info.supports_video:
            config = self.info.get_hf_config()
            image_size: int = config.force_image_size
            target_num_frames = self.info.get_num_frames_with_most_features(
                seq_len, mm_counts
            )
            num_videos = mm_counts.get("video", 0)
            video_overrides = mm_options.get("video") if mm_options else None
            dummy_video = {
                "video": self._get_dummy_videos(
                    width=image_size,
                    height=image_size,
                    num_frames=target_num_frames,
                    num_videos=num_videos,
                    overrides=video_overrides,
                )
            }
        else:
            dummy_video = {}
        return {**dummy_image, **dummy_video}

get_dummy_mm_data

get_dummy_mm_data(
    seq_len: int,
    mm_counts: Mapping[str, int],
    mm_options: Optional[
        Mapping[str, BaseDummyOptions]
    ] = None,
) -> MultiModalDataDict
Source code in vllm/model_executor/models/nano_nemotron_vl.py
def get_dummy_mm_data(
    self,
    seq_len: int,
    mm_counts: Mapping[str, int],
    mm_options: Optional[Mapping[str, BaseDummyOptions]] = None,
) -> MultiModalDataDict:
    dummy_image = super().get_dummy_mm_data(
        seq_len=seq_len, mm_counts=mm_counts, mm_options=mm_options
    )
    if self.info.supports_video:
        config = self.info.get_hf_config()
        image_size: int = config.force_image_size
        target_num_frames = self.info.get_num_frames_with_most_features(
            seq_len, mm_counts
        )
        num_videos = mm_counts.get("video", 0)
        video_overrides = mm_options.get("video") if mm_options else None
        dummy_video = {
            "video": self._get_dummy_videos(
                width=image_size,
                height=image_size,
                num_frames=target_num_frames,
                num_videos=num_videos,
                overrides=video_overrides,
            )
        }
    else:
        dummy_video = {}
    return {**dummy_image, **dummy_video}

get_dummy_text

get_dummy_text(mm_counts: Mapping[str, int]) -> str
Source code in vllm/model_executor/models/nano_nemotron_vl.py
def get_dummy_text(self, mm_counts: Mapping[str, int]) -> str:
    num_videos = mm_counts.get("video", 0)

    return super().get_dummy_text(mm_counts) + "<video>" * num_videos

NanoNemotronVLImageEmbeddinInputs

Bases: TypedDict

Source code in vllm/model_executor/models/nano_nemotron_vl.py
class NanoNemotronVLImageEmbeddinInputs(TypedDict):
    type: Literal["image_embeds"]
    data: Union[torch.Tensor, list[torch.Tensor]]
    """ 
    A tensor of shape `(num_images, total_image_feature_size, hidden_size)`
    or a list of tensors of shape `(total_image_feature_size, hidden_size)`

    `hidden_size` must match the hidden size of language model backbone.
    """

data instance-attribute

A tensor of shape (num_images, total_image_feature_size, hidden_size) or a list of tensors of shape (total_image_feature_size, hidden_size)

hidden_size must match the hidden size of language model backbone.

type instance-attribute

type: Literal['image_embeds']

NanoNemotronVLImagePixelInputs

Bases: TypedDict

Source code in vllm/model_executor/models/nano_nemotron_vl.py
class NanoNemotronVLImagePixelInputs(TypedDict):
    type: Literal["pixel_values"]
    pixel_values_flat: torch.Tensor
    """
    Shape:
    `(batch_size * num_images * (1 + num_patches), num_channels, height, width)`
    """

    num_patches: torch.Tensor
    """Shape: `(batch_size * num_images)`"""

num_patches instance-attribute

num_patches: Tensor

Shape: (batch_size * num_images)

pixel_values_flat instance-attribute

pixel_values_flat: Tensor

Shape: (batch_size * num_images * (1 + num_patches), num_channels, height, width)

type instance-attribute

type: Literal['pixel_values']

NanoNemotronVLMultiModalProcessor

Bases: NanoNemotronBaseVLMultiModalProcessor[NanoNemotronVLProcessingInfo]

MultiModalProcessor extended for video support

Source code in vllm/model_executor/models/nano_nemotron_vl.py
class NanoNemotronVLMultiModalProcessor(
    NanoNemotronBaseVLMultiModalProcessor[NanoNemotronVLProcessingInfo]
):
    """MultiModalProcessor extended for video support"""

    def _call_hf_processor(
        self,
        prompt: str,
        mm_data: Mapping[str, object],
        mm_kwargs: Mapping[str, object],
        tok_kwargs: Mapping[str, object],
    ) -> BatchFeature:
        processed_outputs = super()._call_hf_processor(
            prompt, mm_data, mm_kwargs, tok_kwargs
        )

        hf_processor = self.info.get_hf_processor(**mm_kwargs)
        if (
            self.info.supports_video
            and (video_token_id := hf_processor.video_token_id) is not None
        ):
            processed_outputs["video_token_id"] = torch.tensor(video_token_id)
        return processed_outputs

    def _get_mm_fields_config(
        self,
        hf_inputs: BatchFeature,
        hf_processor_mm_kwargs: Mapping[str, object],
    ) -> Mapping[str, MultiModalFieldConfig]:
        image_fields = super()._get_mm_fields_config(hf_inputs, hf_processor_mm_kwargs)
        if self.info.supports_video:
            video_num_patches = hf_inputs.get("video_num_patches", torch.empty(0))
            num_videos = len(video_num_patches)
            video_fields = dict(
                pixel_values_flat_video=MultiModalFieldConfig.flat_from_sizes(
                    "video", video_num_patches
                ),
                video_num_patches=MultiModalFieldConfig.batched("video"),
                video_token_id=MultiModalFieldConfig.shared("video", num_videos),
            )
        else:
            video_fields = {}

        return image_fields | video_fields

    def _get_prompt_updates(
        self,
        mm_items: MultiModalDataItems,
        hf_processor_mm_kwargs: Mapping[str, object],
        out_mm_kwargs: MultiModalKwargsItems,
    ) -> Sequence[PromptUpdate]:
        prompt_repl = super()._get_prompt_updates(
            mm_items=mm_items,
            hf_processor_mm_kwargs=hf_processor_mm_kwargs,
            out_mm_kwargs=out_mm_kwargs,
        )

        hf_processor = self.info.get_hf_processor(**hf_processor_mm_kwargs)

        out_mm_data = out_mm_kwargs.get_data()
        if "video_num_patches" in out_mm_data:
            video_num_patches = out_mm_data["video_num_patches"]
            assert isinstance(video_num_patches, torch.Tensor)
            video_num_patches = video_num_patches.tolist()
        else:
            video_num_patches = []

        def get_video_replacement_internvl(item_idx: int):
            feature_size = hf_processor.num_image_token
            num_patches = video_num_patches[item_idx]
            if num_patches is not None:
                assert isinstance(num_patches, int)

            video_pruning_rate = self.info.ctx.get_mm_config().video_pruning_rate
            if video_pruning_rate is not None and video_pruning_rate > 0.0:
                # Start of EVS-specific code
                num_tokens = compute_retained_tokens_count(
                    tokens_per_frame=feature_size,
                    num_frames=num_patches,
                    q=video_pruning_rate,
                )
                # Here we just need placeholders that won't actually be replaced -
                # we just need to make sure the total number of tokens is correct
                # assign all tokens to the first frame
                tokens_per_frame = [num_tokens] + [0] * (num_patches - 1)

                # End of EVS-specific code
            else:
                tokens_per_frame = [feature_size] * num_patches

            return hf_processor.get_video_repl(
                tokens_per_frame,
                video_context_token=hf_processor.video_token,
            )

        if self.info.supports_video:
            prompt_repl = [
                *prompt_repl,
                PromptReplacement(
                    modality="video",
                    target="<video>",
                    replacement=get_video_replacement_internvl,
                ),
            ]

        return prompt_repl

_call_hf_processor

_call_hf_processor(
    prompt: str,
    mm_data: Mapping[str, object],
    mm_kwargs: Mapping[str, object],
    tok_kwargs: Mapping[str, object],
) -> BatchFeature
Source code in vllm/model_executor/models/nano_nemotron_vl.py
def _call_hf_processor(
    self,
    prompt: str,
    mm_data: Mapping[str, object],
    mm_kwargs: Mapping[str, object],
    tok_kwargs: Mapping[str, object],
) -> BatchFeature:
    processed_outputs = super()._call_hf_processor(
        prompt, mm_data, mm_kwargs, tok_kwargs
    )

    hf_processor = self.info.get_hf_processor(**mm_kwargs)
    if (
        self.info.supports_video
        and (video_token_id := hf_processor.video_token_id) is not None
    ):
        processed_outputs["video_token_id"] = torch.tensor(video_token_id)
    return processed_outputs

_get_mm_fields_config

_get_mm_fields_config(
    hf_inputs: BatchFeature,
    hf_processor_mm_kwargs: Mapping[str, object],
) -> Mapping[str, MultiModalFieldConfig]
Source code in vllm/model_executor/models/nano_nemotron_vl.py
def _get_mm_fields_config(
    self,
    hf_inputs: BatchFeature,
    hf_processor_mm_kwargs: Mapping[str, object],
) -> Mapping[str, MultiModalFieldConfig]:
    image_fields = super()._get_mm_fields_config(hf_inputs, hf_processor_mm_kwargs)
    if self.info.supports_video:
        video_num_patches = hf_inputs.get("video_num_patches", torch.empty(0))
        num_videos = len(video_num_patches)
        video_fields = dict(
            pixel_values_flat_video=MultiModalFieldConfig.flat_from_sizes(
                "video", video_num_patches
            ),
            video_num_patches=MultiModalFieldConfig.batched("video"),
            video_token_id=MultiModalFieldConfig.shared("video", num_videos),
        )
    else:
        video_fields = {}

    return image_fields | video_fields

_get_prompt_updates

_get_prompt_updates(
    mm_items: MultiModalDataItems,
    hf_processor_mm_kwargs: Mapping[str, object],
    out_mm_kwargs: MultiModalKwargsItems,
) -> Sequence[PromptUpdate]
Source code in vllm/model_executor/models/nano_nemotron_vl.py
def _get_prompt_updates(
    self,
    mm_items: MultiModalDataItems,
    hf_processor_mm_kwargs: Mapping[str, object],
    out_mm_kwargs: MultiModalKwargsItems,
) -> Sequence[PromptUpdate]:
    prompt_repl = super()._get_prompt_updates(
        mm_items=mm_items,
        hf_processor_mm_kwargs=hf_processor_mm_kwargs,
        out_mm_kwargs=out_mm_kwargs,
    )

    hf_processor = self.info.get_hf_processor(**hf_processor_mm_kwargs)

    out_mm_data = out_mm_kwargs.get_data()
    if "video_num_patches" in out_mm_data:
        video_num_patches = out_mm_data["video_num_patches"]
        assert isinstance(video_num_patches, torch.Tensor)
        video_num_patches = video_num_patches.tolist()
    else:
        video_num_patches = []

    def get_video_replacement_internvl(item_idx: int):
        feature_size = hf_processor.num_image_token
        num_patches = video_num_patches[item_idx]
        if num_patches is not None:
            assert isinstance(num_patches, int)

        video_pruning_rate = self.info.ctx.get_mm_config().video_pruning_rate
        if video_pruning_rate is not None and video_pruning_rate > 0.0:
            # Start of EVS-specific code
            num_tokens = compute_retained_tokens_count(
                tokens_per_frame=feature_size,
                num_frames=num_patches,
                q=video_pruning_rate,
            )
            # Here we just need placeholders that won't actually be replaced -
            # we just need to make sure the total number of tokens is correct
            # assign all tokens to the first frame
            tokens_per_frame = [num_tokens] + [0] * (num_patches - 1)

            # End of EVS-specific code
        else:
            tokens_per_frame = [feature_size] * num_patches

        return hf_processor.get_video_repl(
            tokens_per_frame,
            video_context_token=hf_processor.video_token,
        )

    if self.info.supports_video:
        prompt_repl = [
            *prompt_repl,
            PromptReplacement(
                modality="video",
                target="<video>",
                replacement=get_video_replacement_internvl,
            ),
        ]

    return prompt_repl
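
In the EVS branch above, only the total number of placeholder tokens matters, so all of them are attributed to the first frame; in the non-EVS branch every frame reserves its full per-frame count. A tiny sketch of that bookkeeping (num_tokens stands in for the result of compute_retained_tokens_count and is just an example value):

feature_size = 256  # tokens per un-pruned frame (example value)
num_patches = 8     # number of video frames (example value)
num_tokens = 1024   # example retained-token budget produced by EVS pruning

# EVS: assign the whole budget to the first frame, zero to the rest.
evs_tokens_per_frame = [num_tokens] + [0] * (num_patches - 1)
# Non-EVS: every frame keeps its full token count.
full_tokens_per_frame = [feature_size] * num_patches

assert sum(evs_tokens_per_frame) == num_tokens                   # only the total matters
assert sum(full_tokens_per_frame) == feature_size * num_patches  # 2048 tokens here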

NanoNemotronVLProcessingInfo

Bases: BaseNanoNemotronVLProcessingInfo

ProcessingInfo extended for video processing

Source code in vllm/model_executor/models/nano_nemotron_vl.py
class NanoNemotronVLProcessingInfo(BaseNanoNemotronVLProcessingInfo):
    """ProcessingInfo extended for video processing"""

    @property
    def supports_video(self):
        return self.get_hf_processor().supports_video

    def get_supported_mm_limits(self):
        video_limit = {"video": None} if self.supports_video else {}
        return {**super().get_supported_mm_limits(), **video_limit}

    def get_video_token(self) -> Optional[str]:
        return IMG_CONTEXT

    def get_video_pruning_rate(self) -> Optional[float]:
        return self.ctx.get_mm_config().video_pruning_rate

    def get_num_frames_with_most_features(
        self,
        seq_len: int,
        mm_counts: Mapping[str, int],
    ) -> int:
        max_images = mm_counts.get("image", 0)
        max_videos = mm_counts.get("video", 0)

        processor = self.get_hf_processor()  # we get the CustomProcessor here

        max_image_tokens = self.get_max_image_tokens() * max_images
        max_total_frames = (seq_len - max_image_tokens) // processor.num_image_token
        max_frames_per_video = max_total_frames // max(max_videos, 1)

        max_frames_per_video = min(max_frames_per_video, MAX_FRAMES)
        return max(max_frames_per_video, 1)

    def get_hf_processor(self, **kwargs: object) -> NanoNemotronVLProcessor:
        return self.ctx.init_processor(
            NanoNemotronVLProcessor,
            config=self.get_hf_config(),
            tokenizer=self.get_tokenizer(),
            video_token=self.get_video_token(),
            video_pruning_rate=self.get_video_pruning_rate(),
            **kwargs,
        )

supports_video property

supports_video

get_hf_processor

get_hf_processor(
    **kwargs: object,
) -> NanoNemotronVLProcessor
Source code in vllm/model_executor/models/nano_nemotron_vl.py
def get_hf_processor(self, **kwargs: object) -> NanoNemotronVLProcessor:
    return self.ctx.init_processor(
        NanoNemotronVLProcessor,
        config=self.get_hf_config(),
        tokenizer=self.get_tokenizer(),
        video_token=self.get_video_token(),
        video_pruning_rate=self.get_video_pruning_rate(),
        **kwargs,
    )

get_num_frames_with_most_features

get_num_frames_with_most_features(
    seq_len: int, mm_counts: Mapping[str, int]
) -> int
Source code in vllm/model_executor/models/nano_nemotron_vl.py
def get_num_frames_with_most_features(
    self,
    seq_len: int,
    mm_counts: Mapping[str, int],
) -> int:
    max_images = mm_counts.get("image", 0)
    max_videos = mm_counts.get("video", 0)

    processor = self.get_hf_processor()  # we get the CustomProcessor here

    max_image_tokens = self.get_max_image_tokens() * max_images
    max_total_frames = (seq_len - max_image_tokens) // processor.num_image_token
    max_frames_per_video = max_total_frames // max(max_videos, 1)

    max_frames_per_video = min(max_frames_per_video, MAX_FRAMES)
    return max(max_frames_per_video, 1)
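
Worked through with example numbers (seq_len, the modality counts, and the token counts below are illustrative, not values taken from a real checkpoint):

MAX_FRAMES = 16

seq_len = 8192           # example context length
max_images, max_videos = 1, 1
max_image_tokens = 3072  # example result of get_max_image_tokens() * max_images
num_image_token = 256    # example per-frame token count from the processor

max_total_frames = (seq_len - max_image_tokens) // num_image_token  # 20
max_frames_per_video = max_total_frames // max(max_videos, 1)       # 20
max_frames_per_video = min(max_frames_per_video, MAX_FRAMES)        # capped at 16
print(max(max_frames_per_video, 1))  # 16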

get_supported_mm_limits

get_supported_mm_limits()
Source code in vllm/model_executor/models/nano_nemotron_vl.py
def get_supported_mm_limits(self):
    video_limit = {"video": None} if self.supports_video else {}
    return {**super().get_supported_mm_limits(), **video_limit}

get_video_pruning_rate

get_video_pruning_rate() -> Optional[float]
Source code in vllm/model_executor/models/nano_nemotron_vl.py
def get_video_pruning_rate(self) -> Optional[float]:
    return self.ctx.get_mm_config().video_pruning_rate

get_video_token

get_video_token() -> Optional[str]
Source code in vllm/model_executor/models/nano_nemotron_vl.py
def get_video_token(self) -> Optional[str]:
    return IMG_CONTEXT

NanoNemotronVLProcessor

Bases: BaseNanoNemotronVLProcessor

HF Processor with extended video processing logic. Code for video processing is adapted from video example: https://huggingface.co/OpenGVLab/InternVL3-1B#inference-with-transformers

Source code in vllm/model_executor/models/nano_nemotron_vl.py
class NanoNemotronVLProcessor(BaseNanoNemotronVLProcessor):
    """
    HF Processor  with extended video processing logic.
    Code for video processing is adapted from video example:
    https://huggingface.co/OpenGVLab/InternVL3-1B#inference-with-transformers
    """

    def __init__(
        self,
        config: PretrainedConfig,
        tokenizer: AnyTokenizer,
        *,
        max_num_tiles: Optional[int] = None,
        min_dynamic_patch: Optional[int] = None,
        max_dynamic_patch: Optional[int] = None,
        dynamic_image_size: Optional[bool] = None,
        video_token: Optional[str] = None,
        video_pruning_rate: Optional[float] = None,
    ) -> None:
        super().__init__(
            config=config,
            tokenizer=tokenizer,
            max_num_tiles=max_num_tiles,
            min_dynamic_patch=min_dynamic_patch,
            max_dynamic_patch=max_dynamic_patch,
            dynamic_image_size=dynamic_image_size,
        )
        # add extra video token for video processing
        self.video_token = video_token
        self.video_pruning_rate = video_pruning_rate

    @property
    def supports_video(self) -> bool:
        return self.video_token_id is not None

    @property
    def video_token_id(self) -> Optional[int]:
        if self.video_token is None:
            return None
        return self.tokenizer.get_vocab().get(self.video_token, None)

    @property
    def image_token_id(self) -> int:
        return self.tokenizer.convert_tokens_to_ids(IMG_CONTEXT)

    def _videos_to_pixel_values_lst(
        self,
        videos: list[npt.NDArray],
        max_num_tiles: int,
        dynamic_image_size: Optional[bool] = None,
    ) -> list[torch.Tensor]:
        return [
            video_to_pixel_values(
                video,
                input_size=self.image_size,
                max_num_tiles=max_num_tiles,
                use_thumbnail=self.use_thumbnail,
            )
            for video in videos
        ]

    def _preprocess_video(
        self,
        text: list[str],
        videos: list[npt.NDArray],
        max_num_tiles: int,
        dynamic_image_size: Optional[bool] = None,
    ):
        if len(videos) == 0 or not self.supports_video:
            video_inputs = {}
        else:
            pixel_values_lst_video = self._videos_to_pixel_values_lst(
                videos,
                max_num_tiles=max_num_tiles,
                dynamic_image_size=dynamic_image_size,
            )

            video_inputs = {
                "pixel_values_flat_video": torch.cat(pixel_values_lst_video),
                "video_num_patches": torch.tensor(
                    [len(item) for item in pixel_values_lst_video]
                ),
            }

            image_size: int = self.config.force_image_size
            patch_size: int = self.config.patch_size
            downsample_ratio = self.config.downsample_ratio
            tokens_in_single_frame = int(
                (image_size * image_size // patch_size**2) * (downsample_ratio**2)
            )

            for pixel_values in pixel_values_lst_video:
                num_frames = pixel_values.shape[0]

                if (
                    self.video_pruning_rate is not None
                    and self.video_pruning_rate > 0.0
                ):
                    # Start of EVS-specific code
                    num_tokens = compute_retained_tokens_count(
                        tokens_per_frame=tokens_in_single_frame,
                        num_frames=num_frames,
                        q=self.video_pruning_rate,
                    )

                    # Here we just need placeholders that won't actually be replaced -
                    # we just need to make sure the total number of tokens is correct
                    # assign all tokens to the first frame
                    tokens_per_frame = [num_tokens] + [0] * (num_frames - 1)

                    # End of EVS-specific code
                else:
                    tokens_per_frame = [tokens_in_single_frame] * num_frames

                video_repl = self.get_video_repl(tokens_per_frame, self.video_token)

                text = [t.replace("<video>", video_repl.full, 1) for t in text]
        return text, video_inputs

    def __call__(
        self,
        text: Optional[Union[str, list[str]]] = None,
        images: Optional[Union[Image.Image, list[Image.Image]]] = None,
        videos: Optional[Union[npt.NDArray, list[npt.NDArray]]] = None,
        return_tensors: Optional[Union[str, TensorType]] = None,
        max_num_tiles: Optional[int] = None,
        dynamic_image_size: Optional[bool] = None,
    ) -> BatchFeature:
        # Use default if not provided
        if max_num_tiles is None:
            max_num_tiles = self.max_num_tiles

        text, images, videos = [
            self._make_batch_input(x) for x in (text, images, videos)
        ]

        text, image_inputs = self._preprocess_image(
            text=text,
            images=images,
            max_num_tiles=max_num_tiles,
        )

        text, video_inputs = self._preprocess_video(
            text=text,
            videos=videos,
            max_num_tiles=1,
            dynamic_image_size=dynamic_image_size,
        )

        text_inputs = self.tokenizer(text, add_special_tokens=False)

        combined_outputs = {**text_inputs, **image_inputs, **video_inputs}

        return BatchFeature(combined_outputs, tensor_type=return_tensors)

    def get_image_repl(
        self,
        feature_size: int,
        num_patches: Optional[int],
    ) -> PromptUpdateDetails[str]:
        repl_features = IMG_CONTEXT * feature_size
        repl_full = IMG_START + repl_features + IMG_END

        return PromptUpdateDetails.select_text(repl_full, IMG_CONTEXT)

    @classmethod
    def get_video_repl(
        cls,
        tokens_per_frame: list[int],
        video_context_token: str = IMG_CONTEXT,
    ) -> PromptUpdateDetails[str]:
        """
        Build prompt replacement for a video.
        The replacement returned is not actually used to replace the placeholder
        tokens - it's just used to make sure we allocate the correct number
        of tokens.
        Actual replacement is done in get_multimodal_embeddings of
        NemotronH_Nano_VL_V2
        (specifically in _process_video_input -> _create_final_video_embeddings).
        There, we create the final embeddings with text embeddings for indicator tokens
        and video embeddings for video tokens.
        This is a single function that handles all cases - non EVS, EVS dummy, EVS real.
        The differentiation is done via tokens_per_frame parameter.
        - non EVS case - constant value same value across all frames
        - EVS dummy - Doesn't matter how tokens are distributed between frames - just
                        make sure the total number of tokens is correct.
        - EVS real (called from get_real_video_repl_for_evs) - different value per frame
        Args:
            tokens_per_frame (list[int]): number of tokens per frame
            video_context_token (str): the token to use for the video context
        """
        repl_full = "".join(
            [
                f"Frame{i + 1}: {IMG_START}{video_context_token * num_tokens}{IMG_END}"
                for i, num_tokens in enumerate(tokens_per_frame)
            ]
        )

        return PromptUpdateDetails.from_seq(repl_full)

image_token_id property

image_token_id: int

supports_video property

supports_video: bool

video_pruning_rate instance-attribute

video_pruning_rate = video_pruning_rate

video_token instance-attribute

video_token = video_token

video_token_id property

video_token_id: Optional[int]

__call__

__call__(
    text: Optional[Union[str, list[str]]] = None,
    images: Optional[Union[Image, list[Image]]] = None,
    videos: Optional[Union[NDArray, list[NDArray]]] = None,
    return_tensors: Optional[Union[str, TensorType]] = None,
    max_num_tiles: Optional[int] = None,
    dynamic_image_size: Optional[bool] = None,
) -> BatchFeature
Source code in vllm/model_executor/models/nano_nemotron_vl.py
def __call__(
    self,
    text: Optional[Union[str, list[str]]] = None,
    images: Optional[Union[Image.Image, list[Image.Image]]] = None,
    videos: Optional[Union[npt.NDArray, list[npt.NDArray]]] = None,
    return_tensors: Optional[Union[str, TensorType]] = None,
    max_num_tiles: Optional[int] = None,
    dynamic_image_size: Optional[bool] = None,
) -> BatchFeature:
    # Use default if not provided
    if max_num_tiles is None:
        max_num_tiles = self.max_num_tiles

    text, images, videos = [
        self._make_batch_input(x) for x in (text, images, videos)
    ]

    text, image_inputs = self._preprocess_image(
        text=text,
        images=images,
        max_num_tiles=max_num_tiles,
    )

    text, video_inputs = self._preprocess_video(
        text=text,
        videos=videos,
        max_num_tiles=1,
        dynamic_image_size=dynamic_image_size,
    )

    text_inputs = self.tokenizer(text, add_special_tokens=False)

    combined_outputs = {**text_inputs, **image_inputs, **video_inputs}

    return BatchFeature(combined_outputs, tensor_type=return_tensors)

__init__

__init__(
    config: PretrainedConfig,
    tokenizer: AnyTokenizer,
    *,
    max_num_tiles: Optional[int] = None,
    min_dynamic_patch: Optional[int] = None,
    max_dynamic_patch: Optional[int] = None,
    dynamic_image_size: Optional[bool] = None,
    video_token: Optional[str] = None,
    video_pruning_rate: Optional[float] = None,
) -> None
Source code in vllm/model_executor/models/nano_nemotron_vl.py
def __init__(
    self,
    config: PretrainedConfig,
    tokenizer: AnyTokenizer,
    *,
    max_num_tiles: Optional[int] = None,
    min_dynamic_patch: Optional[int] = None,
    max_dynamic_patch: Optional[int] = None,
    dynamic_image_size: Optional[bool] = None,
    video_token: Optional[str] = None,
    video_pruning_rate: Optional[float] = None,
) -> None:
    super().__init__(
        config=config,
        tokenizer=tokenizer,
        max_num_tiles=max_num_tiles,
        min_dynamic_patch=min_dynamic_patch,
        max_dynamic_patch=max_dynamic_patch,
        dynamic_image_size=dynamic_image_size,
    )
    # add extra video token for video processing
    self.video_token = video_token
    self.video_pruning_rate = video_pruning_rate

_preprocess_video

_preprocess_video(
    text: list[str],
    videos: list[NDArray],
    max_num_tiles: int,
    dynamic_image_size: Optional[bool] = None,
)
Source code in vllm/model_executor/models/nano_nemotron_vl.py
def _preprocess_video(
    self,
    text: list[str],
    videos: list[npt.NDArray],
    max_num_tiles: int,
    dynamic_image_size: Optional[bool] = None,
):
    if len(videos) == 0 or not self.supports_video:
        video_inputs = {}
    else:
        pixel_values_lst_video = self._videos_to_pixel_values_lst(
            videos,
            max_num_tiles=max_num_tiles,
            dynamic_image_size=dynamic_image_size,
        )

        video_inputs = {
            "pixel_values_flat_video": torch.cat(pixel_values_lst_video),
            "video_num_patches": torch.tensor(
                [len(item) for item in pixel_values_lst_video]
            ),
        }

        image_size: int = self.config.force_image_size
        patch_size: int = self.config.patch_size
        downsample_ratio = self.config.downsample_ratio
        tokens_in_single_frame = int(
            (image_size * image_size // patch_size**2) * (downsample_ratio**2)
        )

        for pixel_values in pixel_values_lst_video:
            num_frames = pixel_values.shape[0]

            if (
                self.video_pruning_rate is not None
                and self.video_pruning_rate > 0.0
            ):
                # Start of EVS-specific code
                num_tokens = compute_retained_tokens_count(
                    tokens_per_frame=tokens_in_single_frame,
                    num_frames=num_frames,
                    q=self.video_pruning_rate,
                )

                # Here we just need placeholders that won't actually be replaced -
                # we just need to make sure the total number of tokens is correct
                # assign all tokens to the first frame
                tokens_per_frame = [num_tokens] + [0] * (num_frames - 1)

                # End of EVS-specific code
            else:
                tokens_per_frame = [tokens_in_single_frame] * num_frames

            video_repl = self.get_video_repl(tokens_per_frame, self.video_token)

            text = [t.replace("<video>", video_repl.full, 1) for t in text]
    return text, video_inputs

_videos_to_pixel_values_lst

_videos_to_pixel_values_lst(
    videos: list[NDArray],
    max_num_tiles: int,
    dynamic_image_size: Optional[bool] = None,
) -> list[Tensor]
Source code in vllm/model_executor/models/nano_nemotron_vl.py
def _videos_to_pixel_values_lst(
    self,
    videos: list[npt.NDArray],
    max_num_tiles: int,
    dynamic_image_size: Optional[bool] = None,
) -> list[torch.Tensor]:
    return [
        video_to_pixel_values(
            video,
            input_size=self.image_size,
            max_num_tiles=max_num_tiles,
            use_thumbnail=self.use_thumbnail,
        )
        for video in videos
    ]

get_image_repl

get_image_repl(
    feature_size: int, num_patches: Optional[int]
) -> PromptUpdateDetails[str]
Source code in vllm/model_executor/models/nano_nemotron_vl.py
def get_image_repl(
    self,
    feature_size: int,
    num_patches: Optional[int],
) -> PromptUpdateDetails[str]:
    repl_features = IMG_CONTEXT * feature_size
    repl_full = IMG_START + repl_features + IMG_END

    return PromptUpdateDetails.select_text(repl_full, IMG_CONTEXT)
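
For example, with feature_size=3 the full replacement string is <img><image><image><image></img>; select_text then singles out the IMG_CONTEXT tokens within it as the portion to be filled in with vision-encoder features. The string construction itself:

IMG_START, IMG_END, IMG_CONTEXT = "<img>", "</img>", "<image>"

feature_size = 3  # example: num_patches * num_image_token for a tiny image
repl_full = IMG_START + IMG_CONTEXT * feature_size + IMG_END
print(repl_full)  # <img><image><image><image></img>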

get_video_repl classmethod

get_video_repl(
    tokens_per_frame: list[int],
    video_context_token: str = IMG_CONTEXT,
) -> PromptUpdateDetails[str]

Build prompt replacement for a video.

The replacement returned is not actually used to replace the placeholder tokens - it is only used to make sure we allocate the correct number of tokens. The actual replacement is done in get_multimodal_embeddings of NemotronH_Nano_VL_V2 (specifically in _process_video_input -> _create_final_video_embeddings), where the final embeddings are built with text embeddings for the indicator tokens and video embeddings for the video tokens.

This single function handles all cases - non EVS, EVS dummy, and EVS real - with the differentiation done via the tokens_per_frame parameter:

  • non EVS: the same constant value for every frame
  • EVS dummy: the distribution of tokens across frames does not matter, as long as the total number of tokens is correct
  • EVS real (called from get_real_video_repl_for_evs): a different value per frame

Parameters:

  • tokens_per_frame (list[int]): number of tokens per frame
  • video_context_token (str): the token to use for the video context

Source code in vllm/model_executor/models/nano_nemotron_vl.py
@classmethod
def get_video_repl(
    cls,
    tokens_per_frame: list[int],
    video_context_token: str = IMG_CONTEXT,
) -> PromptUpdateDetails[str]:
    """
    Build prompt replacement for a video.
    The replacement returned is not actually used to replace the placeholder
    tokens - it's just used to make sure we allocate the correct number
    of tokens.
    Actual replacement is done in get_multimodal_embeddings of
    NemotronH_Nano_VL_V2
    (specifically in _process_video_input -> _create_final_video_embeddings).
    There, we create the final embeddings with text embeddings for indicator tokens
    and video embeddings for video tokens.
    This is a single function that handles all cases - non EVS, EVS dummy, EVS real.
    The differentiation is done via tokens_per_frame parameter.
    - non EVS case - constant value same value across all frames
    - EVS dummy - Doesn't matter how tokens are distributed between frames - just
                    make sure the total number of tokens is correct.
    - EVS real (called from get_real_video_repl_for_evs) - different value per frame
    Args:
        tokens_per_frame (list[int]): number of tokens per frame
        video_context_token (str): the token to use for the video context
    """
    repl_full = "".join(
        [
            f"Frame{i + 1}: {IMG_START}{video_context_token * num_tokens}{IMG_END}"
            for i, num_tokens in enumerate(tokens_per_frame)
        ]
    )

    return PromptUpdateDetails.from_seq(repl_full)
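
For reference, a minimal sketch of the replacement string this produces; the frame count and token counts below are illustrative only:

IMG_START, IMG_END, IMG_CONTEXT = "<img>", "</img>", "<image>"

tokens_per_frame = [2, 0]  # e.g. an EVS dummy split across two frames
repl_full = "".join(
    f"Frame{i + 1}: {IMG_START}{IMG_CONTEXT * n}{IMG_END}"
    for i, n in enumerate(tokens_per_frame)
)
assert repl_full == "Frame1: <img><image><image></img>Frame2: <img></img>"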

NanoNemotronVLVideoEmbeddingInputs

Bases: TensorSchema

Dimensions
  • n: Number of videos
  • f: Total video feature size
  • h: Hidden size (must match the hidden size of language model backbone)
Source code in vllm/model_executor/models/nano_nemotron_vl.py
class NanoNemotronVLVideoEmbeddingInputs(TensorSchema):
    """
    Dimensions:
        - n: Number of videos
        - f: Total video feature size
        - h: Hidden size (must match the hidden size of language model backbone)
    """

    type: Literal["video_embeds"]
    data: Annotated[Union[torch.Tensor, list[torch.Tensor]], TensorShape("n", "f", "h")]

data instance-attribute

data: Annotated[
    Union[Tensor, list[Tensor]], TensorShape(n, f, h)
]

type instance-attribute

type: Literal['video_embeds']

NanoNemotronVLVideoPixelInputs

Bases: TensorSchema

Dimensions
  • bvf: Batch size * number of videos * num_frames
  • bn: Batch size * number of images
  • c: Number of channels (3)
  • h: Height of each video frame
  • w: Width of each video frame
Source code in vllm/model_executor/models/nano_nemotron_vl.py
class NanoNemotronVLVideoPixelInputs(TensorSchema):
    """
    Dimensions:
        - bvf: Batch size * number of videos * num_frames
        - bn: Batch size * number of images
        - c: Number of channels (3)
        - h: Height of each video frame
        - w: Width of each video frame
    """

    type: Literal["pixel_values_videos"]
    pixel_values_flat: Annotated[torch.Tensor, TensorShape("bvf", 3, "h", "w")]
    num_patches: Annotated[torch.Tensor, TensorShape("bn")]

num_patches instance-attribute

num_patches: Annotated[Tensor, TensorShape(bn)]

pixel_values_flat instance-attribute

pixel_values_flat: Annotated[
    Tensor, TensorShape(bvf, 3, h, w)
]

type instance-attribute

type: Literal['pixel_values_videos']

NemotronH_Nano_VL_V2

Bases: Module, HasInnerState, IsHybrid, SupportsMultiModal, SupportsMultiModalPruning

Source code in vllm/model_executor/models/nano_nemotron_vl.py
@MULTIMODAL_REGISTRY.register_processor(
    NanoNemotronVLMultiModalProcessor,
    info=NanoNemotronVLProcessingInfo,
    dummy_inputs=NanoNemotronVLDummyInputsBuilder,
)
class NemotronH_Nano_VL_V2(
    nn.Module, HasInnerState, IsHybrid, SupportsMultiModal, SupportsMultiModalPruning
):
    @classmethod
    def get_placeholder_str(cls, modality: str, i: int) -> Optional[str]:
        if modality.startswith("image"):
            return "<image>"
        if modality.startswith("video"):
            return "<video>"
        return None

    def __init__(self, *, vllm_config: VllmConfig, prefix: str = ""):
        super().__init__()
        config = vllm_config.model_config.hf_config
        multimodal_config = vllm_config.model_config.multimodal_config
        image_size = config.force_image_size
        patch_size = config.patch_size
        self.patch_size = patch_size
        self.template = config.template
        self.num_image_token = int(
            (image_size // patch_size) ** 2 * (config.downsample_ratio**2)
        )
        self.downsample_ratio = config.downsample_ratio
        self.ps_version = config.ps_version
        self.image_tag_type = config.image_tag_type
        self.video_pruning_rate = multimodal_config.video_pruning_rate
        self.language_model = init_vllm_registered_model(
            vllm_config=vllm_config,
            hf_config=config.text_config,
            prefix=maybe_prefix(prefix, "language_model"),
        )
        self.vision_model = self.get_vit_model_from_radio_config(config).to(
            self.language_model.config.torch_dtype
        )

        # Construct the vision projection.
        vit_hidden_size = config.vit_hidden_size
        vision_projection_hidden_size = config.projector_hidden_size
        llm_hidden_size = config.text_config.hidden_size

        self.mlp1 = nn.Sequential(
            RMSNorm(
                hidden_size=vit_hidden_size * int(1 / self.downsample_ratio) ** 2,
                eps=1e-5,
            ),
            nn.Linear(
                vit_hidden_size * int(1 / self.downsample_ratio) ** 2,
                vision_projection_hidden_size,
                bias=False,
            ),
            ReLUSquaredActivation(),
            nn.Linear(vision_projection_hidden_size, llm_hidden_size, bias=False),
        )
        self.mlp1 = self.mlp1.to(self.language_model.config.torch_dtype)

        self.img_context_token_id = None
        self.video_context_token_id = None
        self.config = config
        self.model_config = vllm_config.model_config

    def pixel_shuffle(self, x, scale_factor=0.5):
        n, w, h, c = x.size()
        # N, W, H, C --> N, W, H * scale, C // scale
        x = x.view(
            n,
            w,
            int(h * scale_factor),
            int(c / scale_factor),
        )
        # N, W, H * scale, C // scale --> N, H * scale, W, C // scale
        x = x.permute(0, 2, 1, 3).contiguous()
        # N, H * scale, W, C // scale -->
        # N, H * scale, W * scale, C // (scale ** 2)
        x = x.view(
            n,
            int(h * scale_factor),
            int(w * scale_factor),
            int(c / (scale_factor * scale_factor)),
        )
        if self.ps_version == "v1":
            warnings.warn(
                "In ps_version 'v1', the height and width have not "
                "been swapped back, which results in a transposed image.",
                stacklevel=2,
            )
        else:
            x = x.permute(0, 2, 1, 3).contiguous()
        return x

    def extract_feature(self, pixel_values):
        vit_embeds = self.vision_model(pixel_values)
        vit_embeds = vit_embeds.to(dtype=torch.bfloat16)
        h = w = int(vit_embeds.shape[1] ** 0.5)
        vit_embeds = vit_embeds.reshape(vit_embeds.shape[0], h, w, -1)
        vit_embeds = self.pixel_shuffle(vit_embeds, scale_factor=self.downsample_ratio)
        vit_embeds = vit_embeds.reshape(vit_embeds.shape[0], -1, vit_embeds.shape[-1])
        vit_embeds = self.mlp1(vit_embeds)
        return vit_embeds

    def _parse_and_validate_image_input(
        self, **kwargs: object
    ) -> Optional[NanoNemotronVLImageInputs]:
        pixel_values_flat = kwargs.pop("pixel_values_flat", None)
        image_num_patches = kwargs.pop("image_num_patches", None)
        image_embeds = kwargs.pop("image_embeds", None)

        if pixel_values_flat is None and image_embeds is None:
            return None

        if image_embeds is not None:
            if not isinstance(image_embeds, (torch.Tensor, list)):
                raise ValueError(
                    "Incorrect type of image embeddings. "
                    f"Got type: {type(image_embeds)}"
                )

            return NanoNemotronVLImageEmbeddinInputs(
                type="image_embeds",
                data=flatten_bn(image_embeds),
            )

        image_token_id = kwargs["image_token_id"]
        assert isinstance(image_token_id, torch.Tensor)
        self.img_context_token_id = image_token_id.flatten().unique().item()

        if pixel_values_flat is not None:
            if not isinstance(pixel_values_flat, (torch.Tensor, list)):
                raise ValueError(
                    "Incorrect type of pixel values. "
                    f"Got type: {type(pixel_values_flat)}"
                )

            if not isinstance(image_num_patches, (torch.Tensor, list)):
                raise ValueError(
                    "Incorrect type of image_num_patches. "
                    f"Got type: {type(image_num_patches)}"
                )

            pixel_values_flat = flatten_bn(pixel_values_flat, concat=True)
            image_num_patches = flatten_bn(image_num_patches, concat=True)

            return NanoNemotronVLImagePixelInputs(
                type="pixel_values",
                pixel_values_flat=pixel_values_flat,
                num_patches=image_num_patches,
            )

        raise AssertionError("This line should be unreachable.")

    def _process_image_input(
        self, image_input: NanoNemotronVLImageInputs
    ) -> tuple[torch.Tensor, ...]:
        if image_input["type"] == "image_embeds":
            return image_input["data"]

        assert self.vision_model is not None

        image_embeds = self.extract_feature(image_input["pixel_values_flat"])
        num_patches = image_input["num_patches"]

        # Only one image in the current batch
        if len(num_patches) == 1:
            return (image_embeds.view(-1, self.config.text_config.hidden_size),)

        # NOTE: Image embeddings are split into separate tensors for each image
        # by the size of each embedding.
        feature_size = image_embeds.shape[1]
        image_embeds = image_embeds.view(-1, self.config.text_config.hidden_size)
        image_feature_sizes = [
            num_patches * feature_size for num_patches in num_patches
        ]
        return image_embeds.split(image_feature_sizes)

    def _process_video_input(
        self, video_input: NanoNemotronVLVideoPixelInputs
    ) -> tuple[torch.Tensor, ...]:
        """Process video input and create final embeddings with video content
        and indicator tokens."""
        # Get video embeddings using the same processing as images
        video_embeddings = self._process_image_input(video_input)

        final_video_embeddings: tuple[torch.Tensor, ...] = ()

        image_rows = image_cols = self.config.force_image_size
        downsample_ratio = self.config.downsample_ratio
        patch_size = self.config.patch_size
        rows = int(image_rows * downsample_ratio // patch_size)
        cols = int(image_cols * downsample_ratio // patch_size)
        video_pruning_rate = self.video_pruning_rate

        # Calculate video feature dimensions (number of frames and
        # their feature size (AKA tokens per frame))
        # TODO: Maybe this can be optimized to avoid the loop?
        for i, single_video_embeddings in enumerate(video_embeddings):
            num_frames = video_input["num_patches"][i].item()
            assert single_video_embeddings.shape[0] % num_frames == 0

            if video_pruning_rate is not None and video_pruning_rate > 0.0:
                # Start of EVS-specific code
                retention_mask = compute_retention_mask(
                    single_video_embeddings,
                    video_size_thw=(num_frames, rows, cols),
                    spatial_merge_size=1,
                    q=video_pruning_rate,
                )

                # apply retention mask
                single_video_embeddings = single_video_embeddings[retention_mask]

                # calculate the actual number of retained tokens per frame
                retention_mask_thw = retention_mask.reshape(num_frames, rows, cols)
                num_tokens_per_frame = (
                    retention_mask_thw.sum(dim=(1, 2)).long().tolist()
                )
                # End of EVS-specific code
            else:
                feature_size = single_video_embeddings.shape[0] // num_frames
                num_tokens_per_frame = [feature_size] * num_frames

            final_video_embeddings += (
                self._create_final_video_embeddings(
                    single_video_embeddings,
                    num_tokens_per_frame,
                ),
            )

        return final_video_embeddings

    def _create_final_video_embeddings(
        self,
        video_embeddings: torch.Tensor,
        num_tokens_per_frame: list[int],
    ) -> torch.Tensor:
        """Create final embeddings that combine video embeddings with
        text embeddings of indicator tokens.

        These final embeddings contain:
        - Actual video embeddings in positions corresponding to video content
        - Text embeddings for indicator tokens (<img>, </img>, and
          frame separation text) in their respective positions

        These embeddings will replace the placeholder embeddings to create
        input_embeds for the LLM.
        """
        device = video_embeddings.device

        # Generate video replacement text and convert to token IDs
        video_repl_text = NanoNemotronVLProcessor.get_video_repl(
            num_tokens_per_frame,
            IMG_CONTEXT,
        ).full

        tokenizer = cached_tokenizer_from_config(self.model_config)
        repl_token_ids = torch.tensor(
            _seq2tokens(tokenizer, video_repl_text), device=device
        )

        # Get embedding token IDs for image context
        embed_token_ids = torch.tensor(
            encode_tokens(tokenizer, IMG_CONTEXT), device=device
        )

        # Create mask for video embedding positions
        is_video_embed = torch.isin(repl_token_ids, embed_token_ids)

        # Create final video embeddings, merging text embeddings for indicator
        # tokens with video embeddings
        text_embeddings = self.get_language_model().get_input_embeddings(repl_token_ids)
        final_video_embeddings = _merge_multimodal_embeddings(
            inputs_embeds=text_embeddings,
            multimodal_embeddings=video_embeddings,
            is_multimodal=is_video_embed,
        )

        return final_video_embeddings

    def _parse_and_validate_video_input(
        self, **kwargs: object
    ) -> Optional[NanoNemotronVLVideoPixelInputs]:
        pixel_values_flat_video = kwargs.pop("pixel_values_flat_video", None)
        video_num_patches = kwargs.pop("video_num_patches", None)
        video_embeds = kwargs.pop("video_embeds", None)

        if pixel_values_flat_video is None and video_embeds is None:
            return None

        if video_embeds is not None:
            return NanoNemotronVLVideoEmbeddingInputs(
                type="video_embeds",
                data=flatten_bn(video_embeds),
            )

        video_token_id = kwargs["video_token_id"]
        assert isinstance(video_token_id, torch.Tensor)
        self.video_context_token_id = video_token_id.flatten().unique().item()

        if pixel_values_flat_video is not None:
            if not isinstance(pixel_values_flat_video, (torch.Tensor, list)):
                raise ValueError(
                    "Incorrect type of pixel values. "
                    f"Got type: {type(pixel_values_flat_video)}"
                )

            if not isinstance(video_num_patches, (torch.Tensor, list)):
                raise ValueError(
                    "Incorrect type of image_num_patches. "
                    f"Got type: {type(video_num_patches)}"
                )

            pixel_values_flat_video = flatten_bn(pixel_values_flat_video, concat=True)
            video_num_patches = flatten_bn(video_num_patches, concat=True)
            expected_h = expected_w = self.config.force_image_size
            resolve_bindings = {"h": expected_h, "w": expected_w}

            return NanoNemotronVLVideoPixelInputs(
                type="pixel_values_videos",
                pixel_values_flat=pixel_values_flat_video,
                num_patches=video_num_patches,
                resolve_bindings=resolve_bindings,
            )

        raise AssertionError("This line should be unreachable.")

    def _parse_and_validate_multimodal_inputs(self, **kwargs: object) -> dict:
        modalities = {}
        # Preserve the order of modalities if there are multiple of them
        # from the order of kwargs.
        for input_key in kwargs:
            if (
                input_key in ("pixel_values_flat", "image_embeds")
                and "images" not in modalities
            ):
                modalities["images"] = self._parse_and_validate_image_input(**kwargs)
            if input_key in ("pixel_values_flat_video",) and "videos" not in modalities:
                modalities["videos"] = self._parse_and_validate_video_input(**kwargs)

        return modalities

    def get_multimodal_embeddings(self, **kwargs: object) -> MultiModalEmbeddings:
        # Validate the multimodal input keyword arguments
        modalities = self._parse_and_validate_multimodal_inputs(**kwargs)
        if modalities is None:
            return []

        # The resulting multimodal_embeddings is a tuple of tensors, with each
        # tensor corresponding to a multimodal data item (image or video).
        multimodal_embeddings: tuple[torch.Tensor, ...] = ()

        # NOTE: It is important to iterate over the keys in this dictionary
        # to preserve the order of the modalities.
        for modality in modalities:
            if modality == "images":
                image_input = modalities["images"]
                vision_embeddings = self._process_image_input(image_input)
                multimodal_embeddings += vision_embeddings
            if modality == "videos":
                video_input = modalities["videos"]
                video_embeddings = self._process_video_input(video_input)
                multimodal_embeddings += video_embeddings

        return multimodal_embeddings

    def get_language_model(self) -> torch.nn.Module:
        return self.language_model

    def forward(
        self,
        input_ids: torch.Tensor,
        positions: torch.Tensor,
        intermediate_tensors: Optional[IntermediateTensors] = None,
        inputs_embeds: Optional[torch.Tensor] = None,
        **kwargs: object,
    ) -> Union[torch.Tensor, IntermediateTensors]:
        if intermediate_tensors is not None:
            input_ids = None
            inputs_embeds = None

        hidden_states = self.language_model(
            input_ids=input_ids,
            positions=positions,
            intermediate_tensors=intermediate_tensors,
            inputs_embeds=inputs_embeds,
            **kwargs,
        )

        return hidden_states

    def get_mm_mapping(self) -> MultiModelKeys:
        """
        Get the module prefix in multimodal models
        """
        return MultiModelKeys.from_string_field(
            language_model="language_model",
            connector="mlp1",
            tower_model="vision_model",
        )

    def compute_logits(
        self,
        hidden_states: torch.Tensor,
    ) -> Optional[torch.Tensor]:
        return self.language_model.compute_logits(hidden_states)

    def load_weights(self, weights: Iterable[tuple[str, torch.Tensor]]):
        adapter_dict = dict(self.mlp1.named_parameters())

        def is_llm(name: str) -> bool:
            return name.startswith("language_model")

        def is_adapter_weights(weight: tuple[str, torch.Tensor]):
            return weight[0].startswith("mlp1")

        def is_vision_weights(name: str) -> bool:
            return name.startswith("vision_model.radio_model.")

        # Separate weights by component
        llm_weights = []
        vision_weights = []

        for name, w in weights:
            if is_llm(name):
                # Strip 'language_model.' prefix for LLM weights
                llm_weights.append((".".join(name.split(".")[1:]), w))
            elif is_adapter_weights((name, w)):
                # Load vision-language adapter weights directly
                trimmed_name = ".".join(name.split(".")[1:])
                param = adapter_dict[trimmed_name]
                with torch.no_grad():
                    default_weight_loader(param, w)
            elif is_vision_weights(name):
                # Convert: vision_model.radio_model.* → radio_model.*
                hf_key = name[len("vision_model.") :]  # Remove "vision_model." prefix
                vision_weights.append((hf_key, w))

        self.language_model.load_weights(llm_weights)
        self.vision_model.load_weights(vision_weights)

    def print_architecture(self, detailed: bool = True, save_to_file: str = None):
        """
        Print model architecture with parameter names, shapes, and sizes.

        Args:
            detailed: If True, show detailed parameter breakdown
            save_to_file: If provided, save output to this file path
        """
        import sys
        from io import StringIO

        # Capture output if saving to file
        original_stdout = sys.stdout
        if save_to_file:
            sys.stdout = StringIO()

        try:
            print("=" * 100)
            print("NemotronH_Nano_VL_V2 Model Architecture")
            print("=" * 100)

            total_params = 0
            param_groups = {
                "language_model": [],
                "vision_model": [],
                "mlp1": [],
                "other": [],
            }

            for name, param in self.named_parameters():
                param_size = param.numel()
                total_params += param_size

                # Group parameters by main component
                if name.startswith("language_model"):
                    param_groups["language_model"].append(
                        (name, param.shape, param_size, param.dtype)
                    )
                elif name.startswith("vision_model"):
                    param_groups["vision_model"].append(
                        (name, param.shape, param_size, param.dtype)
                    )
                elif name.startswith("mlp1"):
                    param_groups["mlp1"].append(
                        (name, param.shape, param_size, param.dtype)
                    )
                else:
                    param_groups["other"].append(
                        (name, param.shape, param_size, param.dtype)
                    )

                if detailed:
                    print(
                        f"{name:<70} | Shape: {str(param.shape):<25} | "
                        f"Size: {param_size:>12,} | Dtype: {param.dtype}"
                    )

            print("=" * 100)
            print("Summary by Component:")
            print("-" * 60)

            for component, params in param_groups.items():
                if params:  # Only show components that have parameters
                    component_total = sum(size for _, _, size, _ in params)
                    percentage = (
                        (component_total / total_params) * 100
                        if total_params > 0
                        else 0
                    )
                    print(
                        f"{component:<20} | Parameters: {len(params):>4} | "
                        f"Total Size: {component_total:>15,} | "
                        f"{percentage:>6.2f}%"
                    )

            print("-" * 60)
            print(f"{'Total Parameters':<20} | {total_params:>15,}")

            # Estimate memory usage (assuming bfloat16 = 2 bytes per parameter)
            memory_mb = total_params * 2 / (1024**2)
            memory_gb = memory_mb / 1024
            print(f"{'Est. Memory (MB)':<20} | {memory_mb:>15.2f}")
            print(f"{'Est. Memory (GB)':<20} | {memory_gb:>15.2f}")
            print("=" * 100)

            # Save to file if requested
            if save_to_file:
                output = sys.stdout.getvalue()
                sys.stdout = original_stdout
                with open(save_to_file, "w") as f:
                    f.write(output)
                print(f"Architecture saved to: {save_to_file}")
                print(output)  # Also print to console

        finally:
            if save_to_file and sys.stdout != original_stdout:
                sys.stdout = original_stdout

    def get_model_info(self):
        """
        Get basic model information as a dictionary.
        """
        total_params = sum(p.numel() for p in self.parameters())

        component_info = {}
        for name, param in self.named_parameters():
            component = name.split(".")[0]
            if component not in component_info:
                component_info[component] = {"params": 0, "size": 0}
            component_info[component]["params"] += 1
            component_info[component]["size"] += param.numel()

        return {
            "model_name": "NemotronH_Nano_VL_V2",
            "total_parameters": total_params,
            "memory_estimate_mb": total_params * 2 / (1024**2),  # bfloat16
            "components": component_info,
            "config": {
                "image_size": getattr(self.config, "force_image_size", None),
                "patch_size": getattr(self.config, "patch_size", None),
                "num_image_token": self.num_image_token,
                "downsample_ratio": self.downsample_ratio,
            },
        }

    def get_vit_model_from_radio_config(self, hf_config):
        hf_config_vision = hf_config.vision_config
        model_name = hf_config_vision.args.get("model")
        if model_name is None:
            raise ValueError(f"Unsupported vit model type: {model_name}")

        preferred_resolution = getattr(hf_config_vision, "preferred_resolution", None)
        image_size = preferred_resolution[0] if preferred_resolution else 224
        patch_size = getattr(hf_config_vision, "patch_size", 16)

        radio_config = RadioConfig(
            model_name=model_name,
            image_size=image_size,
            patch_size=patch_size,
            norm_mean=hf_config.norm_mean,
            norm_std=hf_config.norm_std,
            reg_tokens=(
                hf_config_vision.args.get("register_multiple")
                if hasattr(hf_config_vision, "args")
                and isinstance(hf_config_vision.args, dict)
                else None
            ),
        )

        return RadioModel(config=radio_config)

    def copy_inputs_before_cuda_graphs(self, input_buffers, **kwargs):
        return self.language_model.mamba_cache.copy_inputs_before_cuda_graphs(
            input_buffers, **kwargs
        )

    def get_seqlen_agnostic_capture_inputs(self, batch_size: int):
        return self.language_model.mamba_cache.get_seqlen_agnostic_capture_inputs(
            batch_size
        )

    @classmethod
    def get_mamba_state_shape_from_config(cls, vllm_config: "VllmConfig"):
        text_config = vllm_config.model_config.hf_config.text_config
        temp_vllm_config = copy.deepcopy(vllm_config)
        temp_vllm_config.model_config.hf_config = text_config
        return NemotronHForCausalLM.get_mamba_state_shape_from_config(temp_vllm_config)

    @classmethod
    def get_mamba_state_dtype_from_config(cls, vllm_config: "VllmConfig"):
        text_config = vllm_config.model_config.hf_config.text_config
        temp_vllm_config = copy.deepcopy(vllm_config)
        temp_vllm_config.model_config.hf_config = text_config
        return NemotronHForCausalLM.get_mamba_state_dtype_from_config(temp_vllm_config)

config instance-attribute

config = config

downsample_ratio instance-attribute

downsample_ratio = downsample_ratio

image_tag_type instance-attribute

image_tag_type = image_tag_type

img_context_token_id instance-attribute

img_context_token_id = None

language_model instance-attribute

language_model = init_vllm_registered_model(
    vllm_config=vllm_config,
    hf_config=text_config,
    prefix=maybe_prefix(prefix, "language_model"),
)

mlp1 instance-attribute

mlp1 = to(torch_dtype)

model_config instance-attribute

model_config = model_config

num_image_token instance-attribute

num_image_token = int(
    (image_size // patch_size) ** 2 * downsample_ratio**2
)

patch_size instance-attribute

patch_size = patch_size

ps_version instance-attribute

ps_version = ps_version

template instance-attribute

template = template

video_context_token_id instance-attribute

video_context_token_id = None

video_pruning_rate instance-attribute

video_pruning_rate = video_pruning_rate

vision_model instance-attribute

vision_model = to(torch_dtype)

__init__

__init__(*, vllm_config: VllmConfig, prefix: str = '')
Source code in vllm/model_executor/models/nano_nemotron_vl.py
def __init__(self, *, vllm_config: VllmConfig, prefix: str = ""):
    super().__init__()
    config = vllm_config.model_config.hf_config
    multimodal_config = vllm_config.model_config.multimodal_config
    image_size = config.force_image_size
    patch_size = config.patch_size
    self.patch_size = patch_size
    self.template = config.template
    self.num_image_token = int(
        (image_size // patch_size) ** 2 * (config.downsample_ratio**2)
    )
    self.downsample_ratio = config.downsample_ratio
    self.ps_version = config.ps_version
    self.image_tag_type = config.image_tag_type
    self.video_pruning_rate = multimodal_config.video_pruning_rate
    self.language_model = init_vllm_registered_model(
        vllm_config=vllm_config,
        hf_config=config.text_config,
        prefix=maybe_prefix(prefix, "language_model"),
    )
    self.vision_model = self.get_vit_model_from_radio_config(config).to(
        self.language_model.config.torch_dtype
    )

    # Construct the vision projection.
    vit_hidden_size = config.vit_hidden_size
    vision_projection_hidden_size = config.projector_hidden_size
    llm_hidden_size = config.text_config.hidden_size

    self.mlp1 = nn.Sequential(
        RMSNorm(
            hidden_size=vit_hidden_size * int(1 / self.downsample_ratio) ** 2,
            eps=1e-5,
        ),
        nn.Linear(
            vit_hidden_size * int(1 / self.downsample_ratio) ** 2,
            vision_projection_hidden_size,
            bias=False,
        ),
        ReLUSquaredActivation(),
        nn.Linear(vision_projection_hidden_size, llm_hidden_size, bias=False),
    )
    self.mlp1 = self.mlp1.to(self.language_model.config.torch_dtype)

    self.img_context_token_id = None
    self.video_context_token_id = None
    self.config = config
    self.model_config = vllm_config.model_config
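
A small worked example of the projector input width constructed above; all numbers are assumptions for illustration and do not come from a particular checkpoint:

image_size, patch_size, downsample_ratio = 512, 16, 0.5
num_image_token = int((image_size // patch_size) ** 2 * downsample_ratio**2)
assert num_image_token == 256  # tokens fed to the LLM per image tile

vit_hidden_size = 1280  # hypothetical ViT width
mlp1_in_features = vit_hidden_size * int(1 / downsample_ratio) ** 2
assert mlp1_in_features == 5120  # pixel shuffle folds a 2x2 patch block into channels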

_create_final_video_embeddings

_create_final_video_embeddings(
    video_embeddings: Tensor,
    num_tokens_per_frame: list[int],
) -> Tensor

Create final embeddings that combine video embeddings with text embeddings of indicator tokens.

These final embeddings contain:
  • Actual video embeddings in positions corresponding to video content
  • Text embeddings for indicator tokens (<img>, </img>, and frame separation text) in their respective positions

These embeddings will replace the placeholder embeddings to create input_embeds for the LLM.

Source code in vllm/model_executor/models/nano_nemotron_vl.py
def _create_final_video_embeddings(
    self,
    video_embeddings: torch.Tensor,
    num_tokens_per_frame: list[int],
) -> torch.Tensor:
    """Create final embeddings that combine video embeddings with
    text embeddings of indicator tokens.

    These final embeddings contain:
    - Actual video embeddings in positions corresponding to video content
    - Text embeddings for indicator tokens (<img>, </img>, and
      frame separation text) in their respective positions

    These embeddings will replace the placeholder embeddings to create
    input_embeds for the LLM.
    """
    device = video_embeddings.device

    # Generate video replacement text and convert to token IDs
    video_repl_text = NanoNemotronVLProcessor.get_video_repl(
        num_tokens_per_frame,
        IMG_CONTEXT,
    ).full

    tokenizer = cached_tokenizer_from_config(self.model_config)
    repl_token_ids = torch.tensor(
        _seq2tokens(tokenizer, video_repl_text), device=device
    )

    # Get embedding token IDs for image context
    embed_token_ids = torch.tensor(
        encode_tokens(tokenizer, IMG_CONTEXT), device=device
    )

    # Create mask for video embedding positions
    is_video_embed = torch.isin(repl_token_ids, embed_token_ids)

    # Create final video embeddings, merging text embeddings for indicator
    # tokens with video embeddings
    text_embeddings = self.get_language_model().get_input_embeddings(repl_token_ids)
    final_video_embeddings = _merge_multimodal_embeddings(
        inputs_embeds=text_embeddings,
        multimodal_embeddings=video_embeddings,
        is_multimodal=is_video_embed,
    )

    return final_video_embeddings
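
Conceptually, the merge at the end behaves like boolean-mask assignment. The sketch below is not vLLM's _merge_multimodal_embeddings, just an illustration with made-up sizes:

import torch

hidden = 8
text_embeddings = torch.randn(10, hidden)   # embeddings of repl_token_ids
video_embeddings = torch.randn(6, hidden)   # one row per retained video token
is_video_embed = torch.tensor(
    [False, True, True, True, False, False, True, True, True, False]
)

merged = text_embeddings.clone()
merged[is_video_embed] = video_embeddings   # video rows replace placeholder rows
assert merged.shape == text_embeddings.shape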

_parse_and_validate_image_input

_parse_and_validate_image_input(
    **kwargs: object,
) -> Optional[NanoNemotronVLImageInputs]
Source code in vllm/model_executor/models/nano_nemotron_vl.py
def _parse_and_validate_image_input(
    self, **kwargs: object
) -> Optional[NanoNemotronVLImageInputs]:
    pixel_values_flat = kwargs.pop("pixel_values_flat", None)
    image_num_patches = kwargs.pop("image_num_patches", None)
    image_embeds = kwargs.pop("image_embeds", None)

    if pixel_values_flat is None and image_embeds is None:
        return None

    if image_embeds is not None:
        if not isinstance(image_embeds, (torch.Tensor, list)):
            raise ValueError(
                "Incorrect type of image embeddings. "
                f"Got type: {type(image_embeds)}"
            )

        return NanoNemotronVLImageEmbeddinInputs(
            type="image_embeds",
            data=flatten_bn(image_embeds),
        )

    image_token_id = kwargs["image_token_id"]
    assert isinstance(image_token_id, torch.Tensor)
    self.img_context_token_id = image_token_id.flatten().unique().item()

    if pixel_values_flat is not None:
        if not isinstance(pixel_values_flat, (torch.Tensor, list)):
            raise ValueError(
                "Incorrect type of pixel values. "
                f"Got type: {type(pixel_values_flat)}"
            )

        if not isinstance(image_num_patches, (torch.Tensor, list)):
            raise ValueError(
                "Incorrect type of image_num_patches. "
                f"Got type: {type(image_num_patches)}"
            )

        pixel_values_flat = flatten_bn(pixel_values_flat, concat=True)
        image_num_patches = flatten_bn(image_num_patches, concat=True)

        return NanoNemotronVLImagePixelInputs(
            type="pixel_values",
            pixel_values_flat=pixel_values_flat,
            num_patches=image_num_patches,
        )

    raise AssertionError("This line should be unreachable.")

_parse_and_validate_multimodal_inputs

_parse_and_validate_multimodal_inputs(
    **kwargs: object,
) -> dict
Source code in vllm/model_executor/models/nano_nemotron_vl.py
def _parse_and_validate_multimodal_inputs(self, **kwargs: object) -> dict:
    modalities = {}
    # Preserve the order of modalities if there are multiple of them
    # from the order of kwargs.
    for input_key in kwargs:
        if (
            input_key in ("pixel_values_flat", "image_embeds")
            and "images" not in modalities
        ):
            modalities["images"] = self._parse_and_validate_image_input(**kwargs)
        if input_key in ("pixel_values_flat_video",) and "videos" not in modalities:
            modalities["videos"] = self._parse_and_validate_video_input(**kwargs)

    return modalities

_parse_and_validate_video_input

_parse_and_validate_video_input(
    **kwargs: object,
) -> Optional[NanoNemotronVLVideoPixelInputs]
Source code in vllm/model_executor/models/nano_nemotron_vl.py
def _parse_and_validate_video_input(
    self, **kwargs: object
) -> Optional[NanoNemotronVLVideoPixelInputs]:
    pixel_values_flat_video = kwargs.pop("pixel_values_flat_video", None)
    video_num_patches = kwargs.pop("video_num_patches", None)
    video_embeds = kwargs.pop("video_embeds", None)

    if pixel_values_flat_video is None and video_embeds is None:
        return None

    if video_embeds is not None:
        return NanoNemotronVLVideoEmbeddingInputs(
            type="video_embeds",
            data=flatten_bn(video_embeds),
        )

    video_token_id = kwargs["video_token_id"]
    assert isinstance(video_token_id, torch.Tensor)
    self.video_context_token_id = video_token_id.flatten().unique().item()

    if pixel_values_flat_video is not None:
        if not isinstance(pixel_values_flat_video, (torch.Tensor, list)):
            raise ValueError(
                "Incorrect type of pixel values. "
                f"Got type: {type(pixel_values_flat_video)}"
            )

        if not isinstance(video_num_patches, (torch.Tensor, list)):
            raise ValueError(
                "Incorrect type of image_num_patches. "
                f"Got type: {type(video_num_patches)}"
            )

        pixel_values_flat_video = flatten_bn(pixel_values_flat_video, concat=True)
        video_num_patches = flatten_bn(video_num_patches, concat=True)
        expected_h = expected_w = self.config.force_image_size
        resolve_bindings = {"h": expected_h, "w": expected_w}

        return NanoNemotronVLVideoPixelInputs(
            type="pixel_values_videos",
            pixel_values_flat=pixel_values_flat_video,
            num_patches=video_num_patches,
            resolve_bindings=resolve_bindings,
        )

    raise AssertionError("This line should be unreachable.")

_process_image_input

_process_image_input(
    image_input: NanoNemotronVLImageInputs,
) -> tuple[Tensor, ...]
Source code in vllm/model_executor/models/nano_nemotron_vl.py
def _process_image_input(
    self, image_input: NanoNemotronVLImageInputs
) -> tuple[torch.Tensor, ...]:
    if image_input["type"] == "image_embeds":
        return image_input["data"]

    assert self.vision_model is not None

    image_embeds = self.extract_feature(image_input["pixel_values_flat"])
    num_patches = image_input["num_patches"]

    # Only one image in the current batch
    if len(num_patches) == 1:
        return (image_embeds.view(-1, self.config.text_config.hidden_size),)

    # NOTE: Image embeddings are split into separate tensors for each image
    # by the size of each embedding.
    feature_size = image_embeds.shape[1]
    image_embeds = image_embeds.view(-1, self.config.text_config.hidden_size)
    image_feature_sizes = [
        num_patches * feature_size for num_patches in num_patches
    ]
    return image_embeds.split(image_feature_sizes)
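
The split sizes follow directly from num_patches and the per-tile feature size; a short sketch with assumed numbers:

import torch

hidden, feature_size = 4, 256               # assumed per-tile token count
num_patches = torch.tensor([2, 5])          # tiles for two images
image_embeds = torch.randn(int(num_patches.sum()) * feature_size, hidden)

image_feature_sizes = [int(n) * feature_size for n in num_patches]  # [512, 1280]
per_image = image_embeds.split(image_feature_sizes)
assert [t.shape[0] for t in per_image] == image_feature_sizes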

_process_video_input

_process_video_input(
    video_input: NanoNemotronVLVideoPixelInputs,
) -> tuple[Tensor, ...]

Process video input and create final embeddings with video content and indicator tokens.

Source code in vllm/model_executor/models/nano_nemotron_vl.py
def _process_video_input(
    self, video_input: NanoNemotronVLVideoPixelInputs
) -> tuple[torch.Tensor, ...]:
    """Process video input and create final embeddings with video content
    and indicator tokens."""
    # Get video embeddings using the same processing as images
    video_embeddings = self._process_image_input(video_input)

    final_video_embeddings: tuple[torch.Tensor, ...] = ()

    image_rows = image_cols = self.config.force_image_size
    downsample_ratio = self.config.downsample_ratio
    patch_size = self.config.patch_size
    rows = int(image_rows * downsample_ratio // patch_size)
    cols = int(image_cols * downsample_ratio // patch_size)
    video_pruning_rate = self.video_pruning_rate

    # Calculate video feature dimensions (number of frames and
    # their feature size (AKA tokens per frame))
    # TODO: Maybe this can be optimized to avoid the loop?
    for i, single_video_embeddings in enumerate(video_embeddings):
        num_frames = video_input["num_patches"][i].item()
        assert single_video_embeddings.shape[0] % num_frames == 0

        if video_pruning_rate is not None and video_pruning_rate > 0.0:
            # Start of EVS-specific code
            retention_mask = compute_retention_mask(
                single_video_embeddings,
                video_size_thw=(num_frames, rows, cols),
                spatial_merge_size=1,
                q=video_pruning_rate,
            )

            # apply retention mask
            single_video_embeddings = single_video_embeddings[retention_mask]

            # calculate the actual number of retained tokens per frame
            retention_mask_thw = retention_mask.reshape(num_frames, rows, cols)
            num_tokens_per_frame = (
                retention_mask_thw.sum(dim=(1, 2)).long().tolist()
            )
            # End of EVS-specific code
        else:
            feature_size = single_video_embeddings.shape[0] // num_frames
            num_tokens_per_frame = [feature_size] * num_frames

        final_video_embeddings += (
            self._create_final_video_embeddings(
                single_video_embeddings,
                num_tokens_per_frame,
            ),
        )

    return final_video_embeddings
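
The per-frame accounting after EVS pruning reduces to summing the retention mask over the spatial grid; a sketch with assumed sizes (force_image_size=512, patch_size=16, downsample_ratio=0.5 give rows = cols = 16):

import torch

num_frames, rows, cols = 4, 16, 16
retention_mask = torch.rand(num_frames * rows * cols) > 0.75  # stand-in for EVS output
retention_mask_thw = retention_mask.reshape(num_frames, rows, cols)
num_tokens_per_frame = retention_mask_thw.sum(dim=(1, 2)).long().tolist()
assert sum(num_tokens_per_frame) == int(retention_mask.sum())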

compute_logits

compute_logits(hidden_states: Tensor) -> Optional[Tensor]
Source code in vllm/model_executor/models/nano_nemotron_vl.py
def compute_logits(
    self,
    hidden_states: torch.Tensor,
) -> Optional[torch.Tensor]:
    return self.language_model.compute_logits(hidden_states)

copy_inputs_before_cuda_graphs

copy_inputs_before_cuda_graphs(input_buffers, **kwargs)
Source code in vllm/model_executor/models/nano_nemotron_vl.py
def copy_inputs_before_cuda_graphs(self, input_buffers, **kwargs):
    return self.language_model.mamba_cache.copy_inputs_before_cuda_graphs(
        input_buffers, **kwargs
    )

extract_feature

extract_feature(pixel_values)
Source code in vllm/model_executor/models/nano_nemotron_vl.py
def extract_feature(self, pixel_values):
    vit_embeds = self.vision_model(pixel_values)
    vit_embeds = vit_embeds.to(dtype=torch.bfloat16)
    h = w = int(vit_embeds.shape[1] ** 0.5)
    vit_embeds = vit_embeds.reshape(vit_embeds.shape[0], h, w, -1)
    vit_embeds = self.pixel_shuffle(vit_embeds, scale_factor=self.downsample_ratio)
    vit_embeds = vit_embeds.reshape(vit_embeds.shape[0], -1, vit_embeds.shape[-1])
    vit_embeds = self.mlp1(vit_embeds)
    return vit_embeds

forward

forward(
    input_ids: Tensor,
    positions: Tensor,
    intermediate_tensors: Optional[
        IntermediateTensors
    ] = None,
    inputs_embeds: Optional[Tensor] = None,
    **kwargs: object,
) -> Union[Tensor, IntermediateTensors]
Source code in vllm/model_executor/models/nano_nemotron_vl.py
def forward(
    self,
    input_ids: torch.Tensor,
    positions: torch.Tensor,
    intermediate_tensors: Optional[IntermediateTensors] = None,
    inputs_embeds: Optional[torch.Tensor] = None,
    **kwargs: object,
) -> Union[torch.Tensor, IntermediateTensors]:
    if intermediate_tensors is not None:
        input_ids = None
        inputs_embeds = None

    hidden_states = self.language_model(
        input_ids=input_ids,
        positions=positions,
        intermediate_tensors=intermediate_tensors,
        inputs_embeds=inputs_embeds,
        **kwargs,
    )

    return hidden_states

get_language_model

get_language_model() -> Module
Source code in vllm/model_executor/models/nano_nemotron_vl.py
def get_language_model(self) -> torch.nn.Module:
    return self.language_model

get_mamba_state_dtype_from_config classmethod

get_mamba_state_dtype_from_config(vllm_config: VllmConfig)
Source code in vllm/model_executor/models/nano_nemotron_vl.py
@classmethod
def get_mamba_state_dtype_from_config(cls, vllm_config: "VllmConfig"):
    text_config = vllm_config.model_config.hf_config.text_config
    temp_vllm_config = copy.deepcopy(vllm_config)
    temp_vllm_config.model_config.hf_config = text_config
    return NemotronHForCausalLM.get_mamba_state_dtype_from_config(temp_vllm_config)

get_mamba_state_shape_from_config classmethod

get_mamba_state_shape_from_config(vllm_config: VllmConfig)
Source code in vllm/model_executor/models/nano_nemotron_vl.py
@classmethod
def get_mamba_state_shape_from_config(cls, vllm_config: "VllmConfig"):
    text_config = vllm_config.model_config.hf_config.text_config
    temp_vllm_config = copy.deepcopy(vllm_config)
    temp_vllm_config.model_config.hf_config = text_config
    return NemotronHForCausalLM.get_mamba_state_shape_from_config(temp_vllm_config)

get_mm_mapping

get_mm_mapping() -> MultiModelKeys

Get the module prefix in multimodal models

Source code in vllm/model_executor/models/nano_nemotron_vl.py
def get_mm_mapping(self) -> MultiModelKeys:
    """
    Get the module prefix in multimodal models
    """
    return MultiModelKeys.from_string_field(
        language_model="language_model",
        connector="mlp1",
        tower_model="vision_model",
    )

get_model_info

get_model_info()

Get basic model information as a dictionary.

Source code in vllm/model_executor/models/nano_nemotron_vl.py
def get_model_info(self):
    """
    Get basic model information as a dictionary.
    """
    total_params = sum(p.numel() for p in self.parameters())

    component_info = {}
    for name, param in self.named_parameters():
        component = name.split(".")[0]
        if component not in component_info:
            component_info[component] = {"params": 0, "size": 0}
        component_info[component]["params"] += 1
        component_info[component]["size"] += param.numel()

    return {
        "model_name": "NemotronH_Nano_VL_V2",
        "total_parameters": total_params,
        "memory_estimate_mb": total_params * 2 / (1024**2),  # bfloat16
        "components": component_info,
        "config": {
            "image_size": getattr(self.config, "force_image_size", None),
            "patch_size": getattr(self.config, "patch_size", None),
            "num_image_token": self.num_image_token,
            "downsample_ratio": self.downsample_ratio,
        },
    }

get_multimodal_embeddings

get_multimodal_embeddings(
    **kwargs: object,
) -> MultiModalEmbeddings
Source code in vllm/model_executor/models/nano_nemotron_vl.py
def get_multimodal_embeddings(self, **kwargs: object) -> MultiModalEmbeddings:
    # Validate the multimodal input keyword arguments
    modalities = self._parse_and_validate_multimodal_inputs(**kwargs)
    if modalities is None:
        return []

    # The resulting multimodal_embeddings is a tuple of tensors, with each
    # tensor corresponding to a multimodal data item (image or video).
    multimodal_embeddings: tuple[torch.Tensor, ...] = ()

    # NOTE: It is important to iterate over the keys in this dictionary
    # to preserve the order of the modalities.
    for modality in modalities:
        if modality == "images":
            image_input = modalities["images"]
            vision_embeddings = self._process_image_input(image_input)
            multimodal_embeddings += vision_embeddings
        if modality == "videos":
            video_input = modalities["videos"]
            video_embeddings = self._process_video_input(video_input)
            multimodal_embeddings += video_embeddings

    return multimodal_embeddings

get_placeholder_str classmethod

get_placeholder_str(modality: str, i: int) -> Optional[str]
Source code in vllm/model_executor/models/nano_nemotron_vl.py
@classmethod
def get_placeholder_str(cls, modality: str, i: int) -> Optional[str]:
    if modality.startswith("image"):
        return "<image>"
    if modality.startswith("video"):
        return "<video>"
    return None

get_seqlen_agnostic_capture_inputs

get_seqlen_agnostic_capture_inputs(batch_size: int)
Source code in vllm/model_executor/models/nano_nemotron_vl.py
def get_seqlen_agnostic_capture_inputs(self, batch_size: int):
    return self.language_model.mamba_cache.get_seqlen_agnostic_capture_inputs(
        batch_size
    )

get_vit_model_from_radio_config

get_vit_model_from_radio_config(hf_config)
Source code in vllm/model_executor/models/nano_nemotron_vl.py
def get_vit_model_from_radio_config(self, hf_config):
    hf_config_vision = hf_config.vision_config
    model_name = hf_config_vision.args.get("model")
    if model_name is None:
        raise ValueError(f"Unsupported vit model type: {model_name}")

    preferred_resolution = getattr(hf_config_vision, "preferred_resolution", None)
    image_size = preferred_resolution[0] if preferred_resolution else 224
    patch_size = getattr(hf_config_vision, "patch_size", 16)

    radio_config = RadioConfig(
        model_name=model_name,
        image_size=image_size,
        patch_size=patch_size,
        norm_mean=hf_config.norm_mean,
        norm_std=hf_config.norm_std,
        reg_tokens=(
            hf_config_vision.args.get("register_multiple")
            if hasattr(hf_config_vision, "args")
            and isinstance(hf_config_vision.args, dict)
            else None
        ),
    )

    return RadioModel(config=radio_config)

load_weights

load_weights(weights: Iterable[tuple[str, Tensor]])
Source code in vllm/model_executor/models/nano_nemotron_vl.py
def load_weights(self, weights: Iterable[tuple[str, torch.Tensor]]):
    adapter_dict = dict(self.mlp1.named_parameters())

    def is_llm(name: str) -> bool:
        return name.startswith("language_model")

    def is_adapter_weights(weight: tuple[str, torch.Tensor]):
        return weight[0].startswith("mlp1")

    def is_vision_weights(name: str) -> bool:
        return name.startswith("vision_model.radio_model.")

    # Separate weights by component
    llm_weights = []
    vision_weights = []

    for name, w in weights:
        if is_llm(name):
            # Strip 'language_model.' prefix for LLM weights
            llm_weights.append((".".join(name.split(".")[1:]), w))
        elif is_adapter_weights((name, w)):
            # Load vision-language adapter weights directly
            trimmed_name = ".".join(name.split(".")[1:])
            param = adapter_dict[trimmed_name]
            with torch.no_grad():
                default_weight_loader(param, w)
        elif is_vision_weights(name):
            # Convert: vision_model.radio_model.* → radio_model.*
            hf_key = name[len("vision_model.") :]  # Remove "vision_model." prefix
            vision_weights.append((hf_key, w))

    self.language_model.load_weights(llm_weights)
    self.vision_model.load_weights(vision_weights)
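
The routing above is purely prefix-based; a sketch of how checkpoint keys are rewritten (the key names below are hypothetical):

checkpoint_keys = [
    "language_model.backbone.layers.0.mixer.in_proj.weight",
    "mlp1.1.weight",
    "vision_model.radio_model.model.blocks.0.attn.qkv.weight",
]

llm_keys = [k.split(".", 1)[1] for k in checkpoint_keys
            if k.startswith("language_model")]
vision_keys = [k[len("vision_model."):] for k in checkpoint_keys
               if k.startswith("vision_model.radio_model.")]

assert llm_keys == ["backbone.layers.0.mixer.in_proj.weight"]
assert vision_keys == ["radio_model.model.blocks.0.attn.qkv.weight"]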

pixel_shuffle

pixel_shuffle(x, scale_factor=0.5)
Source code in vllm/model_executor/models/nano_nemotron_vl.py
def pixel_shuffle(self, x, scale_factor=0.5):
    n, w, h, c = x.size()
    # N, W, H, C --> N, W, H * scale, C // scale
    x = x.view(
        n,
        w,
        int(h * scale_factor),
        int(c / scale_factor),
    )
    # N, W, H * scale, C // scale --> N, H * scale, W, C // scale
    x = x.permute(0, 2, 1, 3).contiguous()
    # N, H * scale, W, C // scale -->
    # N, H * scale, W * scale, C // (scale ** 2)
    x = x.view(
        n,
        int(h * scale_factor),
        int(w * scale_factor),
        int(c / (scale_factor * scale_factor)),
    )
    if self.ps_version == "v1":
        warnings.warn(
            "In ps_version 'v1', the height and width have not "
            "been swapped back, which results in a transposed image.",
            stacklevel=2,
        )
    else:
        x = x.permute(0, 2, 1, 3).contiguous()
    return x
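
A shape walk-through of the shuffle with scale_factor = 0.5 (i.e. downsample_ratio = 0.5) and ps_version != 'v1'; the tensor sizes are illustrative:

import torch

n, w, h, c = 2, 32, 32, 1280
x = torch.randn(n, w, h, c)

x = x.view(n, w, int(h * 0.5), int(c / 0.5))               # (2, 32, 16, 2560)
x = x.permute(0, 2, 1, 3).contiguous()                     # (2, 16, 32, 2560)
x = x.view(n, int(h * 0.5), int(w * 0.5), int(c / 0.25))   # (2, 16, 16, 5120)
x = x.permute(0, 2, 1, 3).contiguous()                     # swap H/W back
assert x.shape == (2, 16, 16, 5120)                        # 4x channels, 1/4 tokens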

print_architecture

print_architecture(
    detailed: bool = True, save_to_file: str = None
)

Print model architecture with parameter names, shapes, and sizes.

Parameters:

  • detailed (bool): If True, show detailed parameter breakdown. Default: True
  • save_to_file (str): If provided, save output to this file path. Default: None
Source code in vllm/model_executor/models/nano_nemotron_vl.py
def print_architecture(self, detailed: bool = True, save_to_file: str = None):
    """
    Print model architecture with parameter names, shapes, and sizes.

    Args:
        detailed: If True, show detailed parameter breakdown
        save_to_file: If provided, save output to this file path
    """
    import sys
    from io import StringIO

    # Capture output if saving to file
    original_stdout = sys.stdout
    if save_to_file:
        sys.stdout = StringIO()

    try:
        print("=" * 100)
        print("NemotronH_Nano_VL_V2 Model Architecture")
        print("=" * 100)

        total_params = 0
        param_groups = {
            "language_model": [],
            "vision_model": [],
            "mlp1": [],
            "other": [],
        }

        for name, param in self.named_parameters():
            param_size = param.numel()
            total_params += param_size

            # Group parameters by main component
            if name.startswith("language_model"):
                param_groups["language_model"].append(
                    (name, param.shape, param_size, param.dtype)
                )
            elif name.startswith("vision_model"):
                param_groups["vision_model"].append(
                    (name, param.shape, param_size, param.dtype)
                )
            elif name.startswith("mlp1"):
                param_groups["mlp1"].append(
                    (name, param.shape, param_size, param.dtype)
                )
            else:
                param_groups["other"].append(
                    (name, param.shape, param_size, param.dtype)
                )

            if detailed:
                print(
                    f"{name:<70} | Shape: {str(param.shape):<25} | "
                    f"Size: {param_size:>12,} | Dtype: {param.dtype}"
                )

        print("=" * 100)
        print("Summary by Component:")
        print("-" * 60)

        for component, params in param_groups.items():
            if params:  # Only show components that have parameters
                component_total = sum(size for _, _, size, _ in params)
                percentage = (
                    (component_total / total_params) * 100
                    if total_params > 0
                    else 0
                )
                print(
                    f"{component:<20} | Parameters: {len(params):>4} | "
                    f"Total Size: {component_total:>15,} | "
                    f"{percentage:>6.2f}%"
                )

        print("-" * 60)
        print(f"{'Total Parameters':<20} | {total_params:>15,}")

        # Estimate memory usage (assuming bfloat16 = 2 bytes per parameter)
        memory_mb = total_params * 2 / (1024**2)
        memory_gb = memory_mb / 1024
        print(f"{'Est. Memory (MB)':<20} | {memory_mb:>15.2f}")
        print(f"{'Est. Memory (GB)':<20} | {memory_gb:>15.2f}")
        print("=" * 100)

        # Save to file if requested
        if save_to_file:
            output = sys.stdout.getvalue()
            sys.stdout = original_stdout
            with open(save_to_file, "w") as f:
                f.write(output)
            print(f"Architecture saved to: {save_to_file}")
            print(output)  # Also print to console

    finally:
        if save_to_file and sys.stdout != original_stdout:
            sys.stdout = original_stdout
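
The per-component summary groups parameters by their top-level attribute name and applies a flat 2-bytes-per-parameter estimate (bfloat16). The snippet below reproduces just that bookkeeping on a toy module whose language_model / vision_model / mlp1 attributes merely mimic the real prefixes; it is an illustration of the grouping, not the actual model.

import torch.nn as nn

toy = nn.Module()
toy.language_model = nn.Linear(256, 256)
toy.vision_model = nn.Linear(128, 128)
toy.mlp1 = nn.Linear(128, 256)

totals: dict[str, int] = {}
for name, param in toy.named_parameters():
    component = name.split(".", 1)[0]
    totals[component] = totals.get(component, 0) + param.numel()

total = sum(totals.values())
for component, size in totals.items():
    print(f"{component:<15} | {size:>8,} params | {size / total:6.2%}")
# Same 2-bytes-per-parameter rule the method uses for its memory estimate
print(f"~{total * 2 / 1024**2:.2f} MiB in bfloat16")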

dynamic_preprocess

dynamic_preprocess(
    image,
    *,
    image_size=512,
    max_num_tiles=12,
    use_thumbnail=True,
    idx=0,
)
Source code in vllm/model_executor/models/nano_nemotron_vl.py
def dynamic_preprocess(
    image, *, image_size=512, max_num_tiles=12, use_thumbnail=True, idx=0
):
    orig_width, orig_height = image.size

    target_ratios = get_internvl_target_ratios(1, max_num_tiles)

    blocks, target_width, target_height = calculate_internvl_targets(
        orig_width=orig_width,
        orig_height=orig_height,
        target_ratios=target_ratios,
        image_size=image_size,
        use_thumbnail=False,
    )
    # resize the image
    resized_img = image.resize((target_width, target_height))
    processed_images = []
    for i in range(blocks):
        box = (
            (i % (target_width // image_size)) * image_size,
            (i // (target_width // image_size)) * image_size,
            ((i % (target_width // image_size)) + 1) * image_size,
            ((i // (target_width // image_size)) + 1) * image_size,
        )
        # split the image
        split_img = resized_img.crop(box)
        processed_images.append(split_img)
    assert len(processed_images) == blocks
    if use_thumbnail and len(processed_images) != 1:
        thumbnail_img = image.resize((image_size, image_size))
        processed_images.append(thumbnail_img)

    processed_images = [
        img.convert("RGB") if img.mode != "RGB" else img for img in processed_images
    ]
    processed_images = [
        T.Resize((image_size, image_size), interpolation=T.InterpolationMode.BICUBIC)(
            img
        )
        for img in processed_images
    ]
    processed_images = [T.ToTensor()(img) for img in processed_images]
    return processed_images
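
Usage sketch: a 1024x512 landscape image with image_size=512 fits a 2x1 tile grid, and because more than one tile is produced a thumbnail is appended, giving three 512x512 tensors. The import path points at an internal vLLM module, so treat it as version-specific rather than a stable API.

from PIL import Image
from vllm.model_executor.models.nano_nemotron_vl import dynamic_preprocess

image = Image.new("RGB", (1024, 512))
tiles = dynamic_preprocess(
    image, image_size=512, max_num_tiles=12, use_thumbnail=True
)
print(len(tiles), tiles[0].shape)  # expected: 3 torch.Size([3, 512, 512])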

image_to_pixel_values

image_to_pixel_values(
    image: Image,
    *,
    input_size: int,
    max_num: int,
    use_thumbnail: bool,
    idx: int,
) -> Tensor
Source code in vllm/model_executor/models/nano_nemotron_vl.py
def image_to_pixel_values(
    image: Image.Image,
    *,
    input_size: int,
    max_num: int,
    use_thumbnail: bool,
    idx: int,
) -> torch.Tensor:
    images = dynamic_preprocess(
        image,
        image_size=input_size,
        max_num_tiles=max_num,
        use_thumbnail=use_thumbnail,
        idx=idx,
    )

    pixel_values = torch.stack(images)
    return pixel_values
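
image_to_pixel_values simply stacks the per-tile tensors from dynamic_preprocess into the (num_tiles, 3, input_size, input_size) layout the vision tower consumes. A brief sketch under the same assumptions as above (internal import path, synthetic image):

from PIL import Image
from vllm.model_executor.models.nano_nemotron_vl import image_to_pixel_values

image = Image.new("RGB", (1024, 512))
pixel_values = image_to_pixel_values(
    image, input_size=512, max_num=12, use_thumbnail=True, idx=0
)
print(pixel_values.shape)  # expected: torch.Size([3, 3, 512, 512])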

video_to_pixel_values

video_to_pixel_values(
    video: NDArray,
    *,
    input_size: int,
    max_num_tiles: int = 1,
    use_thumbnail: bool,
) -> Tensor
Source code in vllm/model_executor/models/nano_nemotron_vl.py
def video_to_pixel_values(
    video: npt.NDArray,
    *,
    input_size: int,
    max_num_tiles: int = 1,
    use_thumbnail: bool,
) -> torch.Tensor:
    assert max_num_tiles == 1, "Video modality always uses one tile"

    # Convert each frame to a single resized tile tensor consistent
    # with image path
    frames_tensors: list[torch.Tensor] = []
    for frame in video:
        pil_frame = dynamic_preprocess(
            Image.fromarray(frame, mode="RGB"),
            image_size=input_size,
            max_num_tiles=max_num_tiles,
            use_thumbnail=use_thumbnail,
            idx=0,
        )
        # dynamic_preprocess returns tensors already; take the single tile
        assert len(pil_frame) >= 1
        frames_tensors.append(pil_frame[-1])

    return torch.stack(frames_tensors)
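
Because video frames are restricted to a single tile, a clip of T frames becomes a (T, 3, input_size, input_size) tensor regardless of frame aspect ratio. Sketch under the same caveat that this is an internal, version-specific import:

import numpy as np
from vllm.model_executor.models.nano_nemotron_vl import video_to_pixel_values

video = np.zeros((4, 480, 640, 3), dtype=np.uint8)  # 4 dummy RGB frames
pixel_values = video_to_pixel_values(
    video, input_size=512, max_num_tiles=1, use_thumbnail=True
)
print(pixel_values.shape)  # expected: torch.Size([4, 3, 512, 512])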