DBOS runtime #157
base: adrian/store-interface
Conversation
🦋 Changeset detected. Latest commit: 35194c5. The changes in this PR will be included in the next version bump.
packages/llama-index-workflows/src/workflows/context/internal_context.py (review thread resolved)
Before (previous implementation):

```python
@property
def status(self) -> Status:
    """Get the current status by inspecting the handler state."""
    if not self.run_handler.done():
        return "running"
    # done - check if cancelled first
    if self.run_handler.cancelled():
        return "cancelled"
    # then check for exception
    exc = self.run_handler.exception()
    if exc is not None:
        return "failed"
    return "completed"
```

After (this PR):

```python
@property
def status(self) -> Status:
    """Get the current status by inspecting the terminal event or handler state.

    Status is derived from the terminal event type when available:
    - WorkflowCancelledEvent -> "cancelled"
    - WorkflowTimedOutEvent -> "failed" (timeout is a failure mode)
    - WorkflowFailedEvent -> "failed"
    - Plain StopEvent -> "completed"

    Falls back to checking handler state if no terminal event yet.
    """
    # First check if we have a terminal event - derive status from event type
    if self._terminal_event is not None:
        if isinstance(self._terminal_event, WorkflowCancelledEvent):
            return "cancelled"
        elif isinstance(self._terminal_event, WorkflowTimedOutEvent):
            return "failed"
        elif isinstance(self._terminal_event, WorkflowFailedEvent):
            return "failed"
        else:
            return "completed"

    # Fall back to handler state check if no terminal event yet
    if not self.run_handler.is_done():
        return "running"
    # If handler is done but we don't have a terminal event, it was likely
    # cancelled externally or failed before emitting a terminal event
    return "running"
🟡 WorkflowServer status can report "running" even after handler completion if no terminal event was observed
In _WorkflowHandler.status, if no terminal event was recorded and run_handler.is_done() is true, the code returns "running" unconditionally.
Actual behavior: completed/failed/cancelled runs can be reported as "running" in persistence/API if the terminal StopEvent was not observed/recorded by _stream_events for any reason.
Expected behavior: if the handler is done and there is no terminal event, the status should be derived from handler completion state (cancelled vs exception vs completed), not forced to "running".
Code: workflows/server/server.py:1685-1713
Recommendation: When run_handler.is_done() is true and _terminal_event is None, fall back to run_handler.cancelled() / run_handler.exception() to classify as cancelled/failed/completed (or at least "failed"/"cancelled"), rather than returning "running".
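One way the recommended fallback could look, as a minimal runnable sketch. The `FakeHandler` class and `status_fallback` function are hypothetical stand-ins for illustration; only the `is_done()`/`cancelled()`/`exception()` methods shown in the snippet above are assumed:

```python
from typing import Literal, Optional

Status = Literal["running", "completed", "failed", "cancelled"]


class FakeHandler:
    """Hypothetical stand-in for run_handler, for illustration only."""

    def __init__(self, done: bool, cancelled: bool = False,
                 exception: Optional[BaseException] = None) -> None:
        self._done = done
        self._cancelled = cancelled
        self._exception = exception

    def is_done(self) -> bool:
        return self._done

    def cancelled(self) -> bool:
        return self._cancelled

    def exception(self) -> Optional[BaseException]:
        return self._exception


def status_fallback(run_handler: FakeHandler) -> Status:
    """Classify status from handler state when no terminal event was recorded."""
    if not run_handler.is_done():
        return "running"
    # Handler finished without a terminal event: derive the status from the
    # handler's completion state instead of reporting "running".
    if run_handler.cancelled():
        return "cancelled"
    if run_handler.exception() is not None:
        return "failed"
    return "completed"
```

With this fallback, a handler that completed, failed, or was cancelled before its terminal event was observed is never misreported as "running".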
```python
def __init__(self) -> None:
    self._queues: dict[str, AsyncioAdapterQueues] = {}
    self._max_concurrent_runs: weakref.WeakValueDictionary[
        str, asyncio.Semaphore
    ] = weakref.WeakValueDictionary()
```
🔴 BasicRuntime concurrency limiting can silently stop working due to WeakValueDictionary semaphore storage
BasicRuntime stores per-workflow semaphores in a weakref.WeakValueDictionary:
self._max_concurrent_runs: weakref.WeakValueDictionary[str, asyncio.Semaphore] (basic.py:184-188).
Because the only long-lived reference to each semaphore is the weak dictionary entry, semaphores may be garbage-collected at any time when not currently being awaited. When that happens, the next call to _maybe_acquire_max_concurrent_runs() will create a new semaphore, effectively resetting concurrency limits and allowing more than the configured num_concurrent_runs.
Actual: concurrency limit can be bypassed intermittently/non-deterministically.
Expected: concurrency limit should be enforced consistently for the process lifetime (or at least until runtime.destroy()).
Impact: can exceed intended concurrency caps, causing resource exhaustion and incorrect load-shedding behavior.
Recommendation: Use a normal dict (strong refs) for _max_concurrent_runs, and clear it in destroy(); or otherwise keep strong references for semaphore lifetime management.
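A sketch of the strong-reference approach, assuming a `num_concurrent_runs` setting and a per-workflow-name keying scheme as described above; the class and method names here are illustrative, not the actual BasicRuntime API:

```python
import asyncio


class BasicRuntimeSketch:
    """Illustrative sketch: hold strong references to per-workflow semaphores
    so concurrency limits cannot be reset by garbage collection."""

    def __init__(self, num_concurrent_runs: int = 4) -> None:
        self._num_concurrent_runs = num_concurrent_runs
        # A plain dict keeps each semaphore alive for the runtime's lifetime,
        # unlike a WeakValueDictionary whose entries can vanish at any GC.
        self._max_concurrent_runs: dict[str, asyncio.Semaphore] = {}

    def _semaphore_for(self, workflow_name: str) -> asyncio.Semaphore:
        # setdefault returns the existing semaphore if one was already created
        # for this workflow name, so limits are enforced consistently.
        return self._max_concurrent_runs.setdefault(
            workflow_name, asyncio.Semaphore(self._num_concurrent_runs)
        )

    def destroy(self) -> None:
        # Drop the references explicitly when the runtime shuts down.
        self._max_concurrent_runs.clear()
```

Because the dict owns the semaphores, repeated lookups for the same workflow always return the same object, and the configured cap holds until `destroy()` is called.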
```python
async def cancel_handlers_and_tasks(self, *, graceful: bool = True) -> None:
    """Cancel the handler and release it from the store.

    Args:
        graceful: If True, request graceful cancellation and wait for
            WorkflowCancelledEvent. If False, force immediate cancellation
            (used for idle release where we don't want to emit cancel event).
    """
    if not self.run_handler.is_done():
        if graceful:
            try:
                # Request graceful cancellation - this will emit WorkflowCancelledEvent
                await self.run_handler.cancel_run()
            except Exception:
                pass
            try:
                # Wait for the workflow to complete after cancellation
                # This gives time for WorkflowCancelledEvent to be emitted
                await asyncio.wait_for(self.run_handler, timeout=2.0)
            except asyncio.TimeoutError:
                # Force cancel if graceful cancellation didn't complete in time
                self.run_handler.cancel()
            except asyncio.CancelledError:
                pass
            except Exception:
                pass
        else:
            # Force immediate cancellation without waiting
            try:
                await self.run_handler.cancel_run()
            except Exception:
                pass
            try:
                self.run_handler.cancel()
            except Exception:
                pass
```
🟡 Idle release uses graceful cancellation even when graceful=False, contradicting intended semantics
_WorkflowHandler.cancel_handlers_and_tasks(graceful=False) is documented/used for idle release “where we don't want to emit cancel event”, but it still calls await self.run_handler.cancel_run() before hard-cancelling.
Actual: idle release sends a TickCancelRun into the workflow (cancel_run()), which can cause the workflow to emit WorkflowCancelledEvent and transition persisted status to cancelled.
Expected: idle release should stop in-memory execution without changing logical workflow outcome (it should remain resumable/running), or at least avoid emitting cancellation signals.
Code (non-graceful branch, workflows/server/server.py:1937-1945):

```python
await self.run_handler.cancel_run()
...
self.run_handler.cancel()
```
Impact: idle release may incorrectly cancel runs instead of just unloading them, breaking resumability and causing incorrect persisted status.
Recommendation: For graceful=False, avoid calling cancel_run(); instead only stop the streaming task and close adapter/resources (or implement a runtime-specific ‘detach/unload’ that doesn’t enqueue TickCancelRun).
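A minimal sketch of what such a detach could look like. This is an assumption-laden illustration, not the server's actual API: `detach_handler` and `stream_task` are hypothetical names, and `run_handler` is assumed to behave like an `asyncio.Task`. Crucially, it never calls `cancel_run()`, so no cancellation signal enters the workflow:

```python
import asyncio
from typing import Optional


async def detach_handler(run_handler: asyncio.Future,
                         stream_task: Optional[asyncio.Task] = None) -> None:
    """Hypothetical idle-release 'detach': stop in-memory execution without
    enqueueing a cancellation signal, so the persisted status stays resumable."""
    # Stop the event-streaming task first so no cancel event can be recorded.
    if stream_task is not None and not stream_task.done():
        stream_task.cancel()
    # Hard-cancel the handler task directly; deliberately do NOT call
    # run_handler.cancel_run(), which would emit WorkflowCancelledEvent.
    run_handler.cancel()
    try:
        await run_handler
    except asyncio.CancelledError:
        pass
    except Exception:
        # Swallow any late failure; the run's logical outcome is untouched.
        pass
```

Under this design, idle release only unloads the in-memory task and its resources, while the workflow's logical state remains whatever the store last persisted.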
* rename llama-index-workflows-client -> llama-agents-client
* integration tests too