@adrianlyjak
Contributor

@adrianlyjak adrianlyjak commented Oct 21, 2025

  • Removes the broker
  • Splits the context into three separate components, hidden behind the existing ctx interface
  • Extends plugins and renames them to runtimes
  • DBOS integration as an alternate runtime is still in progress

@adrianlyjak adrianlyjak force-pushed the adrian/dbos branch 3 times, most recently from f96df50 to 82c5546 Compare October 22, 2025 18:34
@adrianlyjak adrianlyjak marked this pull request as ready for review October 22, 2025 18:36
@adrianlyjak adrianlyjak force-pushed the adrian/context-refact branch 2 times, most recently from 070fba2 to 862274b Compare October 28, 2025 17:31
Base automatically changed from adrian/context-refact to main October 28, 2025 18:32
@changeset-bot

changeset-bot bot commented Jan 15, 2026

🦋 Changeset detected

Latest commit: 35194c5

The changes in this PR will be included in the next version bump.

@coveralls

coveralls commented Jan 15, 2026

Pull Request Test Coverage Report for Build 21414581014

Warning: This coverage report may be inaccurate.

This pull request's base commit is no longer the HEAD commit of its target branch. This means it includes changes from outside the original pull request, including, potentially, unrelated coverage changes.

Details

  • 597 of 769 (77.63%) changed or added relevant lines in 16 files are covered.
  • 274 unchanged lines in 21 files lost coverage.
  • Overall coverage decreased (-0.1%) to 91.128%

| Changes Missing Coverage | Covered Lines | Changed/Added Lines | % |
|---|---|---|---|
| packages/llama-index-workflows/src/workflows/context/pre_context.py | 27 | 29 | 93.1% |
| packages/llama-index-workflows/src/workflows/workflow.py | 17 | 25 | 68.0% |
| packages/llama-index-workflows/src/workflows/context/internal_context.py | 76 | 88 | 86.36% |
| packages/llama-index-workflows/src/workflows/runtime/types/plugin.py | 97 | 109 | 88.99% |
| packages/llama-index-workflows/src/workflows/context/state_store.py | 17 | 37 | 45.95% |
| packages/llama-index-workflows/src/workflows/plugins/basic.py | 126 | 148 | 85.14% |
| packages/llama-index-workflows/src/workflows/context/external_context.py | 67 | 91 | 73.63% |
| packages/llama-index-workflows/src/workflows/context/context.py | 60 | 94 | 63.83% |
| packages/llama-index-workflows/src/workflows/handler.py | 58 | 96 | 60.42% |

| Files with Coverage Reduction | New Missed Lines | % |
|---|---|---|
| packages/llama-index-workflows/src/workflows/context/context.py | 1 | 65.81% |
| packages/llama-index-workflows/src/workflows/runtime/types/plugin.py | 1 | 89.08% |
| src/workflows/resource.py | 1 | 99.46% |
| tests/server/test_server_endpoints.py | 1 | 99.86% |
| tests/test_handler.py | 1 | 95.65% |
| tests/test_streaming.py | 1 | 98.46% |
| src/workflows/runtime/types/plugin.py | 4 | 96.64% |
| tests/server/test_idle_release.py | 4 | 98.28% |
| src/workflows/runtime/types/step_function.py | 6 | 93.33% |
| src/workflows/workflow.py | 6 | 97.67% |

| Totals | |
|---|---|
| Change from base Build 21397140588 | -0.1% |
| Covered Lines | 14010 |
| Relevant Lines | 15374 |

💛 - Coveralls

@adrianlyjak adrianlyjak changed the title WIP: dbos integration with plugins Runtime refactor to support pluggable runtimes Jan 22, 2026

@devin-ai-integration devin-ai-integration bot left a comment

Devin Review found 4 potential issues.

View issues and 14 additional flags in Devin Review.

Comment on lines 1685 to 1713
     @property
     def status(self) -> Status:
-        """Get the current status by inspecting the handler state."""
-        if not self.run_handler.done():
+        """Get the current status by inspecting the terminal event or handler state.
+        Status is derived from the terminal event type when available:
+        - WorkflowCancelledEvent -> "cancelled"
+        - WorkflowTimedOutEvent -> "failed" (timeout is a failure mode)
+        - WorkflowFailedEvent -> "failed"
+        - Plain StopEvent -> "completed"
+        Falls back to checking handler state if no terminal event yet.
+        """
+        # First check if we have a terminal event - derive status from event type
+        if self._terminal_event is not None:
+            if isinstance(self._terminal_event, WorkflowCancelledEvent):
+                return "cancelled"
+            elif isinstance(self._terminal_event, WorkflowTimedOutEvent):
+                return "failed"
+            elif isinstance(self._terminal_event, WorkflowFailedEvent):
+                return "failed"
+            else:
+                return "completed"
+
+        # Fall back to handler state check if no terminal event yet
+        if not self.run_handler.is_done():
             return "running"
-        # done - check if cancelled first
-        if self.run_handler.cancelled():
-            return "cancelled"
-        # then check for exception
-        exc = self.run_handler.exception()
-        if exc is not None:
-            return "failed"
-        return "completed"
+        # If handler is done but we don't have a terminal event, it was likely
+        # cancelled externally or failed before emitting a terminal event
+        return "running"


🟡 WorkflowServer status can report "running" even after handler completion if no terminal event was observed

In _WorkflowHandler.status, if no terminal event was recorded and run_handler.is_done() is true, the code returns "running" unconditionally.

Actual behavior: completed/failed/cancelled runs can be reported as "running" in persistence/API if the terminal StopEvent was not observed/recorded by _stream_events for any reason.

Expected behavior: if the handler is done and there is no terminal event, the status should be derived from handler completion state (cancelled vs exception vs completed), not forced to "running".

Code: workflows/server/server.py:1685-1713

Recommendation: When run_handler.is_done() is true and _terminal_event is None, fall back to run_handler.cancelled() / run_handler.exception() to classify as cancelled/failed/completed (or at least "failed"/"cancelled"), rather than returning "running".
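
A minimal sketch of the recommended fallback, for illustration only. The event classes below are hypothetical stand-ins for the types named in the docstring, `derive_status` is not a function from this PR, and a standard-library `concurrent.futures.Future` (with `done()`) plays the role of `run_handler` (which exposes `is_done()` in the PR):

```python
from concurrent.futures import Future
from typing import Literal, Optional

Status = Literal["running", "completed", "failed", "cancelled"]


# Hypothetical stand-ins for the terminal event types named in the docstring.
class StopEvent:
    pass


class WorkflowCancelledEvent(StopEvent):
    pass


class WorkflowTimedOutEvent(StopEvent):
    pass


class WorkflowFailedEvent(StopEvent):
    pass


def derive_status(terminal_event: Optional[StopEvent], handler: Future) -> Status:
    """Prefer the terminal event; otherwise classify from the handler state."""
    if terminal_event is not None:
        if isinstance(terminal_event, WorkflowCancelledEvent):
            return "cancelled"
        if isinstance(terminal_event, (WorkflowTimedOutEvent, WorkflowFailedEvent)):
            return "failed"
        return "completed"

    if not handler.done():
        return "running"
    # Handler finished without a recorded terminal event: do not report
    # "running". Check cancellation first, because exception() raises
    # CancelledError on a cancelled future.
    if handler.cancelled():
        return "cancelled"
    if handler.exception() is not None:
        return "failed"
    return "completed"
```

With this shape, a finished handler can never be reported as "running", whether or not the streaming side observed the terminal event.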

@adrianlyjak adrianlyjak force-pushed the adrian/dbos branch 2 times, most recently from b231c6d to 2574a33 Compare January 22, 2026 22:15

@devin-ai-integration devin-ai-integration bot left a comment

Devin Review found 1 new potential issue.

View issue and 19 additional flags in Devin Review.

Comment on lines 184 to 217
    def __init__(self) -> None:
        self._queues: dict[str, AsyncioAdapterQueues] = {}
        self._max_concurrent_runs: weakref.WeakValueDictionary[
            str, asyncio.Semaphore
        ] = weakref.WeakValueDictionary()

🔴 BasicRuntime concurrency limiting can silently stop working due to WeakValueDictionary semaphore storage

BasicRuntime stores per-workflow semaphores in a weakref.WeakValueDictionary:

self._max_concurrent_runs: weakref.WeakValueDictionary[str, asyncio.Semaphore]

(basic.py:184-188).

Because the only long-lived reference to each semaphore is the weak dictionary entry, semaphores may be garbage-collected at any time when not currently being awaited. When that happens, the next call to _maybe_acquire_max_concurrent_runs() will create a new semaphore, effectively resetting concurrency limits and allowing more than the configured num_concurrent_runs.

Actual: concurrency limit can be bypassed intermittently/non-deterministically.
Expected: concurrency limit should be enforced consistently for the process lifetime (or at least until runtime.destroy()).

Impact: can exceed intended concurrency caps, causing resource exhaustion and incorrect load-shedding behavior.

Recommendation: Use a normal dict (strong refs) for _max_concurrent_runs, and clear it in destroy(); or otherwise keep strong references for semaphore lifetime management.
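
The failure mode can be reproduced in isolation. The sketch below is an assumption-laden illustration, not code from `basic.py`: `get_semaphore` is a hypothetical helper that mimics a get-or-create lookup, and the registries are plain local objects rather than `BasicRuntime` attributes.

```python
import asyncio
import gc
import weakref
from typing import MutableMapping


def get_semaphore(
    registry: MutableMapping[str, asyncio.Semaphore], name: str, limit: int
) -> asyncio.Semaphore:
    """Hypothetical get-or-create lookup for a per-workflow semaphore."""
    sem = registry.get(name)
    if sem is None:
        sem = asyncio.Semaphore(limit)
        registry[name] = sem
    return sem


# Weak registry: once the caller drops its reference, nothing keeps the
# semaphore alive, so the entry disappears and the limit is reset.
weak_registry: "weakref.WeakValueDictionary[str, asyncio.Semaphore]" = (
    weakref.WeakValueDictionary()
)
get_semaphore(weak_registry, "my_workflow", 1)
gc.collect()
weak_entry_survived = "my_workflow" in weak_registry

# Strong registry: the dict itself owns the semaphore until it is cleared,
# for example from a destroy() hook.
strong_registry: "dict[str, asyncio.Semaphore]" = {}
get_semaphore(strong_registry, "my_workflow", 1)
gc.collect()
strong_entry_survived = "my_workflow" in strong_registry
```

The weak registry loses its entry as soon as no caller holds the semaphore, so the next lookup silently creates a fresh one; the plain dict keeps the same semaphore for every lookup.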


@devin-ai-integration devin-ai-integration bot left a comment

Devin Review found 1 new potential issue.

View issue and 31 additional flags in Devin Review.

Comment on lines 1911 to 1946
    async def cancel_handlers_and_tasks(self, *, graceful: bool = True) -> None:
        """Cancel the handler and release it from the store.

        Args:
            graceful: If True, request graceful cancellation and wait for
                WorkflowCancelledEvent. If False, force immediate cancellation
                (used for idle release where we don't want to emit cancel event).
        """
        if not self.run_handler.is_done():
            if graceful:
                try:
                    # Request graceful cancellation - this will emit WorkflowCancelledEvent
                    await self.run_handler.cancel_run()
                except Exception:
                    pass
                try:
                    # Wait for the workflow to complete after cancellation
                    # This gives time for WorkflowCancelledEvent to be emitted
                    await asyncio.wait_for(self.run_handler, timeout=2.0)
                except asyncio.TimeoutError:
                    # Force cancel if graceful cancellation didn't complete in time
                    self.run_handler.cancel()
                except asyncio.CancelledError:
                    pass
                except Exception:
                    pass
            else:
                # Force immediate cancellation without waiting
                try:
                    await self.run_handler.cancel_run()
                except Exception:
                    pass
                try:
                    self.run_handler.cancel()
                except Exception:
                    pass

🟡 Idle release uses graceful cancellation even when graceful=False, contradicting intended semantics

_WorkflowHandler.cancel_handlers_and_tasks(graceful=False) is documented/used for idle release “where we don't want to emit cancel event”, but it still calls await self.run_handler.cancel_run() before hard-cancelling.

Actual: idle release sends a TickCancelRun into the workflow (cancel_run()), which can cause the workflow to emit WorkflowCancelledEvent and transition persisted status to cancelled.
Expected: idle release should stop in-memory execution without changing logical workflow outcome (it should remain resumable/running), or at least avoid emitting cancellation signals.

Code:

  • In non-graceful branch:
        await self.run_handler.cancel_run()
        ...
        self.run_handler.cancel()

workflows/server/server.py:1937-1945

Impact: idle release may incorrectly cancel runs instead of just unloading them, breaking resumability and causing incorrect persisted status.

Recommendation: For graceful=False, avoid calling cancel_run(); instead only stop the streaming task and close adapter/resources (or implement a runtime-specific ‘detach/unload’ that doesn’t enqueue TickCancelRun).
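
One possible shape for that split, sketched with plain asyncio primitives. Everything here is hypothetical: `stop_run` and `demo` are illustrative names, `request_cancel` stands in for `run_handler.cancel_run()`, and a bare `asyncio.Task` stands in for the handler. The point is only that the non-graceful path never sends the cancellation signal.

```python
import asyncio
from typing import Awaitable, Callable, List, Tuple


async def stop_run(
    task: "asyncio.Task[None]",
    request_cancel: Callable[[], Awaitable[None]],
    *,
    graceful: bool,
    timeout: float = 2.0,
) -> None:
    """Stop a running task; only the graceful path signals the workflow."""
    if task.done():
        return
    if graceful:
        # Ask the workflow to cancel itself, then give it time to finish.
        await request_cancel()
        await asyncio.wait({task}, timeout=timeout)
    # Non-graceful path (or graceful timeout): hard-cancel the in-memory
    # task without enqueueing any cancellation signal.
    if not task.done():
        task.cancel()
    # Wait for the task to settle; return_exceptions=True collects the
    # task's own CancelledError instead of raising it here.
    await asyncio.gather(task, return_exceptions=True)


async def demo(graceful: bool) -> Tuple[List[str], bool]:
    signals: List[str] = []

    async def request_cancel() -> None:
        # Records the signal; this stand-in workflow ignores it.
        signals.append("cancel_run")

    task = asyncio.create_task(asyncio.sleep(3600))
    await stop_run(task, request_cancel, graceful=graceful, timeout=0.01)
    return signals, task.cancelled()
```

Running `asyncio.run(demo(graceful=False))` exercises the idle-release path: the task is torn down, but `request_cancel` is never invoked, so no cancellation event would be emitted or persisted.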

@adrianlyjak adrianlyjak force-pushed the adrian/dbos branch 4 times, most recently from 2b2fe68 to e31a92d Compare January 29, 2026 20:13
@adrianlyjak adrianlyjak changed the base branch from main to adrian/runtime-split January 29, 2026 20:14
@adrianlyjak adrianlyjak changed the title Runtime refactor to support pluggable runtimes DBOS runtime Jan 29, 2026
@adrianlyjak adrianlyjak marked this pull request as draft January 29, 2026 20:14
@adrianlyjak adrianlyjak force-pushed the adrian/dbos branch 7 times, most recently from bc769c2 to 1a9de11 Compare January 31, 2026 18:10
@adrianlyjak adrianlyjak force-pushed the adrian/dbos branch 3 times, most recently from 69da0b3 to 9a296b1 Compare February 1, 2026 03:28
@adrianlyjak adrianlyjak changed the base branch from adrian/ticky to adrian/store-interface February 1, 2026 03:38
@adrianlyjak adrianlyjak force-pushed the adrian/dbos branch 2 times, most recently from 53dab5b to ee2c407 Compare February 2, 2026 05:37
@adrianlyjak adrianlyjak force-pushed the adrian/store-interface branch 2 times, most recently from 6631af8 to 67c0fdc Compare February 2, 2026 20:22
@adrianlyjak adrianlyjak force-pushed the adrian/dbos branch 2 times, most recently from 6d6370c to 250d793 Compare February 2, 2026 20:47
@review-notebook-app

Check out this pull request on ReviewNB

See visual diffs & provide feedback on Jupyter Notebooks.


Powered by ReviewNB
