Skip to content

feat(sglang): add ephemeral KV session routing#7665

Open
ishandhanani wants to merge 1 commit intomainfrom
idhanani/dyn-ephemeral-kv-sessions
Open

feat(sglang): add ephemeral KV session routing#7665
ishandhanani wants to merge 1 commit intomainfrom
idhanani/dyn-ephemeral-kv-sessions

Conversation

@ishandhanani
Copy link
Copy Markdown
Contributor

@ishandhanani ishandhanani commented Mar 27, 2026

Summary

  • add sticky session routing and worker session lifecycle control for ephemeral KV cache reuse
  • wire session control through streaming sessions and the SGLang frontend path

Validation

  • cargo test -p dynamo-llm sticky_sessions -- --nocapture
  • python -m pytest -q components/src/dynamo/frontend/tests/test_sglang_processor_unit.py -q

Summary by CodeRabbit

  • New Features

    • Session control for subagent KV isolation with open/close lifecycle actions
    • Agent-aware cache control with priority-based KV eviction via retention_seconds
    • Sticky session routing for persistent worker affinity
    • Streaming session support with configurable timeouts
  • Documentation

    • Updated guides for session control and cache retention features
  • Chores

    • Enhanced parser configuration support
    • Infrastructure improvements for session lifecycle management

@ishandhanani ishandhanani requested review from a team as code owners March 27, 2026 13:45
@github-actions github-actions bot added feat documentation Improvements or additions to documentation backend::sglang Relates to the sglang backend frontend `python -m dynamo.frontend` and `dynamo-run in=http|text|grpc` router Relates to routing, KV-aware routing, etc. labels Mar 27, 2026
@ishandhanani ishandhanani changed the title feat: add ephemeral KV session routing on top of GLM responses fixes feat: add ephemeral KV session routing Mar 27, 2026
@ishandhanani ishandhanani changed the title feat: add ephemeral KV session routing feat(sglang): add ephemeral KV session routing Mar 27, 2026
@ishandhanani ishandhanani changed the base branch from idhanani/dyn-glm47-responses-codex to main March 27, 2026 14:11
@ishandhanani ishandhanani force-pushed the idhanani/dyn-ephemeral-kv-sessions branch from 595ae55 to 9f89eb6 Compare March 27, 2026 14:14
@copy-pr-bot
Copy link
Copy Markdown

copy-pr-bot bot commented Mar 27, 2026

This pull request requires additional validation before any workflows can run on NVIDIA's runners.

Pull request vetters can view their responsibilities here.

Contributors can view more details about this message here.

@ishandhanani ishandhanani force-pushed the idhanani/dyn-ephemeral-kv-sessions branch from 9f89eb6 to f0f178d Compare March 27, 2026 14:14
@coderabbitai
Copy link
Copy Markdown
Contributor

coderabbitai bot commented Mar 27, 2026

Walkthrough

This pull request transitions cache control infrastructure from TTL-based prefix pinning to session-based lifecycle management. It replaces the pin_prefix endpoint with open_session/close_session handlers, introduces sticky session routing for worker affinity, adds retention_seconds injection for priority-based KV eviction with time decay, and updates related configurations and documentation.

Changes

Cohort / File(s) Summary
Configuration & Documentation Help Text
components/src/dynamo/common/configuration/groups/kv_router_args.py, lib/bindings/python/src/dynamo/_core.pyi, lib/kv-router/src/scheduling/config.rs
Updated --enable-cache-control / router_enable_cache_control help descriptions to reflect agent-aware cache control covering session lifecycle RPCs, sticky routing, and retention_seconds injection instead of prior PIN-with-TTL semantics.
Frontend Token Normalization & Parser Configuration
components/src/dynamo/frontend/sglang_prepost.py, components/src/dynamo/frontend/sglang_processor.py, components/src/dynamo/frontend/tests/test_sglang_processor_unit.py
Added _normalize_prompt_token_ids helper to standardize tokenizer output into list[int]; added _runtime_config_parser_name helper to resolve parser names from runtime config; extended unit tests for both helpers.
Request Handler Session & Retention Support
components/src/dynamo/sglang/request_handlers/handler_base.py, components/src/dynamo/sglang/request_handlers/llm/decode_handler.py, components/src/dynamo/sglang/request_handlers/llm/prefill_handler.py
Replaced pin_prefix endpoint with open_session, close_session, and session_control handlers; added _retention_kwargs and _session_kwargs helpers; updated generation calls to inject session and retention parameters.
SGLang Endpoint & Service Configuration
components/src/dynamo/sglang/init_llm.py, components/src/dynamo/sglang/publisher.py, components/src/dynamo/sglang/register.py
Added new session_control endpoint registration in init_decode; minor import and formatting adjustments.
Protocol Definitions for Session Control
lib/llm/src/protocols/openai/nvext.rs, lib/llm/src/protocols/common/preprocessor.rs, lib/llm/src/preprocessor.rs
Introduced SessionControl struct with session_id, action (open/close), and timeout fields; added session_control field to RoutingHints and NvExt; propagated session control data through preprocessor.
Core Session & Affinity Management (New Modules)
lib/llm/src/kv_router/agent_controller.rs, lib/llm/src/kv_router/sticky_sessions.rs
Added AgentController for session lifecycle management with event-plane client initialization and deferred close actions; added StickySessionRouter with InMemoryAffinityStore for session-to-worker affinity tracking with TTL expiration and background reaper.
KV Router Refactoring
lib/llm/src/kv_router.rs, lib/llm/src/kv_router/push_router.rs
Removed cache_control module; added re-exports for approx, protocols, scheduling, selector from external crate; exposed AgentController and StickySessionRouter; refactored KvPushRouter to replace PIN infrastructure with sticky session and agent controller integration, injecting retention_seconds into request extra args.
Removed Cache Control Implementation
lib/llm/src/kv_router/cache_control.rs
Deleted entire cache-control module including PinState, CacheControlClient, create_cache_control_client, and spawn_pin_prefix function.
Example Configuration & Documentation
examples/backends/sglang/launch/agg_router.sh, docs/backends/sglang/agents.md
Updated launch script with larger model, configurable context length, session control flags, parser configurations, and radix eviction policy; replaced cache-pinning documentation with session-control semantics, priority-based retention, lifecycle actions, and updated limitations.

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~60 minutes

🚥 Pre-merge checks | ✅ 2 | ❌ 1

❌ Failed checks (1 inconclusive)

Check name Status Explanation Resolution
Description check ❓ Inconclusive The description covers the summary and validation approach but lacks details about specific changes, entry points for review, and explicitly related issues as outlined in the template. Add a 'Details' section explaining key changes (session control, sticky routing, etc.), specify 'Where should the reviewer start' with critical files, and clarify any related issues.
✅ Passed checks (2 passed)
Check name Status Explanation
Title check ✅ Passed The title clearly and concisely summarizes the main change: adding ephemeral KV session routing, which is the primary objective of this large PR.
Docstring Coverage ✅ Passed Docstring coverage is 93.15% which is sufficient. The required threshold is 80.00%.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.


Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

Copy link
Copy Markdown
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 7

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
components/src/dynamo/frontend/tests/test_sglang_processor_unit.py (1)

13-31: ⚠️ Potential issue | 🟠 Major

Add the required pytest markers at module scope.

This file is explicitly called out in .ai/pytest-guidelines.md, but the new tests are still unmarked. Please add the required scheduling + GPU + type markers once at module level so collection and test selection stay consistent in CI.

As per coding guidelines, "Ensure every test has the required markers (scheduling + GPU + type) and that markers are defined in both pyproject.toml and tests/conftest.py to prevent collection failures."

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@components/src/dynamo/frontend/tests/test_sglang_processor_unit.py` around
lines 13 - 31, Add the required pytest markers at module scope by defining a
pytestmark list near the top of test_sglang_processor_unit.py (after the
imports) that includes the scheduling marker, the GPU marker, and the test-type
marker (e.g., pytest.mark.scheduling(...), pytest.mark.gpu, pytest.mark.unit).
Ensure you use the exact marker names used across the repo (matching
pyproject.toml and tests/conftest.py) so collection works in CI; place this
module-level pytestmark before any test definitions so it applies to all tests
in the file.
🧹 Nitpick comments (5)
components/src/dynamo/sglang/request_handlers/llm/decode_handler.py (1)

116-119: Add a regression test around the new forwarded kwargs.

These additions now thread extra_args.retention_seconds, _session_kwargs(), and _retention_kwargs() through two separate async_generate() paths. A small mock-engine test for aggregated and disaggregated modes would make this much harder to regress silently.

Also applies to: 155-157, 197-199

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@components/src/dynamo/sglang/request_handlers/llm/decode_handler.py` around
lines 116 - 119, Add regression tests that verify extra_args.retention_seconds
is forwarded through both async_generate() paths (aggregated and disaggregated).
Create tests that call the handler logic which uses _session_kwargs() and
_retention_kwargs(), mock the LLM engine used by async_generate() to capture the
kwargs passed, and assert that retention_seconds appears in the session and
retention kwargs in both the aggregated and disaggregated code paths; cover the
same behavior referenced around the extraction of extra_args/retention_seconds
and the code paths calling async_generate() so future changes cannot regress the
forwarding.
components/src/dynamo/sglang/request_handlers/handler_base.py (2)

457-475: Consider defensive handling for malformed requests.

The method assumes request is a dict with a "get" method. If the caller passes a malformed request (e.g., None or non-dict), this could raise an AttributeError. Given this is an internal service endpoint, the risk is low, but defensive coding would improve robustness.

💡 Optional: Add defensive request handling
     async def session_control(self, request, context=None):
+        if not isinstance(request, dict):
+            yield {"status": "error", "message": "Invalid request format"}
+            return
         action = request.get("action")
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@components/src/dynamo/sglang/request_handlers/handler_base.py` around lines
457 - 475, session_control currently assumes `request` is a dict and calls
request.get which will raise if request is None or not a mapping; guard the
start of session_control to defensively validate `request` (e.g., check
isinstance(request, dict) or hasattr(request, "get") and that "action" exists)
and return a consistent error dict like {"status":"error","message":"malformed
request"} when validation fails, before calling open_session or close_session;
reference the session_control method and the open_session/close_session calls so
you add the validation and early-return logic at the top of session_control.

414-416: Session ID validation is correct, but consider narrowing exception handling.

The if not session_id: pattern correctly catches both None and empty strings, which is appropriate for validation. However, the broad except Exception catches (lines 434, 453) are flagged by static analysis. Consider catching specific exceptions from SGLang if the API defines them.

💡 Optional: Narrow exception handling if SGLang exposes specific exceptions

If SGLang's open_session/close_session raise specific exception types, catching those would provide better error handling:

except SomeSpecificSGLangException as e:
    logging.error(f"Failed to open session {session_id}: {e}")
    return {"status": "error", "message": str(e)}

If the API can raise arbitrary exceptions, the current approach is acceptable for resilience.

Also applies to: 446-448

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@components/src/dynamo/sglang/request_handlers/handler_base.py` around lines
414 - 416, The broad except Exception blocks in handler_base.py around the
SGLang calls (specifically the try/except wrapping open_session and
close_session usage in the request handlers) should be narrowed: replace except
Exception with the specific SGLang exception types exposed by the library (e.g.,
SGLangError or SGLangException) when calling open_session and close_session, and
log the caught exception (e) in the processLogger/logging call and return its
message; if the library does not expose specific types, catch Exception as e so
you still log and return str(e). Ensure changes target the try/except blocks
that handle open_session and close_session in the handler methods.
lib/llm/src/kv_router/push_router.rs (1)

532-537: Consider logging when extra_args is not an object.

If extra_args is unexpectedly set to a non-object JSON value, the retention_seconds injection silently fails. This could make debugging cache retention issues difficult.

💡 Optional: Add trace logging for unexpected extra_args type
         if let Some(ttl) = request.routing.as_ref().and_then(|r| r.cache_control_ttl) {
             let extra = request.extra_args.get_or_insert_with(|| json!({}));
             if let serde_json::Value::Object(map) = extra {
                 map.insert("retention_seconds".to_string(), json!(ttl));
+            } else {
+                tracing::trace!(
+                    "extra_args is not an object, skipping retention_seconds injection"
+                );
             }
         }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@lib/llm/src/kv_router/push_router.rs` around lines 532 - 537, The code
injects retention_seconds into request.extra_args only when extra_args is a JSON
object, but it silently does nothing if extra_args exists and is a non-object;
update the block in push_router.rs (the code around request.routing,
request.extra_args and cache_control_ttl) to detect the else case where
extra_args is present but not a serde_json::Value::Object and emit a trace/debug
log (e.g., using tracing::trace! or the project's logger) that includes the
unexpected extra_args type and the ttl value; keep the existing insertion
behavior for the Object case and do not change behavior when extra_args is
absent.
lib/llm/src/kv_router/sticky_sessions.rs (1)

64-79: Background reaper task runs indefinitely without cancellation.

The tokio::spawn in new_with_on_expire creates a task that runs forever. When InMemoryAffinityStore is dropped, the reaper continues to run, holding an Arc reference to the map and callback. In practice this may be acceptable since KvPushRouter lives for the process lifetime, but it prevents clean shutdown.

💡 Optional: Add cancellation token for graceful shutdown

If clean shutdown becomes important, consider passing a CancellationToken or storing a JoinHandle to abort the reaper on drop.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@lib/llm/src/kv_router/sticky_sessions.rs` around lines 64 - 79, The reaper
task spawned in new_with_on_expire keeps running after InMemoryAffinityStore is
dropped because it holds an Arc to the store; modify InMemoryAffinityStore to
support graceful shutdown by either (A) adding a CancellationToken field (e.g.,
tokio_util::sync::CancellationToken) and passing a cloned token into the spawned
task so the task breaks its loop when token.cancel() is called, or (B) storing
the JoinHandle returned by tokio::spawn on the store and implementing Drop for
InMemoryAffinityStore to call handle.abort() (or await a shutdown signal) to
stop the task; update new_with_on_expire signature to accept or create the
token/handle and ensure Drop properly cancels the reaper to release the Arc and
allow clean shutdown.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@components/src/dynamo/frontend/sglang_prepost.py`:
- Around line 97-110: The function _normalize_prompt_token_ids currently falls
back to list(prompt_token_ids) which, for a dict with a non-list "input_ids"
iterable, will iterate dict keys instead of token IDs; to fix this, update the
logic to detect non-list but iterable input IDs and convert those to a list:
when prompt_token_ids has an attribute input_ids (variable input_ids) and it's
not a list but is iterable, return list(input_ids); likewise, when
prompt_token_ids is a dict and dict_input_ids exists but is not a list, return
list(dict_input_ids) instead of falling through to list(prompt_token_ids); keep
existing paths for list input_ids and other non-iterable types.

In `@components/src/dynamo/frontend/tests/test_sglang_processor_unit.py`:
- Around line 435-441: The test's FakeBatchEncoding class defines input_ids as a
mutable class attribute which can leak state between tests; change
FakeBatchEncoding so input_ids is an instance attribute (e.g., set in __init__)
or make it an immutable tuple, then use that instance when calling
_normalize_prompt_token_ids to avoid shared mutable state and satisfy the
linter.

In `@components/src/dynamo/sglang/init_llm.py`:
- Around line 122-127: The new session_control endpoint created via
session_control_endpoint =
runtime.endpoint(f"{dynamo_args.namespace}.{dynamo_args.component}.session_control")
is not being added to the shutdown_endpoints list, so update the
shutdown/unregister logic to include session_control_endpoint alongside
generate_endpoint (the list referenced as shutdown_endpoints) so both endpoints
are cleaned up on shutdown; locate where generate_endpoint is appended to
shutdown_endpoints and add session_control_endpoint there (and mirror the same
addition at the other occurrence around lines 136-138).

In `@components/src/dynamo/sglang/publisher.py`:
- Around line 13-14: Remove the duplicate import of urlparse: locate the
repeated "from urllib.parse import urlparse" in the sglang publisher module (the
second occurrence around the top of publisher.py) and delete the redundant line
so only the original import remains, ensuring imports are not duplicated and
lint errors are resolved.

In `@docs/backends/sglang/agents.md`:
- Around line 368-400: The example breaks the append-only session contract by
making Turn 3 (resp3) not extend the prior transcript and leaving both resp2 and
resp3 unread while using stream=True; fix by either making resp3 append to the
prior messages (include prior assistant/user turns like t1 and the same
SESSION_ID) and consuming the streamed responses from resp2 and resp3, or change
both client.chat.completions.create calls to stream=False (or set resp2/resp3
stream handling to read the stream fully) so the session control semantics and
KV lifecycle (SESSION_ID with action "close") are correct; update references to
resp2, resp3, SESSION_ID, and the messages arrays accordingly.
- Around line 299-303: The docs state session_control.timeout default is 300 but
the actual default is 30 (see default_session_timeout()), so update the table
entry for `session_control.timeout` to indicate default 30 seconds to match the
code and tests; ensure the description for the `session_control.timeout` field
mentions "default 30" and aligns with the behavior of default_session_timeout()
and the new serde test.

In `@examples/backends/sglang/launch/agg_router.sh`:
- Around line 56-59: The MODEL_OVERRIDE_ARGS here embeds a hard-coded
rope_scaling.factor of 4.0 while CONTEXT_LENGTH is configurable; compute
rope_scaling.factor dynamically as CONTEXT_LENGTH divided by the
original_max_position_embeddings (32768) and inject that value into the here-doc
so rope_scaling.factor = ${CONTEXT_LENGTH}/32768 (as a float). Update the block
that defines MODEL_OVERRIDE_ARGS to reference CONTEXT_LENGTH and derive the
factor variable (keep rope_theta and original_max_position_embeddings unchanged)
so any non-default CONTEXT_LENGTH produces consistent rope settings.

---

Outside diff comments:
In `@components/src/dynamo/frontend/tests/test_sglang_processor_unit.py`:
- Around line 13-31: Add the required pytest markers at module scope by defining
a pytestmark list near the top of test_sglang_processor_unit.py (after the
imports) that includes the scheduling marker, the GPU marker, and the test-type
marker (e.g., pytest.mark.scheduling(...), pytest.mark.gpu, pytest.mark.unit).
Ensure you use the exact marker names used across the repo (matching
pyproject.toml and tests/conftest.py) so collection works in CI; place this
module-level pytestmark before any test definitions so it applies to all tests
in the file.

---

Nitpick comments:
In `@components/src/dynamo/sglang/request_handlers/handler_base.py`:
- Around line 457-475: session_control currently assumes `request` is a dict and
calls request.get which will raise if request is None or not a mapping; guard
the start of session_control to defensively validate `request` (e.g., check
isinstance(request, dict) or hasattr(request, "get") and that "action" exists)
and return a consistent error dict like {"status":"error","message":"malformed
request"} when validation fails, before calling open_session or close_session;
reference the session_control method and the open_session/close_session calls so
you add the validation and early-return logic at the top of session_control.
- Around line 414-416: The broad except Exception blocks in handler_base.py
around the SGLang calls (specifically the try/except wrapping open_session and
close_session usage in the request handlers) should be narrowed: replace except
Exception with the specific SGLang exception types exposed by the library (e.g.,
SGLangError or SGLangException) when calling open_session and close_session, and
log the caught exception (e) in the processLogger/logging call and return its
message; if the library does not expose specific types, catch Exception as e so
you still log and return str(e). Ensure changes target the try/except blocks
that handle open_session and close_session in the handler methods.

In `@components/src/dynamo/sglang/request_handlers/llm/decode_handler.py`:
- Around line 116-119: Add regression tests that verify
extra_args.retention_seconds is forwarded through both async_generate() paths
(aggregated and disaggregated). Create tests that call the handler logic which
uses _session_kwargs() and _retention_kwargs(), mock the LLM engine used by
async_generate() to capture the kwargs passed, and assert that retention_seconds
appears in the session and retention kwargs in both the aggregated and
disaggregated code paths; cover the same behavior referenced around the
extraction of extra_args/retention_seconds and the code paths calling
async_generate() so future changes cannot regress the forwarding.

In `@lib/llm/src/kv_router/push_router.rs`:
- Around line 532-537: The code injects retention_seconds into
request.extra_args only when extra_args is a JSON object, but it silently does
nothing if extra_args exists and is a non-object; update the block in
push_router.rs (the code around request.routing, request.extra_args and
cache_control_ttl) to detect the else case where extra_args is present but not a
serde_json::Value::Object and emit a trace/debug log (e.g., using
tracing::trace! or the project's logger) that includes the unexpected extra_args
type and the ttl value; keep the existing insertion behavior for the Object case
and do not change behavior when extra_args is absent.

In `@lib/llm/src/kv_router/sticky_sessions.rs`:
- Around line 64-79: The reaper task spawned in new_with_on_expire keeps running
after InMemoryAffinityStore is dropped because it holds an Arc to the store;
modify InMemoryAffinityStore to support graceful shutdown by either (A) adding a
CancellationToken field (e.g., tokio_util::sync::CancellationToken) and passing
a cloned token into the spawned task so the task breaks its loop when
token.cancel() is called, or (B) storing the JoinHandle returned by tokio::spawn
on the store and implementing Drop for InMemoryAffinityStore to call
handle.abort() (or await a shutdown signal) to stop the task; update
new_with_on_expire signature to accept or create the token/handle and ensure
Drop properly cancels the reaper to release the Arc and allow clean shutdown.
🪄 Autofix (Beta)

Fix all unresolved CodeRabbit comments on this PR:

  • Push a commit to this branch (recommended)
  • Create a new PR with the fixes

ℹ️ Review info
⚙️ Run configuration

Configuration used: Path: .coderabbit.yaml

Review profile: CHILL

Plan: Pro

Run ID: cba71e2f-e596-4edc-b538-ecdd9e5b9965

📥 Commits

Reviewing files that changed from the base of the PR and between 310f8ca and f0f178d.

📒 Files selected for processing (22)
  • components/src/dynamo/common/configuration/groups/kv_router_args.py
  • components/src/dynamo/frontend/sglang_prepost.py
  • components/src/dynamo/frontend/sglang_processor.py
  • components/src/dynamo/frontend/tests/test_sglang_processor_unit.py
  • components/src/dynamo/sglang/init_llm.py
  • components/src/dynamo/sglang/publisher.py
  • components/src/dynamo/sglang/register.py
  • components/src/dynamo/sglang/request_handlers/handler_base.py
  • components/src/dynamo/sglang/request_handlers/llm/decode_handler.py
  • components/src/dynamo/sglang/request_handlers/llm/prefill_handler.py
  • docs/backends/sglang/agents.md
  • examples/backends/sglang/launch/agg_router.sh
  • lib/bindings/python/src/dynamo/_core.pyi
  • lib/kv-router/src/scheduling/config.rs
  • lib/llm/src/kv_router.rs
  • lib/llm/src/kv_router/agent_controller.rs
  • lib/llm/src/kv_router/cache_control.rs
  • lib/llm/src/kv_router/push_router.rs
  • lib/llm/src/kv_router/sticky_sessions.rs
  • lib/llm/src/preprocessor.rs
  • lib/llm/src/protocols/common/preprocessor.rs
  • lib/llm/src/protocols/openai/nvext.rs
💤 Files with no reviewable changes (2)
  • components/src/dynamo/sglang/register.py
  • lib/llm/src/kv_router/cache_control.rs

Comment on lines +97 to +110
def _normalize_prompt_token_ids(prompt_token_ids: Any) -> list[int]:
if isinstance(prompt_token_ids, list):
return prompt_token_ids

input_ids = getattr(prompt_token_ids, "input_ids", None)
if isinstance(input_ids, list):
return input_ids

if isinstance(prompt_token_ids, dict):
dict_input_ids = prompt_token_ids.get("input_ids")
if isinstance(dict_input_ids, list):
return dict_input_ids

return list(prompt_token_ids)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

🧩 Analysis chain

🌐 Web query:

For SGLang/Hugging Face tokenizers, what concrete return types can tokenizer.apply_chat_template(..., tokenize=True)produce, and caninput_ids be non-list iterables (e.g., tuple/tensor/array)?

💡 Result:

For Hugging Face tokenizers, tokenizer.apply_chat_template(..., tokenize=True) returns a list of integers (the input_ids) by default. With return_tensors="pt" or "np", it returns a PyTorch tensor or NumPy array (batched if multiple conversations). With return_dict=True, it returns a dictionary containing "input_ids" (list/tensor/array), and optionally "attention_mask" and other fields depending on tokenizer configuration and flags like return_assistant_tokens_mask=True. SGLang uses Hugging Face tokenizers (loaded via tokenizer_manager.tokenizer.apply_chat_template in server code) for models with chat templates, so it produces the same return types. SGLang also has its own custom chat template system (via conversation.py and get_chat_template) for the SGLang language frontend (e.g., RuntimeEndpoint.chat_template), which formats messages into strings but does not directly use apply_chat_template with tokenize=True; tokenization happens separately. input_ids for tokenizer.call (used internally by apply_chat_template when tokenize=True) accepts non-list iterables. The text parameter supports str, list[str], tuple[str], PreTokenizedInput, etc., but when passing pre-tokenized input_ids directly to tokenizer (not via apply_chat_template), it expects list[int] or array-like for return_tensors, and iterables like tuples/arrays/tensors are typically converted if they yield ints.

Citations:


🏁 Script executed:

# First, locate and examine the actual file
fd "sglang_prepost.py" -t f

Repository: ai-dynamo/dynamo

Length of output: 108


🏁 Script executed:

# Read the file to see the actual implementation
if [ -f "components/src/dynamo/frontend/sglang_prepost.py" ]; then
  wc -l "components/src/dynamo/frontend/sglang_prepost.py"
fi

Repository: ai-dynamo/dynamo

Length of output: 112


🏁 Script executed:

# Get the specific lines mentioned in the review
if [ -f "components/src/dynamo/frontend/sglang_prepost.py" ]; then
  sed -n '90,120p' "components/src/dynamo/frontend/sglang_prepost.py" | cat -n
fi

Repository: ai-dynamo/dynamo

Length of output: 1143


🏁 Script executed:

# Also check lines 143-145 for usage context
if [ -f "components/src/dynamo/frontend/sglang_prepost.py" ]; then
  sed -n '140,150p' "components/src/dynamo/frontend/sglang_prepost.py" | cat -n
fi

Repository: ai-dynamo/dynamo

Length of output: 524


🏁 Script executed:

# Search for any tests or usage of _normalize_prompt_token_ids
rg "_normalize_prompt_token_ids" -A 2 -B 2

Repository: ai-dynamo/dynamo

Length of output: 2226


🏁 Script executed:

# Search for where this function might be called with different types
rg "apply_chat_template" -t py -A 3 -B 3 | head -50

Repository: ai-dynamo/dynamo

Length of output: 4278


🏁 Script executed:

# Look at the test file to understand FakeBatchEncoding and test coverage
if [ -f "components/src/dynamo/frontend/tests/test_sglang_processor_unit.py" ]; then
  rg "class FakeBatchEncoding|def test.*normalize" -A 10 "components/src/dynamo/frontend/tests/test_sglang_processor_unit.py"
fi

Repository: ai-dynamo/dynamo

Length of output: 456


🏁 Script executed:

# Get more context from the test file around _normalize_prompt_token_ids tests
if [ -f "components/src/dynamo/frontend/tests/test_sglang_processor_unit.py" ]; then
  rg "_normalize_prompt_token_ids" -B 5 -A 15 "components/src/dynamo/frontend/tests/test_sglang_processor_unit.py"
fi

Repository: ai-dynamo/dynamo

Length of output: 1391


🏁 Script executed:

# Check what template_kwargs contains and how apply_chat_template is called
sed -n '130,160p' "components/src/dynamo/frontend/sglang_prepost.py" | cat -n

Repository: ai-dynamo/dynamo

Length of output: 1377


🏁 Script executed:

# Search for any imports of numpy, torch, or array types in sglang_prepost.py
rg "^import|^from" "components/src/dynamo/frontend/sglang_prepost.py" | head -20

Repository: ai-dynamo/dynamo

Length of output: 473


Dict fallback can emit dict keys instead of token IDs if input_ids is non-list.

When _normalize_prompt_token_ids receives a dict with "input_ids" as a non-list iterable (e.g., tensor, array, tuple), the function falls through to return list(prompt_token_ids), which iterates dict keys instead of token IDs. While current usage (tokenizer.apply_chat_template(messages, tokenize=True) without return_dict=True) returns a list directly, the function accepts Any and should handle this case defensively.

Suggested fix
 def _normalize_prompt_token_ids(prompt_token_ids: Any) -> list[int]:
     if isinstance(prompt_token_ids, list):
         return prompt_token_ids
 
     input_ids = getattr(prompt_token_ids, "input_ids", None)
-    if isinstance(input_ids, list):
-        return input_ids
+    if input_ids is not None and not isinstance(input_ids, (str, bytes)):
+        return list(input_ids)
 
     if isinstance(prompt_token_ids, dict):
-        dict_input_ids = prompt_token_ids.get("input_ids")
-        if isinstance(dict_input_ids, list):
-            return dict_input_ids
+        if "input_ids" in prompt_token_ids:
+            dict_input_ids = prompt_token_ids["input_ids"]
+            if not isinstance(dict_input_ids, (str, bytes)):
+                return list(dict_input_ids)
 
     return list(prompt_token_ids)
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@components/src/dynamo/frontend/sglang_prepost.py` around lines 97 - 110, The
function _normalize_prompt_token_ids currently falls back to
list(prompt_token_ids) which, for a dict with a non-list "input_ids" iterable,
will iterate dict keys instead of token IDs; to fix this, update the logic to
detect non-list but iterable input IDs and convert those to a list: when
prompt_token_ids has an attribute input_ids (variable input_ids) and it's not a
list but is iterable, return list(input_ids); likewise, when prompt_token_ids is
a dict and dict_input_ids exists but is not a list, return list(dict_input_ids)
instead of falling through to list(prompt_token_ids); keep existing paths for
list input_ids and other non-iterable types.

Comment on lines +435 to +441
class FakeBatchEncoding:
input_ids = [11, 22, 33]

def __iter__(self):
yield from ("input_ids", "attention_mask")

assert _normalize_prompt_token_ids(FakeBatchEncoding()) == [11, 22, 33]
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

Avoid a mutable class attribute in FakeBatchEncoding.

input_ids is shared state at class scope here, which is exactly what Ruff is warning about. Making it an instance attribute or a tuple avoids accidental cross-test leakage if this helper ever gets mutated.

♻️ Minimal cleanup
         class FakeBatchEncoding:
-            input_ids = [11, 22, 33]
+            def __init__(self):
+                self.input_ids = [11, 22, 33]
 
             def __iter__(self):
                 yield from ("input_ids", "attention_mask")
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
class FakeBatchEncoding:
input_ids = [11, 22, 33]
def __iter__(self):
yield from ("input_ids", "attention_mask")
assert _normalize_prompt_token_ids(FakeBatchEncoding()) == [11, 22, 33]
class FakeBatchEncoding:
def __init__(self):
self.input_ids = [11, 22, 33]
def __iter__(self):
yield from ("input_ids", "attention_mask")
assert _normalize_prompt_token_ids(FakeBatchEncoding()) == [11, 22, 33]
🧰 Tools
🪛 Ruff (0.15.7)

[warning] 436-436: Mutable default value for class attribute

(RUF012)

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@components/src/dynamo/frontend/tests/test_sglang_processor_unit.py` around
lines 435 - 441, The test's FakeBatchEncoding class defines input_ids as a
mutable class attribute which can leak state between tests; change
FakeBatchEncoding so input_ids is an instance attribute (e.g., set in __init__)
or make it an immutable tuple, then use that instance when calling
_normalize_prompt_token_ids to avoid shared mutable state and satisfy the
linter.

Comment on lines +122 to +127
# Serve session_control as a discoverable endpoint so the router's
# AgentController can find it via component.endpoint("session_control").
session_control_endpoint = runtime.endpoint(
f"{dynamo_args.namespace}.{dynamo_args.component}.session_control"
)

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Track session_control_endpoint in shutdown endpoint list.

Line 86 only tracks generate_endpoint. The newly served session_control_endpoint should also be added to shutdown_endpoints so shutdown/unregister logic treats both endpoints consistently.

Suggested fix
-    shutdown_endpoints[:] = [generate_endpoint]
+    shutdown_endpoints[:] = [generate_endpoint]
@@
     session_control_endpoint = runtime.endpoint(
         f"{dynamo_args.namespace}.{dynamo_args.component}.session_control"
     )
+    shutdown_endpoints.append(session_control_endpoint)

Also applies to: 136-138

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@components/src/dynamo/sglang/init_llm.py` around lines 122 - 127, The new
session_control endpoint created via session_control_endpoint =
runtime.endpoint(f"{dynamo_args.namespace}.{dynamo_args.component}.session_control")
is not being added to the shutdown_endpoints list, so update the
shutdown/unregister logic to include session_control_endpoint alongside
generate_endpoint (the list referenced as shutdown_endpoints) so both endpoints
are cleaned up on shutdown; locate where generate_endpoint is appended to
shutdown_endpoints and add session_control_endpoint there (and mirror the same
addition at the other occurrence around lines 136-138).

Comment on lines +13 to +14
from urllib.parse import urlparse

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

Remove the duplicate urlparse import.

urlparse is already imported at Line 8; the second import is redundant and likely to trip lint checks.

Suggested fix
-from urllib.parse import urlparse

As per coding guidelines: “Use consistent code formatting and adhere to the established linting rules defined in the project.”

📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
from urllib.parse import urlparse
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@components/src/dynamo/sglang/publisher.py` around lines 13 - 14, Remove the
duplicate import of urlparse: locate the repeated "from urllib.parse import
urlparse" in the sglang publisher module (the second occurrence around the top
of publisher.py) and delete the redundant line so only the original import
remains, ensuring imports are not duplicated and lint errors are resolved.

Comment on lines +299 to +303
| Field | Type | Description |
|-------|------|-------------|
| `session_control.session_id` | `string` | Unique session identifier. Present on every turn. |
| `session_control.action` | `string` | `"open"` or `"close"`. Omit on intermediate turns. |
| `session_control.timeout` | `integer` | Inactivity timeout in seconds (default 300). Only used with `action: "open"`. |
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

The documented session timeout default is off by 10x.

default_session_timeout() in lib/llm/src/protocols/openai/nvext.rs returns 30, and the new serde test asserts the same. Leaving default 300 here will cause users who omit timeout to get unexpected session expiry.

📝 Suggested doc fix
-| `session_control.timeout` | `integer` | Inactivity timeout in seconds (default 300). Only used with `action: "open"`. |
+| `session_control.timeout` | `integer` | Inactivity timeout in seconds (default 30). Only used with `action: "open"`. |
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
| Field | Type | Description |
|-------|------|-------------|
| `session_control.session_id` | `string` | Unique session identifier. Present on every turn. |
| `session_control.action` | `string` | `"open"` or `"close"`. Omit on intermediate turns. |
| `session_control.timeout` | `integer` | Inactivity timeout in seconds (default 300). Only used with `action: "open"`. |
| Field | Type | Description |
|-------|------|-------------|
| `session_control.session_id` | `string` | Unique session identifier. Present on every turn. |
| `session_control.action` | `string` | `"open"` or `"close"`. Omit on intermediate turns. |
| `session_control.timeout` | `integer` | Inactivity timeout in seconds (default 30). Only used with `action: "open"`. |
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@docs/backends/sglang/agents.md` around lines 299 - 303, The docs state
session_control.timeout default is 300 but the actual default is 30 (see
default_session_timeout()), so update the table entry for
`session_control.timeout` to indicate default 30 seconds to match the code and
tests; ensure the description for the `session_control.timeout` field mentions
"default 30" and aligns with the behavior of default_session_timeout() and the
new serde test.

Comment on lines +368 to 400
resp2 = client.chat.completions.create(
model="Qwen/Qwen3-14B-FP8",
messages=[
{"role": "system", "content": SYSTEM},
{"role": "user", "content": "Rank Federer's 20 Grand Slam titles by artistic beauty."},
{"role": "assistant", "content": t1},
{"role": "user", "content": "Now explain why the 2017 Australian Open final against Nadal was the single greatest moment in competitive sports. Include the fifth set backhand winner."},
],
stream=True,
extra_body={
"nvext": {
"session_control": {"session_id": SESSION_ID}
}
}
)

# Later turns reuse the pinned prefix -- even after heavy load from
# other requests, the KV cache for this conversation is preserved.
response = client.chat.completions.create(
# Turn 3: Close session (KV freed after generation completes)
resp3 = client.chat.completions.create(
model="Qwen/Qwen3-14B-FP8",
messages=[
{"role": "system", "content": system_prompt},
{"role": "user", "content": "Analyze this codebase and suggest improvements."},
{"role": "assistant", "content": assistant_response},
{"role": "user", "content": "Now focus on the database layer."},
{"role": "system", "content": SYSTEM},
{"role": "user", "content": "Compose a closing argument for why Federer's career transcends sport and enters the realm of fine art."},
],
stream=True,
extra_body={
"nvext": {
"cache_control": {
"type": "ephemeral",
"ttl": "1h"
"session_control": {
"session_id": SESSION_ID,
"action": "close"
}
}
}
)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

The follow-up session example stops matching the session contract.

Turn 3 no longer extends the prior transcript even though the limitations below describe append-only streaming sessions, and both resp2 and resp3 are left unread despite stream=True. Please either show true append-style turns and consume the streams, or switch those calls to stream=False.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@docs/backends/sglang/agents.md` around lines 368 - 400, The example breaks
the append-only session contract by making Turn 3 (resp3) not extend the prior
transcript and leaving both resp2 and resp3 unread while using stream=True; fix
by either making resp3 append to the prior messages (include prior
assistant/user turns like t1 and the same SESSION_ID) and consuming the streamed
responses from resp2 and resp3, or change both client.chat.completions.create
calls to stream=False (or set resp2/resp3 stream handling to read the stream
fully) so the session control semantics and KV lifecycle (SESSION_ID with action
"close") are correct; update references to resp2, resp3, SESSION_ID, and the
messages arrays accordingly.

Comment on lines +56 to +59
CONTEXT_LENGTH="${CONTEXT_LENGTH:-131072}"
MODEL_OVERRIDE_ARGS=$(cat <<EOF
{"rope_theta":1000000,"max_position_embeddings":${CONTEXT_LENGTH},"rope_scaling":{"rope_type":"yarn","rope_theta":1000000,"factor":4.0,"original_max_position_embeddings":32768}}
EOF
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Derive YaRN scaling from CONTEXT_LENGTH.

The script now advertises CONTEXT_LENGTH as an override, but rope_scaling.factor stays hard-coded at 4.0. Any value other than 131072 will produce inconsistent rope settings and a misconfigured long-context launch.

🐛 Suggested fix
 CONTEXT_LENGTH="${CONTEXT_LENGTH:-131072}"
-MODEL_OVERRIDE_ARGS=$(cat <<EOF
-{"rope_theta":1000000,"max_position_embeddings":${CONTEXT_LENGTH},"rope_scaling":{"rope_type":"yarn","rope_theta":1000000,"factor":4.0,"original_max_position_embeddings":32768}}
-EOF
-)
+MODEL_OVERRIDE_ARGS="$(python3 - "$CONTEXT_LENGTH" <<'PY'
+import json
+import sys
+
+context_length = int(sys.argv[1])
+original_max_position_embeddings = 32768
+
+print(json.dumps({
+    "rope_theta": 1000000,
+    "max_position_embeddings": context_length,
+    "rope_scaling": {
+        "rope_type": "yarn",
+        "rope_theta": 1000000,
+        "factor": context_length / original_max_position_embeddings,
+        "original_max_position_embeddings": original_max_position_embeddings,
+    },
+}))
+PY
+)"
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
CONTEXT_LENGTH="${CONTEXT_LENGTH:-131072}"
MODEL_OVERRIDE_ARGS=$(cat <<EOF
{"rope_theta":1000000,"max_position_embeddings":${CONTEXT_LENGTH},"rope_scaling":{"rope_type":"yarn","rope_theta":1000000,"factor":4.0,"original_max_position_embeddings":32768}}
EOF
CONTEXT_LENGTH="${CONTEXT_LENGTH:-131072}"
MODEL_OVERRIDE_ARGS="$(python3 - "$CONTEXT_LENGTH" <<'PY'
import json
import sys
context_length = int(sys.argv[1])
original_max_position_embeddings = 32768
print(json.dumps({
"rope_theta": 1000000,
"max_position_embeddings": context_length,
"rope_scaling": {
"rope_type": "yarn",
"rope_theta": 1000000,
"factor": context_length / original_max_position_embeddings,
"original_max_position_embeddings": original_max_position_embeddings,
},
}))
PY
)"
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@examples/backends/sglang/launch/agg_router.sh` around lines 56 - 59, The
MODEL_OVERRIDE_ARGS here embeds a hard-coded rope_scaling.factor of 4.0 while
CONTEXT_LENGTH is configurable; compute rope_scaling.factor dynamically as
CONTEXT_LENGTH divided by the original_max_position_embeddings (32768) and
inject that value into the here-doc so rope_scaling.factor =
${CONTEXT_LENGTH}/32768 (as a float). Update the block that defines
MODEL_OVERRIDE_ARGS to reference CONTEXT_LENGTH and derive the factor variable
(keep rope_theta and original_max_position_embeddings unchanged) so any
non-default CONTEXT_LENGTH produces consistent rope settings.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

backend::sglang Relates to the sglang backend documentation Improvements or additions to documentation feat frontend `python -m dynamo.frontend` and `dynamo-run in=http|text|grpc` router Relates to routing, KV-aware routing, etc. size/XXL

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant