Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions components/src/dynamo/common/multimodal/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
TransferRequest,
)
from dynamo.common.multimodal.image_loader import ImageLoader
from dynamo.common.multimodal.video_loader import VideoLoader

EMBEDDING_SENDER_FACTORIES: dict[
EmbeddingTransferMode, Callable[[], AbstractEmbeddingSender]
Expand All @@ -43,6 +44,7 @@
"EMBEDDING_RECEIVER_FACTORIES",
"EMBEDDING_SENDER_FACTORIES",
"ImageLoader",
"VideoLoader",
"NixlReadEmbeddingReceiver",
"NixlReadEmbeddingSender",
"NixlWriteEmbeddingSender",
Expand Down
164 changes: 164 additions & 0 deletions components/src/dynamo/common/multimodal/video_loader.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,164 @@
# SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import asyncio
import logging
import os
from pathlib import Path
from typing import Any, Dict, Final, List
from urllib.parse import urlparse

import numpy as np

import dynamo.nixl_connect as nixl_connect
from dynamo.common.utils.media_nixl import read_decoded_media_via_nixl
from dynamo.common.utils.runtime import run_async

logger = logging.getLogger(__name__)


URL_VARIANT_KEY: Final = "Url"
DECODED_VARIANT_KEY: Final = "Decoded"


def _require_vllm_video_media() -> tuple[Any, Any, Any]:
try:
from vllm.multimodal.media import MediaConnector, VideoMediaIO
from vllm.multimodal.media.image import ImageMediaIO
except ImportError as exc:
raise RuntimeError(
"vLLM multimodal media components are required to decode `video_url` "
"inputs in the vLLM backend."
) from exc
return MediaConnector, VideoMediaIO, ImageMediaIO


class VideoLoader:
CACHE_SIZE_MAXIMUM = int(os.environ.get("DYN_MM_VIDEO_CACHE_SIZE", "8"))
NUM_FRAMES_DEFAULT = int(os.environ.get("DYN_MM_VIDEO_NUM_FRAMES", "8"))

def __init__(
self,
cache_size: int = CACHE_SIZE_MAXIMUM,
http_timeout: float = 60.0,
num_frames: int = NUM_FRAMES_DEFAULT,
enable_frontend_decoding: bool = False,
) -> None:
del cache_size

self._http_timeout = int(http_timeout)
self._num_frames = num_frames
self._enable_frontend_decoding = enable_frontend_decoding
self._nixl_connector = None
self._vllm_media_connector = None
if self._enable_frontend_decoding:
self._nixl_connector = nixl_connect.Connector()
run_async(self._nixl_connector.initialize)

@staticmethod
def _normalize_video_url(video_url: str) -> str:
parsed_url = urlparse(video_url)
if parsed_url.scheme or not video_url:
return video_url

file_path = Path(video_url).expanduser()
if not file_path.exists():
raise FileNotFoundError(f"Error reading file: {file_path}")

return file_path.resolve().as_uri()

def _get_vllm_media_connector(self) -> Any:
if self._vllm_media_connector is None:
MediaConnector, _, _ = _require_vllm_video_media()
# Match the previous backend behavior and allow direct local file paths.
self._vllm_media_connector = MediaConnector(allowed_local_media_path="/")

return self._vllm_media_connector

def _create_vllm_video_io(self) -> Any:
_, VideoMediaIO, ImageMediaIO = _require_vllm_video_media()
return VideoMediaIO(
ImageMediaIO(image_mode="RGB"),
num_frames=self._num_frames,
)

async def _load_video_with_vllm(
self, video_url: str
) -> tuple[np.ndarray, Dict[str, Any]]:
connector = self._get_vllm_media_connector()
normalized_url = self._normalize_video_url(video_url)
return await connector.load_from_url_async(
normalized_url,
self._create_vllm_video_io(),
fetch_timeout=self._http_timeout,
)

async def load_video(self, video_url: str) -> tuple[np.ndarray, Dict[str, Any]]:
try:
frames, metadata = await self._load_video_with_vllm(video_url)
if frames.size == 0:
raise ValueError(
f"Failed to extract video frames from {video_url}. Decoded clip is empty."
)
return np.ascontiguousarray(frames), metadata
except FileNotFoundError:
raise
except Exception as exc:
logger.error("Error loading video from %s: %s", video_url, exc)
raise ValueError(f"Failed to load video from {video_url}: {exc}") from exc

async def load_video_batch(
self,
video_mm_items: List[Dict[str, Any]],
) -> List[tuple[np.ndarray, Dict[str, Any]]]:
video_futures = []

for item in video_mm_items:
if isinstance(item, dict) and URL_VARIANT_KEY in item:
url = item[URL_VARIANT_KEY]
video_futures.append(self.load_video(url))
logger.debug("Preparing to load video from URL: %s...", url[:80])
elif isinstance(item, dict) and DECODED_VARIANT_KEY in item:
if self._enable_frontend_decoding:
metadata = item[DECODED_VARIANT_KEY]
if self._nixl_connector is None:
raise RuntimeError("NIXL connector is not initialized")
video_futures.append(
read_decoded_media_via_nixl(self._nixl_connector, metadata)
)
else:
raise ValueError(
"Received decoded video data but enable_frontend_decoding=False. "
"Enable frontend decoding to transfer decoded video frames via NIXL."
)

results = await asyncio.gather(*video_futures, return_exceptions=True)
loaded_videos: list[tuple[np.ndarray, Dict[str, Any]]] = []
collective_exceptions = ""
for media_item, result in zip(video_mm_items, results):
if isinstance(result, Exception):
source = media_item.get(URL_VARIANT_KEY, "decoded")
logger.error("Failed to load video from %s...: %s", source[:80], result)
collective_exceptions += (
f"Failed to load video from {source[:80]}...: {result}\n"
)
continue
frames, metadata = result
loaded_videos.append((np.ascontiguousarray(frames), metadata))

if collective_exceptions:
raise Exception(collective_exceptions)

return loaded_videos
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
# SPDX-FileCopyrightText: Copyright (c) 2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0

from unittest.mock import AsyncMock

import numpy as np
import pytest

from dynamo.common.multimodal.video_loader import VideoLoader

pytestmark = [
pytest.mark.unit,
pytest.mark.pre_merge,
]


def test_normalize_video_url_converts_local_paths(tmp_path):
video_path = tmp_path / "sample.webm"
video_path.write_bytes(b"video")

assert (
VideoLoader._normalize_video_url(str(video_path))
== video_path.resolve().as_uri()
)


def test_normalize_video_url_preserves_data_urls():
data_url = "data:video/webm;base64,Zm9v"

assert VideoLoader._normalize_video_url(data_url) == data_url


@pytest.mark.asyncio
async def test_load_video_uses_vllm_media_connector():
loader = VideoLoader()
frames = np.arange(24, dtype=np.uint8).reshape(1, 2, 4, 3)[:, :, ::-1, :]
metadata = {"fps": 4.0, "frames_indices": [0], "total_num_frames": 1}
loader._load_video_with_vllm = AsyncMock( # type: ignore[method-assign]
return_value=(frames, metadata)
)

loaded_frames, loaded_metadata = await loader.load_video(
"data:video/webm;base64,Zm9v"
)

assert loaded_frames.flags["C_CONTIGUOUS"]
np.testing.assert_array_equal(loaded_frames, np.ascontiguousarray(frames))
assert loaded_metadata == metadata


@pytest.mark.asyncio
async def test_load_video_batch_uses_url_loader():
loader = VideoLoader()
first = (
np.zeros((1, 2, 2, 3), dtype=np.uint8),
{"fps": 2.0, "frames_indices": [0], "total_num_frames": 1},
)
second = (
np.ones((1, 2, 2, 3), dtype=np.uint8),
{"fps": 2.0, "frames_indices": [0], "total_num_frames": 1},
)
loader.load_video = AsyncMock(side_effect=[first, second]) # type: ignore[method-assign]

videos = await loader.load_video_batch(
[
{"Url": "https://example.com/one.mp4"},
{"Url": "https://example.com/two.mp4"},
]
)

np.testing.assert_array_equal(videos[0][0], first[0])
np.testing.assert_array_equal(videos[1][0], second[0])
assert videos[0][1] == first[1]
assert videos[1][1] == second[1]


@pytest.mark.asyncio
async def test_load_video_batch_rejects_decoded_variant_without_frontend_decoding():
loader = VideoLoader(enable_frontend_decoding=False)

with pytest.raises(ValueError, match="enable_frontend_decoding=False"):
await loader.load_video_batch([{"Decoded": {"shape": [1, 2, 2, 3]}}])
45 changes: 29 additions & 16 deletions components/src/dynamo/vllm/handlers.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
NixlWriteEmbeddingReceiver,
)
from dynamo.common.multimodal.image_loader import ImageLoader
from dynamo.common.multimodal.video_loader import VideoLoader
from dynamo.common.utils.engine_response import normalize_finish_reason
from dynamo.common.utils.input_params import InputParamManager
from dynamo.common.utils.otel_tracing import build_trace_headers
Expand Down Expand Up @@ -388,6 +389,9 @@ def __init__(
self.image_loader = ImageLoader(
enable_frontend_decoding=enable_frontend_decoding
)
self.video_loader = VideoLoader(
enable_frontend_decoding=enable_frontend_decoding
)
self.embedding_loader = self.init_embedding_loader(config, encode_worker_client)

self.use_vllm_tokenizer = use_vllm_tokenizer
Expand Down Expand Up @@ -1165,8 +1169,10 @@ async def _extract_multimodal_data(

mm_map = request["multi_modal_data"]

# [gluo NOTE] If embedding loader is configured, currently we unconditionally
# fetch from the embedding loader.
vllm_mm_data = {}

# [gluo NOTE] If embedding loader is configured, fetch image embeddings first.
# Still continue below so mixed image+video requests can attach `video`.
if self.embedding_loader is not None:
# [gluo FIXME] couldn't simply pass 'mm_map.get(IMAGE_URL_KEY, [])' like below
# as currently the encode worker is using 'ImageLoader.load_image()' which doesn't
Expand All @@ -1185,23 +1191,30 @@ async def _extract_multimodal_data(
logger.debug(
f"Fetched multimodal embeddings for {len(vllm_mm_data)} items"
)
return vllm_mm_data if vllm_mm_data else None

# Fallback that the vLLM engine will perform encoding internally.
vllm_mm_data = {}
# Process image_url entries
images = await self.image_loader.load_image_batch(
mm_map.get(IMAGE_URL_KEY, []),
)
image_mm_items = mm_map.get(IMAGE_URL_KEY, [])
if "image" not in vllm_mm_data and image_mm_items:
images = await self.image_loader.load_image_batch(
image_mm_items,
)

if images:
# vLLM expects single image or list
vllm_mm_data["image"] = images[0] if len(images) == 1 else images
logger.debug(
f"Extracted {len(images)} image(s) for multimodal processing"
)

if images:
# vLLM expects single image or list
vllm_mm_data["image"] = images[0] if len(images) == 1 else images
logger.debug(f"Extracted {len(images)} image(s) for multimodal processing")
video_mm_items = mm_map.get(VIDEO_URL_KEY, [])
if video_mm_items:
videos = await self.video_loader.load_video_batch(video_mm_items)

# Handle video_url entries (future expansion)
if VIDEO_URL_KEY in mm_map:
logger.warning("Video multimodal data not yet supported in standard worker")
if videos:
# vLLM expects single video or list
vllm_mm_data["video"] = videos[0] if len(videos) == 1 else videos
logger.debug(
f"Extracted {len(videos)} video(s) for multimodal processing"
)

return vllm_mm_data if vllm_mm_data else None

Expand Down
Loading
Loading