feat(sglang): add ephemeral KV session routing #7665
ishandhanani wants to merge 1 commit into main from
Conversation
595ae55 to 9f89eb6
Signed-off-by: Ishan Dhanani <[email protected]>
9f89eb6 to f0f178d
Walkthrough
This pull request transitions cache control infrastructure from TTL-based prefix pinning to session-based lifecycle management. It replaces the
Changes
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~60 minutes
🚥 Pre-merge checks: ✅ 2 | ❌ 1
❌ Failed checks (1 inconclusive)
✅ Passed checks (2 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings.
Actionable comments posted: 7
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
components/src/dynamo/frontend/tests/test_sglang_processor_unit.py (1)
13-31: ⚠️ Potential issue | 🟠 Major: Add the required pytest markers at module scope.
This file is explicitly called out in .ai/pytest-guidelines.md, but the new tests are still unmarked. Please add the required scheduling + GPU + type markers once at module level so collection and test selection stay consistent in CI. As per coding guidelines, "Ensure every test has the required markers (scheduling + GPU + type) and that markers are defined in both pyproject.toml and tests/conftest.py to prevent collection failures."
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@components/src/dynamo/frontend/tests/test_sglang_processor_unit.py` around lines 13 - 31, Add the required pytest markers at module scope by defining a pytestmark list near the top of test_sglang_processor_unit.py (after the imports) that includes the scheduling marker, the GPU marker, and the test-type marker (e.g., pytest.mark.scheduling(...), pytest.mark.gpu, pytest.mark.unit). Ensure you use the exact marker names used across the repo (matching pyproject.toml and tests/conftest.py) so collection works in CI; place this module-level pytestmark before any test definitions so it applies to all tests in the file.
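As per the guidance above, a module-level `pytestmark` could look like the sketch below. The marker names here are assumptions for illustration; the exact names must match those registered in pyproject.toml and tests/conftest.py.

```python
import pytest

# Hypothetical marker names -- substitute the exact ones registered in
# pyproject.toml and tests/conftest.py so collection does not fail in CI.
pytestmark = [
    pytest.mark.pre_merge,  # scheduling marker (assumed name)
    pytest.mark.gpu_0,      # GPU marker (assumed name)
    pytest.mark.unit,       # test-type marker (assumed name)
]


def test_placeholder():
    # Module-level pytestmark applies to every test defined below it.
    assert True
```

Because `pytestmark` is read at collection time, placing it before any test definitions makes it apply to the whole module.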
🧹 Nitpick comments (5)
components/src/dynamo/sglang/request_handlers/llm/decode_handler.py (1)
116-119: Add a regression test around the new forwarded kwargs.
These additions now thread extra_args.retention_seconds, _session_kwargs(), and _retention_kwargs() through two separate async_generate() paths. A small mock-engine test for aggregated and disaggregated modes would make this much harder to regress silently.
Also applies to: 155-157, 197-199
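A sketch of such a mock-engine test, assuming a simplified handler shape (the real decode_handler's class and signatures may differ; `FakeDecodeHandler` is a stand-in):

```python
import asyncio
from unittest.mock import AsyncMock


# Stand-in for the decode handler: the only behavior under test is that
# extra_args["retention_seconds"] is forwarded into async_generate() kwargs.
class FakeDecodeHandler:
    def __init__(self, engine):
        self.engine = engine

    def _retention_kwargs(self, extra_args):
        ttl = (extra_args or {}).get("retention_seconds")
        return {"retention_seconds": ttl} if ttl is not None else {}

    async def generate(self, prompt, extra_args=None):
        await self.engine.async_generate(
            prompt=prompt, **self._retention_kwargs(extra_args)
        )


engine = AsyncMock()
handler = FakeDecodeHandler(engine)
asyncio.run(handler.generate("hi", {"retention_seconds": 60}))
# The mock captures the forwarded kwargs, so regressions fail loudly.
engine.async_generate.assert_called_once_with(prompt="hi", retention_seconds=60)
```

The same pattern, applied once per code path (aggregated and disaggregated), covers the regression the comment asks about.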
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@components/src/dynamo/sglang/request_handlers/llm/decode_handler.py` around lines 116 - 119, Add regression tests that verify extra_args.retention_seconds is forwarded through both async_generate() paths (aggregated and disaggregated). Create tests that call the handler logic which uses _session_kwargs() and _retention_kwargs(), mock the LLM engine used by async_generate() to capture the kwargs passed, and assert that retention_seconds appears in the session and retention kwargs in both the aggregated and disaggregated code paths; cover the same behavior referenced around the extraction of extra_args/retention_seconds and the code paths calling async_generate() so future changes cannot regress the forwarding.
components/src/dynamo/sglang/request_handlers/handler_base.py (2)
457-475: Consider defensive handling for malformed requests.
The method assumes request is a dict with a "get" method. If the caller passes a malformed request (e.g., None or non-dict), this could raise an AttributeError. Given this is an internal service endpoint, the risk is low, but defensive coding would improve robustness.
💡 Optional: Add defensive request handling
```diff
 async def session_control(self, request, context=None):
+    if not isinstance(request, dict):
+        yield {"status": "error", "message": "Invalid request format"}
+        return
     action = request.get("action")
```
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@components/src/dynamo/sglang/request_handlers/handler_base.py` around lines 457 - 475, session_control currently assumes `request` is a dict and calls request.get which will raise if request is None or not a mapping; guard the start of session_control to defensively validate `request` (e.g., check isinstance(request, dict) or hasattr(request, "get") and that "action" exists) and return a consistent error dict like {"status":"error","message":"malformed request"} when validation fails, before calling open_session or close_session; reference the session_control method and the open_session/close_session calls so you add the validation and early-return logic at the top of session_control.
414-416: Session ID validation is correct, but consider narrowing exception handling.
The if not session_id: pattern correctly catches both None and empty strings, which is appropriate for validation. However, the broad except Exception catches (lines 434, 453) are flagged by static analysis. Consider catching specific exceptions from SGLang if the API defines them.
💡 Optional: Narrow exception handling if SGLang exposes specific exceptions
If SGLang's open_session/close_session raise specific exception types, catching those would provide better error handling:
```python
except SomeSpecificSGLangException as e:
    logging.error(f"Failed to open session {session_id}: {e}")
    return {"status": "error", "message": str(e)}
```
If the API can raise arbitrary exceptions, the current approach is acceptable for resilience.
Also applies to: 446-448
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@components/src/dynamo/sglang/request_handlers/handler_base.py` around lines 414 - 416, The broad except Exception blocks in handler_base.py around the SGLang calls (specifically the try/except wrapping open_session and close_session usage in the request handlers) should be narrowed: replace except Exception with the specific SGLang exception types exposed by the library (e.g., SGLangError or SGLangException) when calling open_session and close_session, and log the caught exception (e) in the processLogger/logging call and return its message; if the library does not expose specific types, catch Exception as e so you still log and return str(e). Ensure changes target the try/except blocks that handle open_session and close_session in the handler methods.
lib/llm/src/kv_router/push_router.rs (1)
532-537: Consider logging when extra_args is not an object.
If extra_args is unexpectedly set to a non-object JSON value, the retention_seconds injection silently fails. This could make debugging cache retention issues difficult.
💡 Optional: Add trace logging for unexpected extra_args type
```diff
 if let Some(ttl) = request.routing.as_ref().and_then(|r| r.cache_control_ttl) {
     let extra = request.extra_args.get_or_insert_with(|| json!({}));
     if let serde_json::Value::Object(map) = extra {
         map.insert("retention_seconds".to_string(), json!(ttl));
+    } else {
+        tracing::trace!(
+            "extra_args is not an object, skipping retention_seconds injection"
+        );
     }
 }
```
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@lib/llm/src/kv_router/push_router.rs` around lines 532 - 537, The code injects retention_seconds into request.extra_args only when extra_args is a JSON object, but it silently does nothing if extra_args exists and is a non-object; update the block in push_router.rs (the code around request.routing, request.extra_args and cache_control_ttl) to detect the else case where extra_args is present but not a serde_json::Value::Object and emit a trace/debug log (e.g., using tracing::trace! or the project's logger) that includes the unexpected extra_args type and the ttl value; keep the existing insertion behavior for the Object case and do not change behavior when extra_args is absent.
lib/llm/src/kv_router/sticky_sessions.rs (1)
64-79: Background reaper task runs indefinitely without cancellation.
The tokio::spawn in new_with_on_expire creates a task that runs forever. When InMemoryAffinityStore is dropped, the reaper continues to run, holding an Arc reference to the map and callback. In practice this may be acceptable since KvPushRouter lives for the process lifetime, but it prevents clean shutdown.
💡 Optional: Add cancellation token for graceful shutdown
If clean shutdown becomes important, consider passing a CancellationToken or storing a JoinHandle to abort the reaper on drop.
Verify each finding against the current code and only fix it if needed. In `@lib/llm/src/kv_router/sticky_sessions.rs` around lines 64 - 79, The reaper task spawned in new_with_on_expire keeps running after InMemoryAffinityStore is dropped because it holds an Arc to the store; modify InMemoryAffinityStore to support graceful shutdown by either (A) adding a CancellationToken field (e.g., tokio_util::sync::CancellationToken) and passing a cloned token into the spawned task so the task breaks its loop when token.cancel() is called, or (B) storing the JoinHandle returned by tokio::spawn on the store and implementing Drop for InMemoryAffinityStore to call handle.abort() (or await a shutdown signal) to stop the task; update new_with_on_expire signature to accept or create the token/handle and ensure Drop properly cancels the reaper to release the Arc and allow clean shutdown.
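For intuition, here is an asyncio analogue of the cancellable-reaper idea (the real code is tokio/Rust; `AffinityStore` and the timing constants are illustrative only): the store owns the task handle and cancels it on close, which is the role a CancellationToken or JoinHandle.abort() would play in the Rust version.

```python
import asyncio


class AffinityStore:
    """Toy analogue of InMemoryAffinityStore with a cancellable reaper."""

    def __init__(self, interval: float = 0.01):
        self.sessions: dict[str, float] = {}
        # Holding the handle (instead of fire-and-forget) enables clean shutdown.
        self._reaper = asyncio.create_task(self._reap(interval))

    async def _reap(self, interval: float) -> None:
        while True:
            await asyncio.sleep(interval)
            now = asyncio.get_running_loop().time()
            for sid in [s for s, exp in self.sessions.items() if exp <= now]:
                del self.sessions[sid]

    def open(self, sid: str, ttl: float) -> None:
        self.sessions[sid] = asyncio.get_running_loop().time() + ttl

    async def close(self) -> None:
        # Analogue of CancellationToken.cancel() / JoinHandle::abort().
        self._reaper.cancel()
        try:
            await self._reaper
        except asyncio.CancelledError:
            pass


async def demo():
    store = AffinityStore()
    store.open("s1", ttl=0.02)
    await asyncio.sleep(0.06)          # let the reaper expire the session
    expired = "s1" not in store.sessions
    await store.close()                # reaper stops; no leaked background task
    return expired


assert asyncio.run(demo())
```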
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@components/src/dynamo/frontend/sglang_prepost.py`:
- Around line 97-110: The function _normalize_prompt_token_ids currently falls
back to list(prompt_token_ids) which, for a dict with a non-list "input_ids"
iterable, will iterate dict keys instead of token IDs; to fix this, update the
logic to detect non-list but iterable input IDs and convert those to a list:
when prompt_token_ids has an attribute input_ids (variable input_ids) and it's
not a list but is iterable, return list(input_ids); likewise, when
prompt_token_ids is a dict and dict_input_ids exists but is not a list, return
list(dict_input_ids) instead of falling through to list(prompt_token_ids); keep
existing paths for list input_ids and other non-iterable types.
In `@components/src/dynamo/frontend/tests/test_sglang_processor_unit.py`:
- Around line 435-441: The test's FakeBatchEncoding class defines input_ids as a
mutable class attribute which can leak state between tests; change
FakeBatchEncoding so input_ids is an instance attribute (e.g., set in __init__)
or make it an immutable tuple, then use that instance when calling
_normalize_prompt_token_ids to avoid shared mutable state and satisfy the
linter.
In `@components/src/dynamo/sglang/init_llm.py`:
- Around line 122-127: The new session_control endpoint created via
session_control_endpoint =
runtime.endpoint(f"{dynamo_args.namespace}.{dynamo_args.component}.session_control")
is not being added to the shutdown_endpoints list, so update the
shutdown/unregister logic to include session_control_endpoint alongside
generate_endpoint (the list referenced as shutdown_endpoints) so both endpoints
are cleaned up on shutdown; locate where generate_endpoint is appended to
shutdown_endpoints and add session_control_endpoint there (and mirror the same
addition at the other occurrence around lines 136-138).
In `@components/src/dynamo/sglang/publisher.py`:
- Around line 13-14: Remove the duplicate import of urlparse: locate the
repeated "from urllib.parse import urlparse" in the sglang publisher module (the
second occurrence around the top of publisher.py) and delete the redundant line
so only the original import remains, ensuring imports are not duplicated and
lint errors are resolved.
In `@docs/backends/sglang/agents.md`:
- Around line 368-400: The example breaks the append-only session contract by
making Turn 3 (resp3) not extend the prior transcript and leaving both resp2 and
resp3 unread while using stream=True; fix by either making resp3 append to the
prior messages (include prior assistant/user turns like t1 and the same
SESSION_ID) and consuming the streamed responses from resp2 and resp3, or change
both client.chat.completions.create calls to stream=False (or set resp2/resp3
stream handling to read the stream fully) so the session control semantics and
KV lifecycle (SESSION_ID with action "close") are correct; update references to
resp2, resp3, SESSION_ID, and the messages arrays accordingly.
- Around line 299-303: The docs state session_control.timeout default is 300 but
the actual default is 30 (see default_session_timeout()), so update the table
entry for `session_control.timeout` to indicate default 30 seconds to match the
code and tests; ensure the description for the `session_control.timeout` field
mentions "default 30" and aligns with the behavior of default_session_timeout()
and the new serde test.
In `@examples/backends/sglang/launch/agg_router.sh`:
- Around line 56-59: The MODEL_OVERRIDE_ARGS here embeds a hard-coded
rope_scaling.factor of 4.0 while CONTEXT_LENGTH is configurable; compute
rope_scaling.factor dynamically as CONTEXT_LENGTH divided by the
original_max_position_embeddings (32768) and inject that value into the here-doc
so rope_scaling.factor = ${CONTEXT_LENGTH}/32768 (as a float). Update the block
that defines MODEL_OVERRIDE_ARGS to reference CONTEXT_LENGTH and derive the
factor variable (keep rope_theta and original_max_position_embeddings unchanged)
so any non-default CONTEXT_LENGTH produces consistent rope settings.
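The derivation suggested above is simple arithmetic; a quick sketch, taking 32768 as the original_max_position_embeddings per the prompt:

```python
# rope_scaling.factor = CONTEXT_LENGTH / original_max_position_embeddings,
# instead of a hard-coded 4.0 that is only valid for one context length.
def rope_factor(context_length: int, original_max: int = 32768) -> float:
    return context_length / original_max


assert rope_factor(131072) == 4.0  # the case the hard-coded value assumed
assert rope_factor(65536) == 2.0   # any non-default CONTEXT_LENGTH stays consistent
```

In the launch script, the same computation can be done with shell arithmetic and interpolated into the MODEL_OVERRIDE_ARGS here-doc.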
---
Outside diff comments:
In `@components/src/dynamo/frontend/tests/test_sglang_processor_unit.py`:
- Around line 13-31: Add the required pytest markers at module scope by defining
a pytestmark list near the top of test_sglang_processor_unit.py (after the
imports) that includes the scheduling marker, the GPU marker, and the test-type
marker (e.g., pytest.mark.scheduling(...), pytest.mark.gpu, pytest.mark.unit).
Ensure you use the exact marker names used across the repo (matching
pyproject.toml and tests/conftest.py) so collection works in CI; place this
module-level pytestmark before any test definitions so it applies to all tests
in the file.
---
Nitpick comments:
In `@components/src/dynamo/sglang/request_handlers/handler_base.py`:
- Around line 457-475: session_control currently assumes `request` is a dict and
calls request.get which will raise if request is None or not a mapping; guard
the start of session_control to defensively validate `request` (e.g., check
isinstance(request, dict) or hasattr(request, "get") and that "action" exists)
and return a consistent error dict like {"status":"error","message":"malformed
request"} when validation fails, before calling open_session or close_session;
reference the session_control method and the open_session/close_session calls so
you add the validation and early-return logic at the top of session_control.
- Around line 414-416: The broad except Exception blocks in handler_base.py
around the SGLang calls (specifically the try/except wrapping open_session and
close_session usage in the request handlers) should be narrowed: replace except
Exception with the specific SGLang exception types exposed by the library (e.g.,
SGLangError or SGLangException) when calling open_session and close_session, and
log the caught exception (e) in the processLogger/logging call and return its
message; if the library does not expose specific types, catch Exception as e so
you still log and return str(e). Ensure changes target the try/except blocks
that handle open_session and close_session in the handler methods.
In `@components/src/dynamo/sglang/request_handlers/llm/decode_handler.py`:
- Around line 116-119: Add regression tests that verify
extra_args.retention_seconds is forwarded through both async_generate() paths
(aggregated and disaggregated). Create tests that call the handler logic which
uses _session_kwargs() and _retention_kwargs(), mock the LLM engine used by
async_generate() to capture the kwargs passed, and assert that retention_seconds
appears in the session and retention kwargs in both the aggregated and
disaggregated code paths; cover the same behavior referenced around the
extraction of extra_args/retention_seconds and the code paths calling
async_generate() so future changes cannot regress the forwarding.
In `@lib/llm/src/kv_router/push_router.rs`:
- Around line 532-537: The code injects retention_seconds into
request.extra_args only when extra_args is a JSON object, but it silently does
nothing if extra_args exists and is a non-object; update the block in
push_router.rs (the code around request.routing, request.extra_args and
cache_control_ttl) to detect the else case where extra_args is present but not a
serde_json::Value::Object and emit a trace/debug log (e.g., using
tracing::trace! or the project's logger) that includes the unexpected extra_args
type and the ttl value; keep the existing insertion behavior for the Object case
and do not change behavior when extra_args is absent.
In `@lib/llm/src/kv_router/sticky_sessions.rs`:
- Around line 64-79: The reaper task spawned in new_with_on_expire keeps running
after InMemoryAffinityStore is dropped because it holds an Arc to the store;
modify InMemoryAffinityStore to support graceful shutdown by either (A) adding a
CancellationToken field (e.g., tokio_util::sync::CancellationToken) and passing
a cloned token into the spawned task so the task breaks its loop when
token.cancel() is called, or (B) storing the JoinHandle returned by tokio::spawn
on the store and implementing Drop for InMemoryAffinityStore to call
handle.abort() (or await a shutdown signal) to stop the task; update
new_with_on_expire signature to accept or create the token/handle and ensure
Drop properly cancels the reaper to release the Arc and allow clean shutdown.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
Run ID: cba71e2f-e596-4edc-b538-ecdd9e5b9965
📒 Files selected for processing (22)
- components/src/dynamo/common/configuration/groups/kv_router_args.py
- components/src/dynamo/frontend/sglang_prepost.py
- components/src/dynamo/frontend/sglang_processor.py
- components/src/dynamo/frontend/tests/test_sglang_processor_unit.py
- components/src/dynamo/sglang/init_llm.py
- components/src/dynamo/sglang/publisher.py
- components/src/dynamo/sglang/register.py
- components/src/dynamo/sglang/request_handlers/handler_base.py
- components/src/dynamo/sglang/request_handlers/llm/decode_handler.py
- components/src/dynamo/sglang/request_handlers/llm/prefill_handler.py
- docs/backends/sglang/agents.md
- examples/backends/sglang/launch/agg_router.sh
- lib/bindings/python/src/dynamo/_core.pyi
- lib/kv-router/src/scheduling/config.rs
- lib/llm/src/kv_router.rs
- lib/llm/src/kv_router/agent_controller.rs
- lib/llm/src/kv_router/cache_control.rs
- lib/llm/src/kv_router/push_router.rs
- lib/llm/src/kv_router/sticky_sessions.rs
- lib/llm/src/preprocessor.rs
- lib/llm/src/protocols/common/preprocessor.rs
- lib/llm/src/protocols/openai/nvext.rs
💤 Files with no reviewable changes (2)
- components/src/dynamo/sglang/register.py
- lib/llm/src/kv_router/cache_control.rs
```python
def _normalize_prompt_token_ids(prompt_token_ids: Any) -> list[int]:
    if isinstance(prompt_token_ids, list):
        return prompt_token_ids

    input_ids = getattr(prompt_token_ids, "input_ids", None)
    if isinstance(input_ids, list):
        return input_ids

    if isinstance(prompt_token_ids, dict):
        dict_input_ids = prompt_token_ids.get("input_ids")
        if isinstance(dict_input_ids, list):
            return dict_input_ids

    return list(prompt_token_ids)
```
🧩 Analysis chain
🌐 Web query:
For SGLang/Hugging Face tokenizers, what concrete return types can tokenizer.apply_chat_template(..., tokenize=True) produce, and can input_ids be non-list iterables (e.g., tuple/tensor/array)?
💡 Result:
For Hugging Face tokenizers, tokenizer.apply_chat_template(..., tokenize=True) returns a list of integers (the input_ids) by default. With return_tensors="pt" or "np", it returns a PyTorch tensor or NumPy array (batched if multiple conversations). With return_dict=True, it returns a dictionary containing "input_ids" (list/tensor/array), and optionally "attention_mask" and other fields depending on tokenizer configuration and flags like return_assistant_tokens_mask=True. SGLang uses Hugging Face tokenizers (loaded via tokenizer_manager.tokenizer.apply_chat_template in server code) for models with chat templates, so it produces the same return types. SGLang also has its own custom chat template system (via conversation.py and get_chat_template) for the SGLang language frontend (e.g., RuntimeEndpoint.chat_template), which formats messages into strings but does not directly use apply_chat_template with tokenize=True; tokenization happens separately. input_ids for tokenizer.call (used internally by apply_chat_template when tokenize=True) accepts non-list iterables. The text parameter supports str, list[str], tuple[str], PreTokenizedInput, etc., but when passing pre-tokenized input_ids directly to tokenizer (not via apply_chat_template), it expects list[int] or array-like for return_tensors, and iterables like tuples/arrays/tensors are typically converted if they yield ints.
Citations:
- 1: https://huggingface.co/docs/transformers/en/chat_templating
- 2: https://huggingface.co/docs/transformers/en/main_classes/tokenizer
- 3: https://huggingface.co/docs/transformers/main/chat%5Ftemplating
- 4: https://huggingface.co/docs/transformers/main/en/chat_template_tools_and_documents
- 5: https://readmedium.com/why-do-we-need-apply-chat-template-for-chat-2f7b1d60717d
- 6: https://huggingface.co/docs/transformers/v4.46.3/chat_templating
- 7: https://huggingface.co/docs/tokenizers/main/api/tokenizer
- 8: https://huggingface.co/docs/transformers/v4.33.2/en/main_classes/tokenizer
- 9: https://huggingface.co/docs/transformers/v5.3.0/fast_tokenizers
- 10: https://huggingface.co/docs/tokenizers/python/master/api/reference.html
- 11: Return assistant generated tokens mask in apply_chat_template huggingface/transformers#30650
🏁 Script executed:
```shell
# First, locate and examine the actual file
fd "sglang_prepost.py" -t f
```
Repository: ai-dynamo/dynamo
Length of output: 108
🏁 Script executed:
```shell
# Read the file to see the actual implementation
if [ -f "components/src/dynamo/frontend/sglang_prepost.py" ]; then
  wc -l "components/src/dynamo/frontend/sglang_prepost.py"
fi
```
Repository: ai-dynamo/dynamo
Length of output: 112
🏁 Script executed:
```shell
# Get the specific lines mentioned in the review
if [ -f "components/src/dynamo/frontend/sglang_prepost.py" ]; then
  sed -n '90,120p' "components/src/dynamo/frontend/sglang_prepost.py" | cat -n
fi
```
Repository: ai-dynamo/dynamo
Length of output: 1143
🏁 Script executed:
```shell
# Also check lines 143-145 for usage context
if [ -f "components/src/dynamo/frontend/sglang_prepost.py" ]; then
  sed -n '140,150p' "components/src/dynamo/frontend/sglang_prepost.py" | cat -n
fi
```
Repository: ai-dynamo/dynamo
Length of output: 524
🏁 Script executed:
```shell
# Search for any tests or usage of _normalize_prompt_token_ids
rg "_normalize_prompt_token_ids" -A 2 -B 2
```
Repository: ai-dynamo/dynamo
Length of output: 2226
🏁 Script executed:
```shell
# Search for where this function might be called with different types
rg "apply_chat_template" -t py -A 3 -B 3 | head -50
```
Repository: ai-dynamo/dynamo
Length of output: 4278
🏁 Script executed:
```shell
# Look at the test file to understand FakeBatchEncoding and test coverage
if [ -f "components/src/dynamo/frontend/tests/test_sglang_processor_unit.py" ]; then
  rg "class FakeBatchEncoding|def test.*normalize" -A 10 "components/src/dynamo/frontend/tests/test_sglang_processor_unit.py"
fi
```
Repository: ai-dynamo/dynamo
Length of output: 456
🏁 Script executed:
```shell
# Get more context from the test file around _normalize_prompt_token_ids tests
if [ -f "components/src/dynamo/frontend/tests/test_sglang_processor_unit.py" ]; then
  rg "_normalize_prompt_token_ids" -B 5 -A 15 "components/src/dynamo/frontend/tests/test_sglang_processor_unit.py"
fi
```
Repository: ai-dynamo/dynamo
Length of output: 1391
🏁 Script executed:
```shell
# Check what template_kwargs contains and how apply_chat_template is called
sed -n '130,160p' "components/src/dynamo/frontend/sglang_prepost.py" | cat -n
```
Repository: ai-dynamo/dynamo
Length of output: 1377
🏁 Script executed:
```shell
# Search for any imports of numpy, torch, or array types in sglang_prepost.py
rg "^import|^from" "components/src/dynamo/frontend/sglang_prepost.py" | head -20
```
Repository: ai-dynamo/dynamo
Length of output: 473
Dict fallback can emit dict keys instead of token IDs if input_ids is non-list.
When _normalize_prompt_token_ids receives a dict with "input_ids" as a non-list iterable (e.g., tensor, array, tuple), the function falls through to return list(prompt_token_ids), which iterates dict keys instead of token IDs. While current usage (tokenizer.apply_chat_template(messages, tokenize=True) without return_dict=True) returns a list directly, the function accepts Any and should handle this case defensively.
Suggested fix
```diff
 def _normalize_prompt_token_ids(prompt_token_ids: Any) -> list[int]:
     if isinstance(prompt_token_ids, list):
         return prompt_token_ids
     input_ids = getattr(prompt_token_ids, "input_ids", None)
-    if isinstance(input_ids, list):
-        return input_ids
+    if input_ids is not None and not isinstance(input_ids, (str, bytes)):
+        return list(input_ids)
     if isinstance(prompt_token_ids, dict):
-        dict_input_ids = prompt_token_ids.get("input_ids")
-        if isinstance(dict_input_ids, list):
-            return dict_input_ids
+        if "input_ids" in prompt_token_ids:
+            dict_input_ids = prompt_token_ids["input_ids"]
+            if not isinstance(dict_input_ids, (str, bytes)):
+                return list(dict_input_ids)
     return list(prompt_token_ids)
```
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@components/src/dynamo/frontend/sglang_prepost.py` around lines 97 - 110, The
function _normalize_prompt_token_ids currently falls back to
list(prompt_token_ids) which, for a dict with a non-list "input_ids" iterable,
will iterate dict keys instead of token IDs; to fix this, update the logic to
detect non-list but iterable input IDs and convert those to a list: when
prompt_token_ids has an attribute input_ids (variable input_ids) and it's not a
list but is iterable, return list(input_ids); likewise, when prompt_token_ids is
a dict and dict_input_ids exists but is not a list, return list(dict_input_ids)
instead of falling through to list(prompt_token_ids); keep existing paths for
list input_ids and other non-iterable types.
```python
class FakeBatchEncoding:
    input_ids = [11, 22, 33]

    def __iter__(self):
        yield from ("input_ids", "attention_mask")


assert _normalize_prompt_token_ids(FakeBatchEncoding()) == [11, 22, 33]
```
Avoid a mutable class attribute in FakeBatchEncoding.
input_ids is shared state at class scope here, which is exactly what Ruff is warning about. Making it an instance attribute or a tuple avoids accidental cross-test leakage if this helper ever gets mutated.
♻️ Minimal cleanup
```diff
 class FakeBatchEncoding:
-    input_ids = [11, 22, 33]
+    def __init__(self):
+        self.input_ids = [11, 22, 33]

     def __iter__(self):
         yield from ("input_ids", "attention_mask")
```
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
```python
class FakeBatchEncoding:
    def __init__(self):
        self.input_ids = [11, 22, 33]

    def __iter__(self):
        yield from ("input_ids", "attention_mask")


assert _normalize_prompt_token_ids(FakeBatchEncoding()) == [11, 22, 33]
```
🧰 Tools
🪛 Ruff (0.15.7)
[warning] 436-436: Mutable default value for class attribute (RUF012)
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@components/src/dynamo/frontend/tests/test_sglang_processor_unit.py` around
lines 435 - 441, The test's FakeBatchEncoding class defines input_ids as a
mutable class attribute which can leak state between tests; change
FakeBatchEncoding so input_ids is an instance attribute (e.g., set in __init__)
or make it an immutable tuple, then use that instance when calling
_normalize_prompt_token_ids to avoid shared mutable state and satisfy the
linter.
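The leak the linter is warning about is easy to demonstrate in isolation (`Shared` and `PerInstance` are illustrative names, not code from the PR):

```python
# A class-scope list is shared by every instance, so a mutation in one test
# leaks into the next; an instance attribute set in __init__ is isolated.
class Shared:
    input_ids = [11, 22, 33]


a, b = Shared(), Shared()
a.input_ids.append(44)
assert b.input_ids == [11, 22, 33, 44]  # leaked across instances


class PerInstance:
    def __init__(self):
        self.input_ids = [11, 22, 33]


c, d = PerInstance(), PerInstance()
c.input_ids.append(44)
assert d.input_ids == [11, 22, 33]  # isolated
```

An immutable tuple at class scope would achieve the same isolation without an `__init__`.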
```python
# Serve session_control as a discoverable endpoint so the router's
# AgentController can find it via component.endpoint("session_control").
session_control_endpoint = runtime.endpoint(
    f"{dynamo_args.namespace}.{dynamo_args.component}.session_control"
)
```
Track session_control_endpoint in shutdown endpoint list.
Line 86 only tracks generate_endpoint. The newly served session_control_endpoint should also be added to shutdown_endpoints so shutdown/unregister logic treats both endpoints consistently.
Suggested fix
```diff
-shutdown_endpoints[:] = [generate_endpoint]
+shutdown_endpoints[:] = [generate_endpoint]
@@
 session_control_endpoint = runtime.endpoint(
     f"{dynamo_args.namespace}.{dynamo_args.component}.session_control"
 )
+shutdown_endpoints.append(session_control_endpoint)
```
Also applies to: 136-138
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@components/src/dynamo/sglang/init_llm.py` around lines 122 - 127, The new
session_control endpoint created via session_control_endpoint =
runtime.endpoint(f"{dynamo_args.namespace}.{dynamo_args.component}.session_control")
is not being added to the shutdown_endpoints list, so update the
shutdown/unregister logic to include session_control_endpoint alongside
generate_endpoint (the list referenced as shutdown_endpoints) so both endpoints
are cleaned up on shutdown; locate where generate_endpoint is appended to
shutdown_endpoints and add session_control_endpoint there (and mirror the same
addition at the other occurrence around lines 136-138).
```python
from urllib.parse import urlparse
```
Remove the duplicate urlparse import.
urlparse is already imported at Line 8; the second import is redundant and likely to trip lint checks.
Suggested fix
```diff
-from urllib.parse import urlparse
```
As per coding guidelines: "Use consistent code formatting and adhere to the established linting rules defined in the project."
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
```diff
-from urllib.parse import urlparse
```
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@components/src/dynamo/sglang/publisher.py` around lines 13 - 14, Remove the
duplicate import of urlparse: locate the repeated "from urllib.parse import
urlparse" in the sglang publisher module (the second occurrence around the top
of publisher.py) and delete the redundant line so only the original import
remains, ensuring imports are not duplicated and lint errors are resolved.
| Field | Type | Description |
|-------|------|-------------|
| `session_control.session_id` | `string` | Unique session identifier. Present on every turn. |
| `session_control.action` | `string` | `"open"` or `"close"`. Omit on intermediate turns. |
| `session_control.timeout` | `integer` | Inactivity timeout in seconds (default 300). Only used with `action: "open"`. |
There was a problem hiding this comment.
The documented session timeout default is off by 10x.
default_session_timeout() in lib/llm/src/protocols/openai/nvext.rs returns 30, and the new serde test asserts the same. Leaving default 300 here will cause users who omit timeout to get unexpected session expiry.
📝 Suggested doc fix
-| `session_control.timeout` | `integer` | Inactivity timeout in seconds (default 300). Only used with `action: "open"`. |
+| `session_control.timeout` | `integer` | Inactivity timeout in seconds (default 30). Only used with `action: "open"`. |📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| | Field | Type | Description | | |
| |-------|------|-------------| | |
| | `session_control.session_id` | `string` | Unique session identifier. Present on every turn. | | |
| | `session_control.action` | `string` | `"open"` or `"close"`. Omit on intermediate turns. | | |
| | `session_control.timeout` | `integer` | Inactivity timeout in seconds (default 300). Only used with `action: "open"`. | | |
| | Field | Type | Description | | |
| |-------|------|-------------| | |
| | `session_control.session_id` | `string` | Unique session identifier. Present on every turn. | | |
| | `session_control.action` | `string` | `"open"` or `"close"`. Omit on intermediate turns. | | |
| | `session_control.timeout` | `integer` | Inactivity timeout in seconds (default 30). Only used with `action: "open"`. | |
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@docs/backends/sglang/agents.md` around lines 299 - 303, The docs state
session_control.timeout default is 300 but the actual default is 30 (see
default_session_timeout()), so update the table entry for
`session_control.timeout` to indicate default 30 seconds to match the code and
tests; ensure the description for the `session_control.timeout` field mentions
"default 30" and aligns with the behavior of default_session_timeout() and the
new serde test.
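A quick sketch of the default the review points at. The `effective_timeout` helper is hypothetical (the real default lives in the Rust serde layer), but it mirrors the documented behavior: omitting `timeout` from `session_control` yields 30 seconds, not 300:

```python
# Sketch, assuming the server applies default_session_timeout() from
# nvext.rs when `timeout` is absent from session_control.
DEFAULT_SESSION_TIMEOUT = 30  # mirrors default_session_timeout() in Rust

def effective_timeout(session_control: dict) -> int:
    # Fall back to the server-side default when the client omits `timeout`.
    return session_control.get("timeout", DEFAULT_SESSION_TIMEOUT)
```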
```diff
 resp2 = client.chat.completions.create(
     model="Qwen/Qwen3-14B-FP8",
     messages=[
         {"role": "system", "content": SYSTEM},
         {"role": "user", "content": "Rank Federer's 20 Grand Slam titles by artistic beauty."},
         {"role": "assistant", "content": t1},
         {"role": "user", "content": "Now explain why the 2017 Australian Open final against Nadal was the single greatest moment in competitive sports. Include the fifth set backhand winner."},
     ],
     stream=True,
     extra_body={
         "nvext": {
             "session_control": {"session_id": SESSION_ID}
         }
     }
 )

-# Later turns reuse the pinned prefix -- even after heavy load from
-# other requests, the KV cache for this conversation is preserved.
-response = client.chat.completions.create(
+# Turn 3: Close session (KV freed after generation completes)
+resp3 = client.chat.completions.create(
     model="Qwen/Qwen3-14B-FP8",
     messages=[
-        {"role": "system", "content": system_prompt},
-        {"role": "user", "content": "Analyze this codebase and suggest improvements."},
-        {"role": "assistant", "content": assistant_response},
-        {"role": "user", "content": "Now focus on the database layer."},
+        {"role": "system", "content": SYSTEM},
+        {"role": "user", "content": "Compose a closing argument for why Federer's career transcends sport and enters the realm of fine art."},
     ],
     stream=True,
     extra_body={
         "nvext": {
-            "cache_control": {
-                "type": "ephemeral",
-                "ttl": "1h"
+            "session_control": {
+                "session_id": SESSION_ID,
+                "action": "close"
             }
         }
     }
 )
```
The follow-up session example stops matching the session contract.
Turn 3 no longer extends the prior transcript even though the limitations below describe append-only streaming sessions, and both resp2 and resp3 are left unread despite stream=True. Please either show true append-style turns and consume the streams, or switch those calls to stream=False.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@docs/backends/sglang/agents.md` around lines 368 - 400, The example breaks
the append-only session contract by making Turn 3 (resp3) not extend the prior
transcript and leaving both resp2 and resp3 unread while using stream=True; fix
by either making resp3 append to the prior messages (include prior
assistant/user turns like t1 and the same SESSION_ID) and consuming the streamed
responses from resp2 and resp3, or change both client.chat.completions.create
calls to stream=False (or set resp2/resp3 stream handling to read the stream
fully) so the session control semantics and KV lifecycle (SESSION_ID with action
"close") are correct; update references to resp2, resp3, SESSION_ID, and the
messages arrays accordingly.
```bash
CONTEXT_LENGTH="${CONTEXT_LENGTH:-131072}"
MODEL_OVERRIDE_ARGS=$(cat <<EOF
{"rope_theta":1000000,"max_position_embeddings":${CONTEXT_LENGTH},"rope_scaling":{"rope_type":"yarn","rope_theta":1000000,"factor":4.0,"original_max_position_embeddings":32768}}
EOF
)
```

**Derive YaRN scaling from `CONTEXT_LENGTH`.**

The script now advertises `CONTEXT_LENGTH` as an override, but `rope_scaling.factor` stays hard-coded at 4.0. Any value other than 131072 will produce inconsistent rope settings and a misconfigured long-context launch.

🐛 Suggested fix:

```diff
 CONTEXT_LENGTH="${CONTEXT_LENGTH:-131072}"
-MODEL_OVERRIDE_ARGS=$(cat <<EOF
-{"rope_theta":1000000,"max_position_embeddings":${CONTEXT_LENGTH},"rope_scaling":{"rope_type":"yarn","rope_theta":1000000,"factor":4.0,"original_max_position_embeddings":32768}}
-EOF
-)
+MODEL_OVERRIDE_ARGS="$(python3 - "$CONTEXT_LENGTH" <<'PY'
+import json
+import sys
+
+context_length = int(sys.argv[1])
+original_max_position_embeddings = 32768
+
+print(json.dumps({
+    "rope_theta": 1000000,
+    "max_position_embeddings": context_length,
+    "rope_scaling": {
+        "rope_type": "yarn",
+        "rope_theta": 1000000,
+        "factor": context_length / original_max_position_embeddings,
+        "original_max_position_embeddings": original_max_position_embeddings,
+    },
+}))
+PY
+)"
```

📝 Committable suggestion
```bash
CONTEXT_LENGTH="${CONTEXT_LENGTH:-131072}"
MODEL_OVERRIDE_ARGS="$(python3 - "$CONTEXT_LENGTH" <<'PY'
import json
import sys

context_length = int(sys.argv[1])
original_max_position_embeddings = 32768

print(json.dumps({
    "rope_theta": 1000000,
    "max_position_embeddings": context_length,
    "rope_scaling": {
        "rope_type": "yarn",
        "rope_theta": 1000000,
        "factor": context_length / original_max_position_embeddings,
        "original_max_position_embeddings": original_max_position_embeddings,
    },
}))
PY
)"
```
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@examples/backends/sglang/launch/agg_router.sh` around lines 56 - 59, The
MODEL_OVERRIDE_ARGS here embeds a hard-coded rope_scaling.factor of 4.0 while
CONTEXT_LENGTH is configurable; compute rope_scaling.factor dynamically as
CONTEXT_LENGTH divided by the original_max_position_embeddings (32768) and
inject that value into the here-doc so rope_scaling.factor =
${CONTEXT_LENGTH}/32768 (as a float). Update the block that defines
MODEL_OVERRIDE_ARGS to reference CONTEXT_LENGTH and derive the factor variable
(keep rope_theta and original_max_position_embeddings unchanged) so any
non-default CONTEXT_LENGTH produces consistent rope settings.
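The arithmetic behind the suggested fix can be checked in isolation: the YaRN factor is simply the target context length divided by the model's original maximum position embeddings (32768 here), so the hard-coded 4.0 is only correct for the default 131072:

```python
# Quick check of the YaRN scaling arithmetic from the suggested fix.
ORIGINAL_MAX_POSITION_EMBEDDINGS = 32768

def yarn_factor(context_length: int) -> float:
    # factor = context_length / original_max_position_embeddings
    return context_length / ORIGINAL_MAX_POSITION_EMBEDDINGS

print(yarn_factor(131072))  # 4.0 -- matches the previously hard-coded value
print(yarn_factor(65536))   # 2.0 -- where a hard-coded 4.0 would be wrong
```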