Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,11 @@ jobs:
llama-agents-dev,
llama-agents-client,
llama-agents-server,
"llama-agents-dbos"
]
exclude:
- package: llama-agents-dbos
python-version: "3.9"
# Integration tests on 3.14 run in test-docker job with cross-package coverage
- package: llama-agents-integration-tests
python-version: "3.9"
Expand Down Expand Up @@ -56,6 +59,8 @@ jobs:
package_dir: packages/llama-agents-client
- package: llama-agents-server
package_dir: packages/llama-agents-server
- package: llama-agents-dbos
package_dir: packages/llama-agents-dbos

steps:
- uses: actions/checkout@v4
Expand Down
12 changes: 10 additions & 2 deletions AGENTS.md
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,14 @@ We use **pytest** with idiomatic pytest patterns. Follow these guidelines:
## Coding Style

- Always use `from __future__ import annotations` at the top of each test file. Never use string annotations.
- Include the standard SPDX license header at the top of each test file.
- Include the standard SPDX license header at the top of each file:
```python
# SPDX-License-Identifier: MIT
# Copyright (c) 2026 LlamaIndex Inc.
```
- Comments are useful, but avoid fluff.
- Never use inline imports unless required to prevent circular dependencies.
- Import etiquette
- **Never use inline imports**
- **Never use `if TYPE_CHECKING` imports**
- Exceptions to these rules are made only when there are A) Acceptable circular imports or B) real startup performance issues
- Only add `__init__.py` `__all__` exports when a file is legitimately needed for public library consumption. Module level imports should not be used internally. For the most part you should never do this unless explicitly requested to do so
2 changes: 2 additions & 0 deletions examples/dbos_durability/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
.last_run_id
*.sqlite3
141 changes: 141 additions & 0 deletions examples/dbos_durability/counter_example.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
#!/usr/bin/env python3
"""
DBOS Counter Example

Simple looping workflow that increments a counter until it reaches 20.

Usage:
python -m examples.dbos_durability.counter_example # Start new
python -m examples.dbos_durability.counter_example --resume # Resume last
python -m examples.dbos_durability.counter_example --clean # Reset state

Try Ctrl+C mid-run to test resume behavior.
"""

from __future__ import annotations

import argparse
import asyncio
import os
import signal
import threading
import time
import uuid
from pathlib import Path

from dbos import DBOS
from llama_agents.dbos import DBOSRuntime
from pydantic import Field
from workflows import Context, Workflow, step
from workflows.events import Event, StartEvent, StopEvent

_DIR = Path(__file__).parent
_DB_FILE = _DIR / ".dbos_data.sqlite3"
_RUN_FILE = _DIR / ".last_run_id"


class Tick(Event):
count: int = Field(description="Current count")


class CounterResult(StopEvent):
final_count: int = Field(description="Final counter value")


class CounterWorkflow(Workflow):
"""Looping counter workflow - increments until reaching 20."""

@step
async def start(self, ctx: Context, ev: StartEvent) -> Tick:
await ctx.store.set("count", 0)
print("[Start] Initializing counter to 0")
return Tick(count=0)

@step
async def increment(self, ctx: Context, ev: Tick) -> Tick | CounterResult:
count = ev.count + 1
await ctx.store.set("count", count)
print(f"[Tick {count:2d}] count = {count}")

if count >= 20:
return CounterResult(final_count=count)

await asyncio.sleep(0.5)
return Tick(count=count)


def run(run_id: str) -> None:
"""Run the counter workflow."""
DBOS(
config={
"name": "counter-example",
"system_database_url": f"sqlite+pysqlite:///{_DB_FILE}?check_same_thread=false",
"run_admin_server": False,
}
)

runtime = DBOSRuntime()
workflow = CounterWorkflow(runtime=runtime)
runtime.launch()

interrupted = False

def handle_sigint(signum: int, frame: object) -> None:
nonlocal interrupted
if interrupted:
# Second Ctrl+C - force exit
os._exit(130)
interrupted = True
print("\nInterrupted - workflow state saved. Use --resume to continue.")

def delayed_exit() -> None:
time.sleep(0.1)
os._exit(130)

threading.Thread(target=delayed_exit, daemon=True).start()

# Install signal handler before running
signal.signal(signal.SIGINT, handle_sigint)

async def _run() -> None:
result = await workflow.run(run_id=run_id)
print(f"\nResult: final_count = {result.final_count}")

try:
asyncio.run(_run())
except (KeyboardInterrupt, SystemExit):
pass # Already handled by signal handler
finally:
if not interrupted:
try:
runtime.destroy()
except Exception:
pass


def main() -> None:
parser = argparse.ArgumentParser(description="DBOS Counter Example")
parser.add_argument("--resume", action="store_true", help="Resume last workflow")
parser.add_argument("--clean", action="store_true", help="Remove state files")
args = parser.parse_args()

if args.clean:
for f in [_DB_FILE, _RUN_FILE]:
if f.exists():
f.unlink()
print(f"Removed {f}")
return

if args.resume and _RUN_FILE.exists():
run_id = _RUN_FILE.read_text().strip()
print(f"Resuming: {run_id}")
else:
run_id = f"counter-{uuid.uuid4().hex[:8]}"
_RUN_FILE.write_text(run_id)
print(f"Starting: {run_id}")

run(run_id)


if __name__ == "__main__":
main()
44 changes: 44 additions & 0 deletions packages/llama-agents-dbos/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
# LlamaAgents DBOS Runtime

DBOS durable runtime plugin for LlamaIndex Workflows.

## Installation

```bash
pip install llama-agents-dbos
```

## Usage

```python
from llama_agents.dbos import DBOSRuntime
from dbos import DBOS, DBOSConfig
from workflows import Workflow, step, StartEvent, StopEvent

# Configure DBOS
config: DBOSConfig = {
"name": "my-app",
"system_database_url": "postgresql://...",
}
DBOS(config=config)

# Create runtime and workflow
runtime = DBOSRuntime()

class MyWorkflow(Workflow):
@step
async def my_step(self, ev: StartEvent) -> StopEvent:
return StopEvent(result="done")

workflow = MyWorkflow(runtime=runtime)

# Launch runtime and run workflow
runtime.launch()
result = await workflow.run()
```

## Features

- Durable workflow execution backed by DBOS
- Automatic step recording and replay
- Distributed workers and recovery support
32 changes: 32 additions & 0 deletions packages/llama-agents-dbos/conftest.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
"""Root conftest.py - shared test utilities for all test directories.

This file is discovered by pytest and provides common utilities
for both tests/ (SQLite) and tests_postgres/ (PostgreSQL).
"""

from __future__ import annotations

from pathlib import Path

from dbos import DBOSConfig


def make_test_dbos_config(
name: str,
db_path: Path,
) -> DBOSConfig:
"""Create a DBOS config for testing with sensible defaults (SQLite backend).

Args:
name: The application name for DBOS.
db_path: Path to the SQLite database file.

Returns:
A DBOSConfig dictionary ready for use with DBOS().
"""
system_db_url = f"sqlite+pysqlite:///{db_path}?check_same_thread=false"
return {
"name": name,
"system_database_url": system_db_url,
"run_admin_server": False,
}
7 changes: 7 additions & 0 deletions packages/llama-agents-dbos/package.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
{
"name": "llama-agents-dbos",
"version": "0.1.0",
"private": true,
"license": "MIT",
"scripts": {}
}
43 changes: 43 additions & 0 deletions packages/llama-agents-dbos/pyproject.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
[build-system]
requires = ["uv_build>=0.9.10,<0.10.0"]
build-backend = "uv_build"

[dependency-groups]
dev = [
"basedpyright>=1.31.1",
"pytest>=8.4.0",
"pytest-asyncio>=1.0.0",
"pytest-cov>=6.1.1",
"pytest-timeout>=2.4.0",
"pytest-xdist>=3.0.0",
"ty>=0.0.1,<0.0.9"
]

[project]
name = "llama-agents-dbos"
version = "0.1.0"
description = "DBOS durable runtime plugin for LlamaIndex Workflows"
readme = "README.md"
license = "MIT"
requires-python = ">=3.9"
dependencies = [
"dbos>=2.10.0; python_full_version >= '3.10.0'",
"llama-index-workflows>=2.12.0,<3.0.0"
]

[tool.basedpyright]
typeCheckingMode = "standard"
pythonVersion = "3.10"

[tool.pytest.ini_options]
asyncio_mode = "auto"
asyncio_default_fixture_loop_scope = "module"
asyncio_default_test_loop_scope = "module"
testpaths = ["tests"]
addopts = "-nauto --timeout=10"

[tool.uv.build-backend]
module-name = "llama_agents.dbos"

[tool.uv.sources]
llama-index-workflows = {workspace = true}
13 changes: 13 additions & 0 deletions packages/llama-agents-dbos/src/llama_agents/dbos/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
# SPDX-License-Identifier: MIT
# Copyright (c) 2026 LlamaIndex Inc.
"""
DBOS plugin for LlamaIndex Workflows.

Provides durable workflow execution backed by DBOS with SQL state storage.
"""

from __future__ import annotations

from .runtime import DBOSRuntime

__all__ = ["DBOSRuntime"]
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
# SPDX-License-Identifier: MIT
# Copyright (c) 2026 LlamaIndex Inc.
"""Journal module for recording task completion order."""
Loading