This example demonstrates workflows that are automatically triggered by events.
It covers:
- Event-triggered workflows with `trigger_on_event`
- Event batching for high-throughput scenarios
- Publishing events with `ctx.step.publish_event`
- Waiting for events with `ctx.step.wait_for_event`

Typical use cases:
- Order processing pipelines
- User onboarding flows
- Real-time notifications
- Event-driven architectures
Files:
- `workflows.py` - Event-triggered workflow definitions
- `worker.py` - Worker that registers workflows

To run the example:
- Start the Polos server:

  ```bash
  polos server start
  ```

- Install dependencies:

  ```bash
  # Using uv (recommended)
  uv sync

  # Or using pip
  pip install -e .
  ```

- Set up environment variables:

  ```bash
  cp .env.example .env
  ```

- Run the worker:

  ```bash
  python worker.py
  ```
A workflow triggered by events on a topic:

```python
@workflow(id="on_order_created", trigger_on_event="orders/created")
async def on_order_created(ctx, payload):
    events = payload.get("events", [])
    event = events[0]  # Get the first event
    order_data = event.get("data", {})
    # Process the order...
```

A batched workflow that collects several events before it runs:

```python
@workflow(
    id="batch_processor",
    trigger_on_event="data/updates",
    batch_size=10,  # Up to 10 events per batch
    batch_timeout_seconds=30,  # Or trigger after 30 seconds
)
async def batch_processor(ctx, payload):
    events = payload.get("events", [])
    for event in events:
        # Process each event in the batch
        pass
```

Publish an event from within a workflow:
```python
await ctx.step.publish_event(
    "step_key",
    topic="orders/created",
    data={"order_id": "123", "total": 99.99},
    event_type="created",
)
```

Pause a workflow until an event arrives:
```python
event = await ctx.step.wait_for_event(
    "wait_for_approval",
    topic="approvals/order-123",
    timeout=3600,  # 1 hour timeout
)
# The returned event exposes event.topic, event.event_type, event.data
```

Topics are hierarchical strings:
| Topic Pattern | Description |
|---|---|
| `orders/created` | Order creation events |
| `users/signup` | User signup events |
| `approvals/{id}` | Approval for a specific ID |
| `notifications` | General notifications |
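Per-entity topics such as `approvals/{id}` are ordinary strings, so they can be built with a small helper. The sketch below is plain Python and is not part of Polos: `approval_topic` is a hypothetical name, and rejecting `/` inside the ID is an assumption made here to keep one ID per topic segment, not a documented constraint.

```python
def approval_topic(order_id: str) -> str:
    """Build the per-order approval topic, e.g. approvals/order-123.

    Hypothetical helper. The "/" check is an assumption: it keeps the ID
    from accidentally adding an extra level to the topic hierarchy.
    """
    if not order_id or "/" in order_id:
        raise ValueError(f"invalid order id for a topic segment: {order_id!r}")
    return f"approvals/{order_id}"


topic = approval_topic("order-123")
```

The resulting string can be passed as the `topic` argument of `ctx.step.publish_event` or `ctx.step.wait_for_event`, so that the publisher and the waiter agree on the same name.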
When an event triggers a workflow, the payload contains:
```python
{
    "events": [
        {
            "sequence_id": 123,
            "topic": "orders/created",
            "event_type": "created",
            "data": {...},  # Your event data
            "created_at": "2024-01-01T12:00:00Z"
        }
    ]
}
```

For batched workflows, `events` contains multiple event objects.
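Because single and batched triggers share this payload shape, a handler can treat both the same way. The following is a minimal sketch in plain Python with no Polos imports; `extract_event_data` is a hypothetical helper name, and the sample payload reuses the illustrative values from this README.

```python
from typing import Any


def extract_event_data(payload: dict[str, Any]) -> list[dict[str, Any]]:
    """Return the `data` dict of every event in a trigger payload.

    Works for one event or a batch, and returns an empty list when the
    payload has no "events" key.
    """
    return [event.get("data", {}) for event in payload.get("events", [])]


# Illustrative payload matching the structure shown above.
sample_payload = {
    "events": [
        {
            "sequence_id": 123,
            "topic": "orders/created",
            "event_type": "created",
            "data": {"order_id": "123", "total": 99.99},
            "created_at": "2024-01-01T12:00:00Z",
        }
    ]
}

orders = extract_event_data(sample_payload)
```

Iterating over the full list, rather than indexing `events[0]`, also avoids an `IndexError` if the list is ever empty.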
Request/response pattern: publish a request, then wait for the matching response on a per-request topic:

```python
@workflow(id="requester")
async def requester(ctx, payload):
    request_id = await ctx.step.uuid("request_id")

    # Publish request
    await ctx.step.publish_event(
        "send_request",
        topic=f"requests/{request_id}",
        data={"action": "process"},
    )

    # Wait for response
    response = await ctx.step.wait_for_event(
        "wait_response",
        topic=f"responses/{request_id}",
        timeout=300,
    )
    return response.data
```

Best practices:
- Use specific topics - Avoid overly generic topics
- Set appropriate timeouts - For wait_for_event calls
- Handle batches - Design for receiving multiple events
- Idempotency - Events may be delivered more than once
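Since events may be delivered more than once, a handler can skip events it has already processed. The sketch below is plain Python, not a Polos API: `drop_duplicates` is a hypothetical helper, it assumes `sequence_id` uniquely identifies an event (the payload includes the field, but uniqueness is not stated here), and the in-memory set stands in for whatever durable store a real worker would need.

```python
from typing import Any


def drop_duplicates(
    events: list[dict[str, Any]], seen: set[int]
) -> list[dict[str, Any]]:
    """Return only events whose sequence_id has not been seen before.

    Assumption: sequence_id is unique per event. `seen` is updated in
    place so later calls also skip these events.
    """
    fresh = []
    for event in events:
        seq = event["sequence_id"]
        if seq in seen:
            continue  # Already handled: redelivery or in-batch duplicate
        seen.add(seq)
        fresh.append(event)
    return fresh


seen_ids: set[int] = set()
batch = [{"sequence_id": 1}, {"sequence_id": 2}, {"sequence_id": 1}]
first_pass = drop_duplicates(batch, seen_ids)
second_pass = drop_duplicates(batch, seen_ids)  # Same batch redelivered
```

An alternative is to make the processing itself idempotent, for example by keying writes on the order ID, so that a repeated event has no additional effect.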