Skip to content

Add Elixir/OTP implementation of AbxBus event system#12

Open
pirate wants to merge 54 commits intomainfrom
claude/elixir-otp-rewrite-dAaQn
Open

Add Elixir/OTP implementation of AbxBus event system#12
pirate wants to merge 54 commits intomainfrom
claude/elixir-otp-rewrite-dAaQn

Conversation

@pirate
Copy link
Copy Markdown
Member

@pirate pirate commented Apr 1, 2026

Summary

This PR introduces a complete Elixir/OTP implementation of AbxBus, an event bus system with queue-jump, multi-bus forwarding, and lineage tracking. The implementation ports the core functionality from the Python reference implementation while leveraging Elixir's process model and OTP patterns for concurrency control.

Key Changes

  • Core event bus engine (BusServer): GenServer managing event queues, concurrency enforcement (parallel/bus-serial/global-serial), and handler dispatch
  • Event worker (EventWorker): Per-event process executing handlers with timeout enforcement and result tracking
  • Event store (EventStore): ETS-backed registry for O(1) event lookup, parent-child indexing, and completion notification
  • Lock manager (LockManager): Centralized concurrency policy enforcement including global-serial coordination and named semaphores
  • Event model (Event): Base event struct with reserved metadata fields and helper functions
  • Handler tracking (HandlerEntry, HandlerResult): Registration and result tracking for event handlers
  • Public API (AbxBus): High-level functions for bus lifecycle, event emission, handler registration, and awaiting
  • Comprehensive test suite: 10+ test files covering locking, timeouts, forwarding, parent tracking, queue-jump mechanics, and cross-runtime parity

Notable Implementation Details

  • Queue-jump without re-entrant locks: Leverages BEAM's process model—parent handler blocks in receive (not holding GenServer state), allowing child events to process independently without deadlock
  • Process dictionary for handler context: Automatic parent tracking via Process.put(:abx_current_event_id, ...) replaces Python's ContextVar approach
  • ETS-based event registry: Provides O(1) lookups and efficient parent-child relationship tracking
  • Timeout enforcement: Event-level and handler-level timeouts with proper cancellation semantics
  • Forwarding support: Events can be forwarded to other buses with same event_id (not parent-child)
  • Backpressure handling: History size limits with configurable drop behavior
  • Idle detection: wait_until_idle/1 blocks until all pending and in-flight events complete

The implementation maintains API parity with the Python reference while adapting concurrency patterns to OTP idioms (GenServer for bus-serial, LockManager for global-serial, bare processes for parallel).

https://claude.ai/code/session_01Qf9t3FPgh38PQUfKUcUeGx


Open with Devin

Summary by cubic

Introduces an OTP‑native event bus :abxbus with queue‑jump, multi‑bus forwarding, lineage tracking, and cross‑runtime JSON parity. Adds deterministic UUIDv5 handler IDs, result type enforcement, and populates eventbus_id on per‑handler results; find() is faster and scales via a type‑indexed ETS lookup.

  • New Features

    • Recursion‑depth guard (max_handler_recursion_depth, default 3) to stop infinite emit→await loops.
    • WAL middleware (Abxbus.Middlewares.WAL) appends completed events to a JSONL‑like log; per‑bus config in ETS.
    • JSON parity helpers: Abxbus.to_json/1, from_json/2, to_json_string/1, from_json_string/2 (uses Jason if present).
    • Real‑time handler result status updates in ETS; find() returns the most recent match.
    • Deterministic handler IDs via UUIDv5 (cross‑runtime consistent), optional explicit ID, handler_registered_at as ISO datetime; eventbus_id now set on EventResult.
    • Result type enforcement for event_result_type (:string, :integer, :number, :boolean, :atom, :list, :map, :nil, :any, or struct module); mismatches recorded as :error with Abxbus.EventHandlerResultSchemaError.
  • Bug Fixes

    • Retries respect retry_on_errors for exits; robust forwarding merge across buses preserves status/results and merges event_path.
    • Global locks/semaphores: synchronous global release + broadcast; monitor holders and auto‑release on :DOWN; precise per‑holder release; bus‑scoped semaphore keys.
    • Await/find: register waiters before checks; clean waiters on timeout/already‑completed; re‑read from ETS before notifying; normalize string patterns; support transitive child_of; return most‑recent past match.
    • Depth tracking cleaned only when event_pending_bus_count is 0; safe JSON atom handling; wildcard forwarding loop prevention.
    • find() performance: added a type‑indexed ETS secondary table to avoid O(n) scans; queries now scale with events per type and no longer flake under large tables.
    • Handler lookup guard fixed to include string‑pattern handlers by comparing key counts (not entry counts), preventing silent drops.

Written for commit 6b6499c. Summary will update on new commits.

claude added 2 commits April 1, 2026 02:25
Full idiomatic Elixir implementation leveraging BEAM primitives:
- GenServer-per-bus with process-per-event workers (no re-entrant locks needed)
- Queue-jump via direct message passing (bypasses bus-serial concurrency)
- ETS-backed EventStore with find() supporting past/future search windows
- Global serial lock via GenServer mailbox (replaces Python's ContextVar semaphores)
- Automatic parent-child tracking via process dictionary (replaces ContextVars)
- Handler concurrency (parallel/serial) via Task.async/yield
- Handler completion modes (first/all) with proper cancellation
- Event/handler/bus-level timeout enforcement via Task.yield
- Multi-bus forwarding with path tracking and circular detection
- Named semaphores for cross-bus handler concurrency limits
- DynamicSupervisor tree for crash isolation and recovery

Comprehensive test suite porting key patterns from Python:
- Locking: global-serial, bus-serial, parallel, overrides, queue-jump
- Forwarding: completion race, self-parent prevention, defaults inheritance
- Comprehensive patterns: no-overshoot, dispatch-multiple-await-one
- Timeouts: hard caps, handler vs event, aborted vs cancelled
- Parent tracking: auto lineage, cross-bus, explicit preservation
- Handler modes: first/all completion, serial/parallel concurrency
- Find: past/future windows, child_of, where predicates, cross-bus lineage
- Cross-runtime: lineage preservation, zero-history, context propagation

https://claude.ai/code/session_01Qf9t3FPgh38PQUfKUcUeGx
- Remove external deps (uuid, telemetry) for offline builds
- Fix :queue.peek return format (returns {:value, item} not tuple)
- Fix ETS search (use tab2list instead of non-existent foldl)
- Fix monotonic time cutoff in find() (negative timestamps)
- Fix process dictionary propagation across Task.async and spawn
- Fix global-serial lock: broadcast check_pending to all buses on release
- Fix first-mode result extraction for falsy values (false != nil)
- Fix event timeout to preserve partial handler results via shared ETS
- Fix LockManager try_acquire to use caller ref not self()
- Add .gitignore for build artifacts
- Adjust test timeouts for BEAM process spawn overhead

https://claude.ai/code/session_01Qf9t3FPgh38PQUfKUcUeGx
devin-ai-integration[bot]

This comment was marked as resolved.

Copy link
Copy Markdown
Contributor

@cubic-dev-ai cubic-dev-ai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

19 issues found across 26 files

Prompt for AI agents (unresolved issues)

Check if these issues are valid — if so, understand the root cause of each and fix them. If appropriate, use sub-agents to investigate and fix each issue separately.


<file name="abx_bus/test/cross_runtime_features_test.exs">

<violation number="1" location="abx_bus/test/cross_runtime_features_test.exs:112">
P2: The test claims to validate per-event serial handler execution, but it only asserts global concurrency and can miss regressions in event-level handler serialness.</violation>

<violation number="2" location="abx_bus/test/cross_runtime_features_test.exs:122">
P2: Sleeping exactly the configured timeout creates a flaky boundary-condition test; use a duration clearly above the timeout.</violation>
</file>

<file name="abx_bus/lib/abx_bus/event.ex">

<violation number="1" location="abx_bus/lib/abx_bus/event.ex:104">
P1: `new/2` passes metadata override keys into `struct!/2`, causing runtime errors when callers provide allowed `event_*` overrides.</violation>
</file>

<file name="abx_bus/test/event_bus_find_test.exs">

<violation number="1" location="abx_bus/test/event_bus_find_test.exs:95">
P2: This test uses a fixed sleep for synchronization, which makes it timing-sensitive and can cause flaky failures in CI.</violation>
</file>

<file name="abx_bus/test/event_handler_test.exs">

<violation number="1" location="abx_bus/test/event_handler_test.exs:20">
P2: This test claims to verify cancellation of non-winning handlers, but it only checks the winning result. Add an assertion that slow/losing handlers are canceled; otherwise cancellation regressions can pass unnoticed.</violation>
</file>

<file name="abx_bus/lib/abx_bus.ex">

<violation number="1" location="abx_bus/lib/abx_bus.ex:124">
P1: Race condition (TOCTOU) between the completion check and waiter registration. If the event completes between `EventStore.get` and `EventStore.add_waiter`, the waiter is registered after `notify_waiters` already fired, so `await` hangs forever.

Fix: `add_waiter` should atomically check completion status and either return the completed event immediately or register the waiter under a single serialization point (e.g., a GenServer call to EventStore, or re-check after insert).</violation>
</file>

<file name="abx_bus/test/parent_tracking_test.exs">

<violation number="1" location="abx_bus/test/parent_tracking_test.exs:81">
P2: This test has no assertion for the stated behavior, so regressions in explicit `parent_id` handling would go undetected.</violation>
</file>

<file name="abx_bus/config/config.exs">

<violation number="1" location="abx_bus/config/config.exs:4">
P2: These `default_*` config values are currently unused, so this new config block does not affect bus behavior and can mislead users into thinking defaults are configurable here.</violation>
</file>

<file name="abx_bus/test/dispatch_defaults_test.exs">

<violation number="1" location="abx_bus/test/dispatch_defaults_test.exs:52">
P2: The max-concurrency tracker uses a non-atomic compare/set and can lose updates, making this test flaky and potentially masking real concurrency violations.</violation>
</file>

<file name="abx_bus/lib/abx_bus/event_store.ex">

<violation number="1" location="abx_bus/lib/abx_bus/event_store.ex:40">
P1: Non-atomic read-modify-write on a public ETS table. `update/2` does `get` → `Map.merge` → `put`, but since these are called directly from `EventWorker` and `BusServer` processes concurrently, a racing update will silently overwrite the other's changes (e.g., handler results stomp a status transition or vice versa). Serialize updates through the GenServer, or restructure to use `:ets.update_element/3` for field-level atomic updates.</violation>

<violation number="2" location="abx_bus/lib/abx_bus/event_store.ex:53">
P1: Same non-atomic read-modify-write race as `update/2`. `update_fun/2` is used in `BusServer` to decrement `event_pending_bus_count` — a lost update here means the counter never reaches zero and the event is never marked completed.</violation>

<violation number="3" location="abx_bus/lib/abx_bus/event_store.ex:195">
P1: Race condition between past search and future waiter registration. An event emitted between `search_past` returning nil and `add_find_waiter` being called will be missed entirely — it's not in the past scan results and the waiter isn't registered yet for `resolve_find_waiters` to notify. Register the waiter *before* scanning past events, then clean it up if a past match is found.</violation>
</file>

<file name="abx_bus/test/event_bus_forwarding_test.exs">

<violation number="1" location="abx_bus/test/event_bus_forwarding_test.exs:28">
P2: The circular-forwarding test sets up only a linear chain, so it does not validate stale in-flight cleanup under an actual forwarding cycle.</violation>

<violation number="2" location="abx_bus/test/event_bus_forwarding_test.exs:113">
P2: This conditional makes the concurrency assertion optional; the test can pass without validating the expected behavior.</violation>
</file>

<file name="abx_bus/test/event_bus_timeout_test.exs">

<violation number="1" location="abx_bus/test/event_bus_timeout_test.exs:94">
P2: This test uses handler sleep durations equal to the event timeout, which makes the assertion nondeterministic at the timeout boundary.</violation>
</file>

<file name="abx_bus/lib/abx_bus/bus_server.ex">

<violation number="1" location="abx_bus/lib/abx_bus/bus_server.ex:185">
P2: TOCTOU race in `ensure_bus_pid_table/0`: two concurrent `init/1` calls can both see `:undefined` and race on `:ets.new`, crashing the second caller. Wrap the creation in a `try`/`catch` to handle the already-exists case.</violation>

<violation number="2" location="abx_bus/lib/abx_bus/bus_server.ex:232">
P1: History list is never trimmed when `max_history_drop` is `true` (the default). The `check_backpressure` function allows events through, but no code ever drops old entries. This is an unbounded memory leak. Trim the list after prepending.</violation>

<violation number="3" location="abx_bus/lib/abx_bus/bus_server.ex:448">
P1: `spawn_link` without trapping exits will crash the BusServer GenServer if any EventWorker fails unexpectedly. Use `Process.monitor/1` with `spawn/1` instead so the GenServer receives a `{:DOWN, ref, ...}` info message that can be handled gracefully, rather than being killed by the linked exit.</violation>

<violation number="4" location="abx_bus/lib/abx_bus/bus_server.ex:518">
P2: `length(state.history)` is O(n) and is called on every `emit`. As history grows this becomes a bottleneck. Track the count in a separate field or use a bounded data structure.</violation>
</file>
Architecture diagram
sequenceDiagram
    participant Client
    participant API as AbxBus (Public API)
    participant Bus as BusServer (GenServer)
    participant Store as EventStore (ETS)
    participant Lock as LockManager
    participant Worker as EventWorker (Process)
    participant Handler as User Callback

    Note over Client,Handler: NEW: Event Emission Flow
    Client->>API: emit(bus, event)
    API->>API: NEW: Resolve parent_id (Process Dictionary)
    API->>Bus: call :emit
    Bus->>Store: NEW: put(event) & index to bus
    Bus->>Bus: Enqueue event (O(1) :queue)
    
    Note over Bus,Worker: NEW: Execution Cycle
    Bus->>Lock: NEW: Resolve concurrency policy
    opt Concurrency available (e.g., :bus_serial)
        Bus->>Worker: NEW: spawn_link(run/5)
        Worker->>Worker: NEW: Set Context (EventID/Bus)
        Worker->>Handler: execute callback
        
        Note over Handler,Worker: NEW: Queue-Jump Logic
        opt Nested Emission & Await
            Handler->>API: emit(child_event)
            API->>Bus: call :emit (Child)
            Handler->>API: await(child_event)
            API->>Bus: NEW: cast :jump_queue (event_id)
            Bus->>Bus: NEW: Dequeue child & spawn Worker immediately
            Bus->>Worker: spawn_link(child)
            Worker->>Store: update status :completed
            Store-->>API: Notify completion (ref)
            API-->>Handler: Return child result
        end

        Handler-->>Worker: result
        Worker-->>Bus: NEW: send :event_worker_done
    end

    alt NEW: Timeout Path
        Worker->>Worker: Monitors event/handler timers
        opt Timeout reached
            Worker->>Worker: Terminate task
            Worker->>Store: NEW: Mark as :error (Aborted/Cancelled)
            Worker-->>Bus: send :event_worker_done
        end
    end

    Bus->>Bus: NEW: Dequeue next event
    Note over Bus,Store: History management & Backpressure
Loading

Reply with feedback, questions, or to request a fix. Tag @cubic-dev-ai to re-run a review.

Comment thread abxbus-elixir/lib/abxbus/event.ex Outdated
Comment thread abx_bus/lib/abx_bus.ex Outdated
Comment thread abx_bus/lib/abx_bus/event_store.ex Outdated
Comment thread abxbus-elixir/lib/abxbus/event_store.ex
Comment thread abxbus-elixir/lib/abxbus/event_store.ex
Comment thread abx_bus/test/event_bus_forwarding_test.exs Outdated
Comment thread abx_bus/test/event_bus_forwarding_test.exs Outdated
Comment thread abxbus-elixir/test/event_bus_timeout_test.exs Outdated
Comment thread abx_bus/lib/abx_bus/bus_server.ex Outdated
Comment thread abxbus-elixir/lib/abxbus/bus_server.ex
claude added 2 commits April 1, 2026 04:48
New features:
- event_version and event_result_type fields on all events
- event_reset() to create fresh pending copy for re-emission
- defevent macro in AbxBus.Event for quick event module definitions
- off() handler unregistration (by type, function ref, or handler ID)
- log_tree() ASCII tree renderer for event hierarchy visualization
- event_is_child_of/2, event_is_parent_of/2 lineage checks
- event_result/2, event_results_list/2 with filtering options
- EventBusMiddleware behaviour (on_event_change, on_event_result_change, on_bus_handlers_change)
- Bridge behaviour for transport integrations
- Retry enhancements: retry_after, retry_backoff_factor, retry_on_errors
- Slow event/handler warning monitors via Logger
- History trimming on event completion (bounded memory)
- String event type pattern support in on() and find()
- handler_file_path detection on HandlerEntry
- handler_slow_timeout per-handler override
- history_count field for O(1) backpressure checks

Bug fixes from PR review:
- Fix TOCTOU race in await/event_completed (register waiter before status check)
- Fix handler_timeout mailbox corruption (add unique ref to messages)
- Fix ETS :abx_worker_results table creation race (create in EventStore.init)
- Fix spawn_link crash propagation (use spawn + Process.monitor)
- Fix ETS table creation TOCTOU (try/rescue instead of check-then-create)
- Fix do_await_first for comprehension scoping (use Enum.reduce)
- Fix find() TOCTOU (register future waiter before past search)
- Serialize EventStore updates through GenServer (prevent read-modify-write races)

Architecture changes:
- Remove all built-in forward_to/forwarding_targets machinery
  Forwarding is now done idiomatically via on("*", fn e -> emit(other, e) end)
  This is dynamic, composable, and can be registered/unregistered at any time
- Remove unused config.exs defaults

Naming alignment with Python/TS:
- stop_bus -> stop (matches bus.stop())
- wait_for_completion -> event_completed (matches event.event_completed())
- bus_label -> label (matches bus.label)
- child_of? -> event_is_child_of (matches bus.event_is_child_of())
- parent_of? -> event_is_parent_of (matches bus.event_is_parent_of())
- reset -> event_reset (matches event.event_reset())
- Remove bus_config, handler_count from public API (not in Python/TS)

https://claude.ai/code/session_01Qf9t3FPgh38PQUfKUcUeGx
Module renames:
- HandlerEntry → EventHandler (matches Python's EventHandler class)
- HandlerResult → EventResult (matches Python's EventResult class)

Struct field renames in BusServer:
- pending_queue → pending_event_queue
- in_flight → in_flight_event_ids
- processing → processing_event_ids
- history → event_history
- history_count → event_history_count

Struct field renames in EventHandler (was HandlerEntry):
- event_type → event_pattern (matches Python's event_pattern)
- handler_fn → handler (matches Python's handler)
- bus_name → eventbus_name (matches Python's eventbus_name)
- registered_at → handler_registered_at
- Added: eventbus_id field
- Added: label/1 function

Struct field additions in EventResult (was HandlerResult):
- Added: id (own UUID, not just handler_id)
- Added: event_id
- Added: result_type
- Added: eventbus_id

New public API methods:
- dispatch/2 — alias for emit (matches Python's bus.dispatch())
- events_pending/1 — list pending events
- events_started/1 — list in-progress events
- events_completed/1 — list completed events

https://claude.ai/code/session_01Qf9t3FPgh38PQUfKUcUeGx
cubic-dev-ai[bot]

This comment was marked as resolved.

claude added 2 commits April 1, 2026 05:12
1. raise_if_any no-op (P1): Check unfiltered results for errors BEFORE
   applying the :include filter. Matches Python's behavior of checking
   self.event_results.values() directly.

2. raise_if_any/raise_if_none defaults (P2): Changed defaults from false
   to true to match Python and TypeScript reference implementations.

3. notify_waiters TOCTOU race (P1): Replace :ets.lookup + :ets.delete
   with :ets.take/2 which atomically reads and deletes, closing the
   window where a waiter could be inserted and silently dropped.

4. stop(clear: true) incomplete reset (P1): Reset event_history_count
   and worker_monitors when clearing bus state. Prevents stale
   history_count from incorrectly rejecting new events, and stale
   monitors from misattributing worker crashes.

5. Handler timeout spawn leak (P1): Change spawn to spawn_link with
   trap_exit for handler timeout processes. Unlinked processes survived
   as orphans when event-level timeout killed the parent. Now linked
   processes are properly terminated.

6. defevent reserved field validation (P2): Add the same event_* field
   validation to the defevent macro that __before_compile__ enforces
   for use AbxBus.Event modules.

7. per_event_max test assertion (P3): Add per-event concurrency tracking
   via ETS counters and assert max handler concurrency == 1 per event,
   not just global overlap. Previously the test only checked global
   overlap and would pass even with concurrent handlers within one event.

https://claude.ai/code/session_01Qf9t3FPgh38PQUfKUcUeGx
Directory structure now follows the multi-language convention:
  abxbus/       — Python implementation
  abxbus-ts/    — TypeScript implementation
  abxbus-elixir/ — Elixir/OTP implementation

All naming unified to "abxbus" (no hyphens or underscores):
- Mix app: :abx_bus → :abxbus
- Module prefix: AbxBus → Abxbus
- ETS tables: :abx_events → :abxbus_events, etc.
- Process dictionary keys: :abx_current_* → :abxbus_current_*
- Module attributes: :abx_* → :abxbus_*

https://claude.ai/code/session_01Qf9t3FPgh38PQUfKUcUeGx
cubic-dev-ai[bot]

This comment was marked as resolved.

P1 fixes (3):
1. event.ex reset/1 crash: Use Map.merge instead of %{base | ...} so
   structs defined via `use Abxbus.Event` (which don't have event_*
   keys in their defstruct) don't raise KeyError.

2. event.ex new/2 crash on overrides: Split metadata override keys
   from user field keys before passing to struct!, so valid overrides
   like event_timeout don't raise "key not found in struct".

3. event.ex use Abxbus.Event silently ignoring options: Generate
   event_version/0 and event_result_type/0 functions in
   __before_compile__ so the :result_type and :version options
   passed to `use Abxbus.Event` actually take effect.

P2 fixes (9):
4. middleware.ex dispatch/3 only rescues: Add catch clause for
   throw/exit so middleware callbacks can't crash the bus.

5. test_events.ex duplicate defevent: Replace with delegation to
   Abxbus.Event.defevent so tests use production validations.

6. parent_tracking_test.exs missing assertion: Add assertion that
   explicit event_parent_id is actually preserved after emit/await.

7. parent_tracking_test.exs weak path assertion: Change >= 1 to >= 2
   to validate both buses processed the forwarded event.

8. event_bus_timeout_test.exs permissive serial assertion: Assert
   specific handler statuses (second=error, pending=error) instead
   of accepting either :error or :cancelled.

9. event_bus_timeout_test.exs missing parent timeout assertion:
   Assert parent event actually timed out before testing followup.

10. event_bus_find_test.exs flaky sleep: Replace Process.sleep(10)
    with atomics-based flag + spin_until for deterministic sync.

11. cross_runtime_features_test.exs racy global_max: Use threshold
    check (g >= 2) instead of read-compare-write pattern.

12. cross_runtime_features_test.exs boundary-flaky timeout: Increase
    handler sleep from 100ms to 500ms (5x the 100ms timeout) to
    eliminate timing boundary sensitivity.

Also fixed: defevent validation now allows known meta field overrides
(event_timeout, event_concurrency, etc.) while still rejecting unknown
event_* fields.

https://claude.ai/code/session_01Qf9t3FPgh38PQUfKUcUeGx
devin-ai-integration[bot]

This comment was marked as resolved.

1. find("*", past: true) silently returns nil (event_store.ex:155):
   search_past received raw "*" string instead of normalized :wildcard
   atom, so matches_type? never matched. Now normalize before search.

2. release_semaphore crashes on empty holders (lock_manager.ex:194):
   tl([]) raises ArgumentError, crashing the singleton LockManager.
   Guard against empty list before calling tl.

3. Semaphore slot permanently leaked on process kill (event_worker.ex:354):
   When event timeout kills a handler process holding a semaphore via
   Process.exit(pid, :kill), the try/after block in maybe_with_semaphore
   never runs, permanently consuming the slot.

   Fix: LockManager now monitors semaphore holders via Process.monitor.
   When a holder dies (any reason — kill, crash, normal exit), the
   :DOWN handler automatically releases the slot and promotes the next
   waiter. This handles all crash scenarios, not just event timeouts.

   Changes to LockManager:
   - acquire_semaphore: monitor caller pid, track {from, mon_ref} in holders
   - release_semaphore: demonitor released holder, handle new tuple format
   - handle_info(:DOWN): auto-release dead holder's slot, promote waiters
   - init: add sem_monitors map to state

https://claude.ai/code/session_01Qf9t3FPgh38PQUfKUcUeGx
cubic-dev-ai[bot]

This comment was marked as resolved.

1. dispatch_defaults_test.exs: HandlerOverrideEvent was dead code.
   Added test proving event-level handler_concurrency: :serial
   override forces serial execution on a parallel bus. (P2)

2. middleware.ex dispatch/3: Now logs {:error, reason} returns from
   callbacks instead of silently discarding them. (P2)

3. event_bus_forwarding_test.exs: Replaced Process.sleep timing with
   barrier synchronization (counters + spin_until). Both handlers
   must reach the barrier before either proceeds, proving parallelism
   without relying on elapsed-time assumptions. (P2)

4. event_bus_timeout_test.exs: Increased parallel handler sleep from
   100ms to 500ms (5x the 100ms event timeout) to eliminate the
   boundary race where handler runtime == timeout. (P2)

https://claude.ai/code/session_01Qf9t3FPgh38PQUfKUcUeGx
devin-ai-integration[bot]

This comment was marked as resolved.

1. Child tracking records nil event_id (abxbus.ex:76-79):
   maybe_track_child was called BEFORE BusServer.emit, which is where
   ensure_event_id generates the ID. Events emitted without calling
   new() (e.g. %MyEvent{}) had nil event_id, inserting {parent_id, nil}
   into children ETS. This caused maybe_mark_tree_complete to treat the
   nil child as complete (EventStore.get(nil) → nil → assumed done),
   prematurely completing the parent.
   Fix: call maybe_track_child AFTER BusServer.emit returns the event
   with its assigned ID.

2. Semaphore release removes wrong holder when limit > 1 (lock_manager.ex:191):
   release_semaphore was a cast with no caller identity — it always
   popped the head of the LIFO holders list. With limit >= 2, if the
   earlier acquirer released first, it removed the later acquirer's
   entry and demonitored the wrong ref, leaking the actual releaser's
   slot and leaving the other holder unmonitored.
   Fix: pass caller_pid in the cast message, use Enum.split_with to
   find and remove the specific holder matching the caller.

3. History never trimmed when max_history_size=0 (bus_server.ex:487):
   Guard `when max <= 0` returned early, bypassing trimming entirely.
   With max_history_size: 0 and max_history_drop: true, the intended
   behavior is "don't keep history" but event_history grew without
   bound. Enum.take(list, 0) correctly returns [], so the guard just
   needed to be `when max < 0`.

https://claude.ai/code/session_01Qf9t3FPgh38PQUfKUcUeGx
devin-ai-integration[bot]

This comment was marked as resolved.

claude added 2 commits April 1, 2026 10:52
1. EventStore.put overwrites forwarded event state (bus_server.ex:166):
   When the same event is forwarded to multiple buses, each bus's emit
   did a full EventStore.put (replacing the entire ETS record) followed
   by a non-atomic count update. The second bus's put would overwrite
   status/results set by the first bus, and event_pending_bus_count
   would always be 1 instead of N.

   Fix: replace put + update with atomic put_or_merge. If the event_id
   already exists in ETS, only update event_path and increment
   event_pending_bus_count — never overwrite runtime state (status,
   results, timestamps). If new, insert with count=1.

2. Worker crash leaves waiters hanging + global lock held (bus_server.ex:370):
   The :DOWN handler for crashed EventWorkers only set status to :error
   and cleaned up processing/in_flight IDs, but was missing three
   cleanup steps present in the normal completion path:
   - No EventStore.notify_waiters → await/event_completed hang forever
   - No LockManager.release_global → global_serial lock permanently held
   - No maybe_mark_tree_complete → parent events never complete

   Fix: mirror the full cleanup from handle_info(:event_worker_done)
   into the :DOWN handler: release global lock, dispatch middleware
   completion callback, and call maybe_mark_tree_complete.

https://claude.ai/code/session_01Qf9t3FPgh38PQUfKUcUeGx
When an EventWorker crashes, the :DOWN handler sets status to :error.
maybe_mark_tree_complete then unconditionally overwrote it to :completed
when notifying waiters. Now preserves :error status for crashed events
while still correctly notifying waiters and propagating parent completion.

https://claude.ai/code/session_01Qf9t3FPgh38PQUfKUcUeGx
cubic-dev-ai[bot]

This comment was marked as resolved.

claude added 3 commits April 1, 2026 11:22
Identified by cubic: when the same event is forwarded to multiple buses,
put_or_merge replaced the stored event_path with the incoming one,
dropping earlier bus labels. This breaks loop-prevention (which checks
if a bus label is already in event_path) and queue-jump (which sends
jump messages to all buses in event_path).

Fix: merge existing + incoming paths with Enum.uniq to preserve all
bus labels while preventing duplicates.

https://claude.ai/code/session_01Qf9t3FPgh38PQUfKUcUeGx
Parent tracking (5 → 14 tests):
- multiple children from same parent have correct parent_id
- parallel handler concurrency parent tracking
- sync handler parent tracking
- error handler parent tracking
- child events tracked in parent's children list
- nested event children tracking (multi-level)
- multiple handlers event children
- event children empty when no children dispatched
- parent completion waits for all children

Event handler (5 → 11 tests):
- explicit completion override beats bus default
- first() preserves falsy values like 0
- first() skips event struct results and uses next winner
- first() returns nil when all handlers fail
- handler concurrency nil on dispatch, resolves during processing
- per-event handler concurrency override controls execution

Dispatch defaults (3 → 4 tests):
- handler defaults nil on dispatch, resolve during processing

Also fix: first/2 now filters out event struct results (maps with
:event_id key), matching Python's BaseEvent filtering behavior.

https://claude.ai/code/session_01Qf9t3FPgh38PQUfKUcUeGx
New tests:
- nested timeout scenario with cascading timeouts
- parallel handlers aborted status on event timeout
- multi-bus timeout recorded on target bus
- forwarded timeout path does not stall followup events
- bus timeout defaults don't mutate event fields
- handler > event > bus timeout precedence
- event_handler_detect_file_paths toggle
- slow handler warning via Logger
- slow event warning via Logger

https://claude.ai/code/session_01Qf9t3FPgh38PQUfKUcUeGx
cubic-dev-ai[bot]

This comment was marked as resolved.

claude added 2 commits April 1, 2026 20:44
…t error handling

New tests (66 → 89):
- base_event: bus property single/multi/forwarding, result tracking,
  reserved field rejection, children tracking (5 → 11)
- comprehensive: comprehensive forwarding patterns, await forwarded
  event, race condition stress, await already completed, multiple
  concurrent awaits (4 → 9)
- cross_runtime: zero history with find(future), context propagation
  through forwarding, history backpressure rejects overflow (4 → 7)

Bug fix: Abxbus.emit now handles {:error, ...} returns from BusServer
gracefully instead of crashing in maybe_track_child.

Bug fix: first/2 now filters out event struct results (maps with
:event_id key) matching Python's BaseEvent filtering behavior.

https://claude.ai/code/session_01Qf9t3FPgh38PQUfKUcUeGx
Identified by cubic: two timeout test assertions were weaker than the
Python reference implementation.

Fix 1: resolve_handler_timeout now uses priority chain (handler >
event > bus) instead of picking the minimum. Handler-specific timeout
takes precedence unconditionally when set, matching Python behavior.

Fix 2: timeout defaults test now explicitly asserts result.timeout
is nil when bus only sets event_timeout (not event_handler_timeout).

Fix 3: precedence test now uses strict equality (0.12) instead of
weak upper-bound check, matching Python's assertion.

https://claude.ai/code/session_01Qf9t3FPgh38PQUfKUcUeGx
cubic-dev-ai[bot]

This comment was marked as resolved.

Copy link
Copy Markdown
Contributor

@cubic-dev-ai cubic-dev-ai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

11 issues found across 29 files

Prompt for AI agents (unresolved issues)

Check if these issues are valid — if so, understand the root cause of each and fix them. If appropriate, use sub-agents to investigate and fix each issue separately.


<file name="abxbus-elixir/test/event_bus_find_test.exs">

<violation number="1" location="abxbus-elixir/test/event_bus_find_test.exs:96">
P2: This readiness check races with `find/2` waiter registration, so the future-event test can still fail intermittently in CI.</violation>
</file>

<file name="abxbus-elixir/lib/abxbus/bus_server.ex">

<violation number="1" location="abxbus-elixir/lib/abxbus/bus_server.ex:153">
P1: The `started` flag is set to `false` by `stop/2` but never checked in `emit` or `maybe_process_next`. The bus continues to accept and process events after being stopped. Either guard `emit` with `if not state.started, do: {:reply, {:error, :stopped}, state}`, or use `{:stop, :normal, :ok, state}` to actually terminate the GenServer.</violation>
</file>

<file name="abxbus-elixir/test/event_bus_locking_test.exs">

<violation number="1" location="abxbus-elixir/test/event_bus_locking_test.exs:80">
P2: These concurrency assertions are timing-based, so the new tests can fail nondeterministically on slower CI runners.</violation>
</file>

<file name="abxbus-elixir/lib/abxbus/event_worker.ex">

<violation number="1" location="abxbus-elixir/lib/abxbus/event_worker.ex:336">
P2: `spawn_link` is called before `Process.flag(:trap_exit, true)`, creating a small race window. If the linked process crashes before trap_exit is enabled, the parent is killed. Move `Process.flag(:trap_exit, true)` before the `spawn_link` call.</violation>

<violation number="2" location="abxbus-elixir/lib/abxbus/event_worker.ex:384">
P1: `Process.flag(:trap_exit, false)` unconditionally disables exit trapping instead of restoring the previous value. `maybe_with_handler_timeout` in the same file does this correctly with `old_trap = Process.flag(:trap_exit, true)` … `Process.flag(:trap_exit, old_trap)`. Apply the same pattern here.</violation>
</file>

<file name="abxbus-elixir/lib/abxbus.ex">

<violation number="1" location="abxbus-elixir/lib/abxbus.ex:97">
P1: The fast-path completion check only matches `:completed` status. If an event has errored before `await` is called (and waiters were already notified), this waiter will never receive a message, causing the caller to hang indefinitely. The check should also match `:error` (and any other terminal status).</violation>
</file>

<file name="abxbus-elixir/lib/abxbus/lock_manager.ex">

<violation number="1" location="abxbus-elixir/lib/abxbus/lock_manager.ex:152">
P1: Monitor global-lock holders; a crashing BusServer can leave global-serial mode stuck busy forever.</violation>

<violation number="2" location="abxbus-elixir/lib/abxbus/lock_manager.ex:164">
P1: Keying semaphores only by name makes `:bus` scope global across buses.</violation>
</file>

<file name="abxbus-elixir/lib/abxbus/event.ex">

<violation number="1" location="abxbus-elixir/lib/abxbus/event.ex:191">
P1: Allow `event_parent_id` through the metadata override path; otherwise `use Abxbus.Event` events cannot preserve an explicit parent ID.</violation>

<violation number="2" location="abxbus-elixir/lib/abxbus/event.ex:266">
P1: Generate a real UUID event_id here; the current timestamp-random format breaks cross-runtime event_id validation.</violation>
</file>

<file name="abxbus-elixir/lib/abxbus/event_store.ex">

<violation number="1" location="abxbus-elixir/lib/abxbus/event_store.ex:129">
P2: TOCTOU race in `resolve_find_waiters`: `lookup` + `delete_object` on the shared ETS table is not atomic. When multiple buses emit matching events concurrently, the same waiter can receive duplicate `:find_match` messages. Consider serializing this through the EventStore GenServer or using `:ets.take/2` (as done in `notify_waiters`) to atomically read-and-delete.</violation>
</file>
Architecture diagram
sequenceDiagram
    participant C as Client
    participant API as Abxbus API
    participant Store as EventStore (ETS)
    participant Bus as BusServer (GenServer)
    participant Lock as LockManager (GenServer)
    participant Worker as EventWorker (Process)

    Note over C,Worker: NEW: Elixir/OTP Event Bus Flow (Bus-Serial Example)

    C->>API: emit(event)
    API->>API: NEW: Extract Parent ID from P-Dict
    API->>Store: NEW: put_or_merge(event)
    API->>Bus: NEW: GenServer.call(:emit)
    
    alt Bus is Idle
        Bus->>Worker: NEW: spawn_link(run/5)
    else Bus is Busy
        Bus->>Bus: Enqueue event (:queue)
    end
    Bus-->>API: Return Event Metadata
    API-->>C: Return Event

    Note over Worker: Execution Context
    Worker->>Worker: NEW: Set P-Dict (:abx_current_event_id)
    Worker->>Lock: NEW: resolve_concurrency_policy()
    Lock-->>Worker: config (timeouts, concurrency)

    rect rgb(240, 240, 240)
        Note over Worker,API: Handler Execution
        Worker->>Worker: Execute Handler Fn
        
        opt Handler emits child
            Worker->>API: emit(child_event)
            API->>Store: NEW: update lineage (Parent -> Child)
            API->>Bus: GenServer.call(:emit)
        end

        opt Handler awaits child (Queue Jump)
            Worker->>API: await(child_event)
            API->>Store: NEW: add_waiter(child_id, self())
            API->>Bus: NEW: trigger_queue_jump(child_event)
            Bus->>Worker: NEW: spawn child worker immediately
            API->>API: NEW: receive (Blocks parent handler)
        end
    end

    Worker->>Bus: NEW: send(:event_worker_done)
    Bus->>Store: Mark event :completed
    Store->>API: NEW: notify_waiters (Signal unblock)
    
    Bus->>Bus: NEW: Dequeue next event
    
    opt Global Serial Mode
        Bus->>Lock: NEW: acquire_global() / release_global()
    end

    alt Failure Path (Timeout)
        Worker->>Worker: NEW: timer.exit after event_timeout
        Bus->>Store: NEW: Mark handlers as :error (Aborted)
    end
Loading

Reply with feedback, questions, or to request a fix. Tag @cubic-dev-ai to re-run a review.

Comment thread abxbus-elixir/lib/abxbus/bus_server.ex Outdated
Comment thread abxbus-elixir/lib/abxbus/event_worker.ex Outdated
Comment thread abxbus-elixir/lib/abxbus.ex Outdated
Comment thread abxbus-elixir/lib/abxbus/lock_manager.ex
Comment thread abxbus-elixir/lib/abxbus/lock_manager.ex Outdated
Comment thread abxbus-elixir/lib/abxbus/event.ex Outdated
Comment thread abxbus-elixir/test/event_bus_find_test.exs
@@ -0,0 +1,306 @@
defmodule Abxbus.EventBusLockingTest do
Copy link
Copy Markdown
Contributor

@cubic-dev-ai cubic-dev-ai bot Apr 1, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2: These concurrency assertions are timing-based, so the new tests can fail nondeterministically on slower CI runners.

Prompt for AI agents
Check if this issue is valid — if so, understand the root cause and fix it. At abxbus-elixir/test/event_bus_locking_test.exs, line 80:

<comment>These concurrency assertions are timing-based, so the new tests can fail nondeterministically on slower CI runners.</comment>

<file context>
@@ -0,0 +1,306 @@
+      max_b = :atomics.new(1, [])
+      max_global = :atomics.new(1, [])
+
+      sync_event = :erlang.make_ref()
+
+      make_handler = fn source_counter, source_max ->
</file context>
Fix with Cubic

Comment thread abxbus-elixir/lib/abxbus/event_worker.ex
Comment thread abxbus-elixir/lib/abxbus/event_store.ex Outdated
cubic-dev-ai[bot]

This comment was marked as resolved.

A worker crash should always mark the event as :error, even if another
bus already marked it :completed. Errors take precedence over success.

Issue identified by cubic.

https://claude.ai/code/session_01Qf9t3FPgh38PQUfKUcUeGx
Copy link
Copy Markdown
Contributor

@cubic-dev-ai cubic-dev-ai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

0 issues found across 1 file (changes from recent commits).

Requires human review: Auto-approval blocked by 14 unresolved issues from previous reviews.

… → 266)

New feature:
- max_handler_recursion_depth option for start_bus (default: 3)
- Tracks handler nesting depth across emit→await chains via ETS
- When depth exceeded, handlers get :error with "Infinite loop detected"
- Matches Python/TS max_handler_recursion_depth behavior

New middleware:
- lib/abxbus/middlewares/wal.ex — basic WAL middleware that appends
  completed events as JSONL lines to a file

New tests in eventbus_test.exs:
- custom/default handler recursion depth
- event with complex data, event_version overrides
- results by handler_name, event_completed pattern
- find waiter cleanup, find past most recent
- complex multi-bus forwarding with results

New tests in middleware_test.exs:
- hook statuses never emit error
- hooks cover string and wildcard patterns
- lifecycle monotonic on timeout

New tests in event_result_test.exs:
- handler_file_path detection, status transitions, mark_error

New tests in retry_integration_test.exs:
- retry with semaphore limits, retry timeout

New tests in debounce_test.exs:
- debounce prefers recent, or-chain patterns

New tests in event_bus_find_test.exs:
- find waiter cleanup after resolve and timeout

https://claude.ai/code/session_01Qf9t3FPgh38PQUfKUcUeGx
cubic-dev-ai[bot]

This comment was marked as resolved.

claude added 2 commits April 4, 2026 04:53
Production code:
- bus_server: clean up :abxbus_event_depth ETS entries after event
  completion (prevents unbounded memory growth)
- event_worker: preserve full handler metadata (file_path, timeout,
  registered_at) in recursion-limit error results

Test improvements (identified by cubic):
- eventbus_test: tighten event_completed assertion to == :completed
- eventbus_test: verify find_past returns one of the emitted events
- event_result_test: assert handler_file_path is non-nil binary
- event_bus_find_test: replace Process.sleep with spin_until for
  waiter cleanup verification
- middleware_test: replace Process.sleep(800) with event_completed,
  assert timeout outcome
- retry_integration_test: use atomics compare_exchange for racy
  max-concurrency tracker
- debounce_test: add timing assertion for "without waiting" tests,
  add wait_until_idle after or-chain emit

https://claude.ai/code/session_01Qf9t3FPgh38PQUfKUcUeGx
Replace dynamic Module.create (which generated a unique atom per call,
leaking atoms) with ETS-based config lookup. The WAL module itself
is now the middleware; per-bus config stored in :abxbus_wal_config ETS.

Issue identified by cubic.

https://claude.ai/code/session_01Qf9t3FPgh38PQUfKUcUeGx
Copy link
Copy Markdown
Contributor

@cubic-dev-ai cubic-dev-ai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

3 issues found across 9 files (changes from recent commits).

Prompt for AI agents (unresolved issues)

Check if these issues are valid — if so, understand the root cause of each and fix them. If appropriate, use sub-agents to investigate and fix each issue separately.


<file name="abxbus-elixir/lib/abxbus/middlewares/wal.ex">

<violation number="1" location="abxbus-elixir/lib/abxbus/middlewares/wal.ex:70">
P1: `build/1` is no longer backward compatible for multiple WAL instances; later calls overwrite the global default agent for earlier buses.</violation>

<violation number="2" location="abxbus-elixir/lib/abxbus/middlewares/wal.ex:76">
P2: Make ETS table creation race-safe; the current `whereis`/`new` sequence can crash when two registrations happen concurrently.</violation>
</file>

<file name="abxbus-elixir/lib/abxbus/bus_server.ex">

<violation number="1" location="abxbus-elixir/lib/abxbus/bus_server.ex:347">
P2: Depth tracking is deleted too early for forwarded multi-bus events; this can reset recursion depth to 0 on still-pending buses.</violation>
</file>

Reply with feedback, questions, or to request a fix. Tag @cubic-dev-ai to re-run a review.

"""
def build(agent) do
ensure_config_table()
:ets.insert(@wal_config_table, {:__wal_default__, agent})
Copy link
Copy Markdown
Contributor

@cubic-dev-ai cubic-dev-ai bot Apr 4, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1: build/1 is no longer backward compatible for multiple WAL instances; later calls overwrite the global default agent for earlier buses.

Prompt for AI agents
Check if this issue is valid — if so, understand the root cause and fix it. At abxbus-elixir/lib/abxbus/middlewares/wal.ex, line 70:

<comment>`build/1` is no longer backward compatible for multiple WAL instances; later calls overwrite the global default agent for earlier buses.</comment>

<file context>
@@ -37,42 +39,96 @@ defmodule Abxbus.Middlewares.WAL do
+  """
+  def build(agent) do
+    ensure_config_table()
+    :ets.insert(@wal_config_table, {:__wal_default__, agent})
+    __MODULE__
+  end
</file context>
Fix with Cubic

Comment thread abxbus-elixir/lib/abxbus/middlewares/wal.ex
Comment thread abxbus-elixir/lib/abxbus/bus_server.ex Outdated
- wal.ex: make ETS table creation race-safe with try/rescue
- bus_server: only delete depth tracking when event_pending_bus_count
  is 0 (prevents resetting depth for forwarded multi-bus events)

WAL build/1 global default overwrite is acknowledged but acceptable
for single-bus usage; multi-bus should use register/2.

Issues identified by cubic.

https://claude.ai/code/session_01Qf9t3FPgh38PQUfKUcUeGx
Copy link
Copy Markdown
Contributor

@cubic-dev-ai cubic-dev-ai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

0 issues found across 2 files (changes from recent commits).

Requires human review: Auto-approval blocked by 16 unresolved issues from previous reviews.

devin-ai-integration[bot]

This comment was marked as resolved.

:abxbus_handler_depth was missing from Process.put in 3 spawn sites:
- maybe_with_event_timeout (spawn_link)
- maybe_with_handler_timeout (spawn_link)
- run_handlers_parallel (Task.async)

Without this, handlers in timeout-wrapped or parallel mode would read
depth as 0, defeating the recursion guard.

Issue identified by Devin review.

https://claude.ai/code/session_01Qf9t3FPgh38PQUfKUcUeGx
Copy link
Copy Markdown
Contributor

@cubic-dev-ai cubic-dev-ai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

0 issues found across 1 file (changes from recent commits).

Requires human review: Auto-approval blocked by 15 unresolved issues from previous reviews.

claude added 2 commits April 5, 2026 06:02
EventWorker now writes :started, :completed, and :error status for each
handler result to ETS as it happens, not just at event completion.

Also fix find() to return most recent match by event_created_at.

https://claude.ai/code/session_01Qf9t3FPgh38PQUfKUcUeGx
…new tests (WIP)

- EventWorker writes :started/:completed/:error to ETS per-handler as
  they happen (not just at event completion)
- find() search_past returns most recent match by event_created_at
- Adding missing tests (in progress)

https://claude.ai/code/session_01Qf9t3FPgh38PQUfKUcUeGx
@cubic-dev-ai
Copy link
Copy Markdown
Contributor

cubic-dev-ai bot commented Apr 5, 2026

You're iterating quickly on this pull request. To help protect your rate limits, cubic has paused automatic reviews on new pushes for now—when you're ready for another review, comment @cubic-dev-ai review.

claude added 3 commits April 5, 2026 06:08
…n guard, real-time ETS updates, find ordering

eventbus_test.exs: string handler matching, multiple error aggregation,
  single error preservation, 3-level hierarchy bubbling, circular
  subscription recursion guard, result indexing by handler ID,
  results by eventbus_name, automatic event_type, idle recovery,
  parallel result integrity, handler started visible in ETS

event_result_test.exs: no casting without result_type, handler_id
  matches entry ID

event_bus_find_test.exs: find past returns most recent match

base_event_test.exs: forwarded handler sees correct bus per bus

https://claude.ai/code/session_01Qf9t3FPgh38PQUfKUcUeGx
- WAL writes completed events to JSONL file
- WAL creates parent directories
- WAL skips incomplete (in-progress) events

https://claude.ai/code/session_01Qf9t3FPgh38PQUfKUcUeGx
New modules:
- lib/abxbus/json.ex: minimal JSON encoder/decoder (no deps, delegates
  to Jason if available). Handles strings, numbers, booleans, nil,
  lists, maps — sufficient for event round-tripping.
- Event.to_json/1, Event.from_json/2: serialize events to/from maps
  with string keys. Field names match Python/TS exactly — no aliasing.
- Event.to_json_string/1, Event.from_json_string/2: string encoding.
- Public API: Abxbus.to_json/1, from_json/2, to_json_string/1,
  from_json_string/2.

Implementation fixes:
- event_store: find() returns most recent match by event_created_at
- event_worker: writes handler result status to ETS in real-time
  (:started, :completed, :error) for middleware observability

New test file:
- serialization_test.exs: 10 tests covering toJSON field names, fromJSON
  round-trip, null preservation, string encoding, cross-runtime field
  compatibility (all Python/TS event_* fields present, no aliasing)

https://claude.ai/code/session_01Qf9t3FPgh38PQUfKUcUeGx
devin-ai-integration[bot]

This comment was marked as resolved.

- safe_to_atom: fall back to keeping string (not creating new atom)
  when atom doesn't exist. Prevents atom table exhaustion from
  untrusted JSON input via bridges. Issue identified by Devin.
- applicable_handlers: fix wildcard forwarding loop detection. The old
  filter (entry.eventbus_name != state.name) was always false. Now
  counts bus visits in event_path — skips wildcard handlers when bus
  appears more than once (re-visit). Issue identified by Devin.

https://claude.ai/code/session_01Qf9t3FPgh38PQUfKUcUeGx
devin-ai-integration[bot]

This comment was marked as resolved.

- release_global: change from cast to synchronous call, ensuring the
  lock is freed before notify_all_buses_check_pending sends :check_pending.
  Prevents permanent stall where BusB's try_acquire races ahead of
  BusA's release. Issue identified by Devin.
- notify_waiters: re-read event from ETS before notifying, so waiters
  get the latest snapshot with all merged handler results from
  concurrent forwarded-event completions. Issue identified by Devin.

https://claude.ai/code/session_01Qf9t3FPgh38PQUfKUcUeGx
devin-ai-integration[bot]

This comment was marked as resolved.

claude added 4 commits April 6, 2026 02:41
The catch :exit path in run_with_retries unconditionally retried all
exit signals without consulting retry_on_errors. Now checks
should_retry? consistently with the rescue path.

Issue identified by Devin review.

https://claude.ai/code/session_01Qf9t3FPgh38PQUfKUcUeGx
UUIDv5 handler IDs (matches Python/TS algorithm):
- Add Event.uuid5/2 — RFC 4122 UUIDv5 via SHA-1 hash of namespace+name
- Add Event.handler_id_namespace/0 — uuid5(NAMESPACE_DNS, "abxbus-handler")
- EventHandler.compute_handler_id/5 — generates deterministic ID from
  eventbus_id, handler_name, file_path, registered_at, event_pattern
- Verified against Python: same seed produces same UUID
  ("19ea9fe8-cfbe-541e-8a35-2579e4e9efff")
- handler_registered_at now an ISO datetime string for stability
- Allow explicit id override via opts (matches Python from_callable)

Result type enforcement:
- enforce_result_type/2 in event_worker checks handler results against
  event_result_type (event-level field)
- Supports :string, :integer, :number, :boolean, :atom, :list, :map,
  :nil, :any, plus module-based struct matching
- Mismatched results get :error status with EventHandlerResultSchemaError
- Add Abxbus.EventHandlerResultSchemaError exception with expected/actual fields

https://claude.ai/code/session_01Qf9t3FPgh38PQUfKUcUeGx
… 326)

New tests:
- eventbus_test: result type enforcement with :map and :list, results
  filterable by eventbus_id
- eventbus_on_off_test: deterministic handler IDs (matches expected
  uuidv5 hash, determinism, uniqueness)
- event_result_test: EventResult update ordering, result_type field
- middleware_test: WAL creates parent dirs, WAL skips incomplete events
- eventbus_context_test: context propagates to parallel handlers
- retry_integration_test: retry succeeds after multiple attempts
- event_bus_find_test: debounce future match before dispatch fallback
- base_event_test: event runtime state (started_at/completed_at
  timestamps, no-handler events still get timestamps)

Production fix:
- event_worker: pass eventbus_id when creating EventResult structs
  (was declared but never populated, blocking eventbus_id filter test)

https://claude.ai/code/session_01Qf9t3FPgh38PQUfKUcUeGx
Root cause: EventStore.search_past() called :ets.tab2list/1 which does
a full O(n) scan of the :abxbus_events table. When benchmark tests
(serial_50k, performance_test, ets_*) filled the table with 50k+
entries, the linear scan took 5+ seconds, blowing past test timeouts.

Fix: add a secondary index table :abxbus_events_by_type keyed by
event_type atom. select_by_type/1 now does:
- Atom type: O(k) lookup where k = events of that type (not total)
- Wildcard: full scan (rare case)
- String type: iterate distinct keys in index (small set of atoms)

This makes find() performance independent of test pollution. Full
test suite now runs deterministically — 5/5 runs pass with 0 failures
including all benchmark tests.

Also fix pre-existing broken test in ets_table_size_test.exs that
called :erlang.process_flag/3 with :min_heap_size (only supported
via Process.flag/2 from the target process).

https://claude.ai/code/session_01Qf9t3FPgh38PQUfKUcUeGx
devin-ai-integration[bot]

This comment was marked as resolved.

The optimization guard at applicable_handlers compared map_size (key
count) against length(type_handlers) + length(wildcard_handlers)
(entry count). When the type key had multiple entries (e.g. 3 handlers
for MyEvent) and a string handler also existed for "MyEvent", the
guard 2 > 3 evaluated false, skipping the string handler lookup.

Fix: compare key counts to key counts. Count whether each of the two
known keys has any entries (0 or 1 each), then check map_size against
that sum.

Issue identified by Devin review.

https://claude.ai/code/session_01Qf9t3FPgh38PQUfKUcUeGx
Copy link
Copy Markdown

@devin-ai-integration devin-ai-integration bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Devin Review found 2 new potential issues.

View 33 additional findings in Devin Review.

Open in Devin Review

defp decode_string(<<"\"", rest::binary>>, acc), do: {acc, rest}
defp decode_string(<<c, rest::binary>>, acc), do: decode_string(rest, acc <> <<c>>)

defp decode_array(<<"}", rest::binary>>, acc), do: {Enum.reverse(acc), rest}
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🟡 JSON decoder decode_array incorrectly accepts } as array terminator

The decode_array function at abxbus-elixir/lib/abxbus/json.ex:98 matches "}" (closing brace) as a valid terminator for JSON arrays, in addition to the correct "]" at line 99. In JSON, } should only close objects, while ] closes arrays. This means malformed JSON like [1, 2, 3} would be silently accepted and parsed as [1, 2, 3] instead of producing an error. More critically, in edge cases involving nested structures, this could cause the parser to consume a } that belongs to an enclosing object, leading to incorrect parse results or errors downstream. While this built-in JSON parser is only used when Jason is unavailable, it's still a correctness issue in the parser logic.

Suggested change
defp decode_array(<<"}", rest::binary>>, acc), do: {Enum.reverse(acc), rest}
defp decode_array(<<"]", rest::binary>>, acc), do: {Enum.reverse(acc), rest}
Open in Devin Review

Was this helpful? React with 👍 or 👎 to provide feedback.

Comment on lines +99 to +109
case EventStore.get(event.event_id) do
%{event_status: status} = completed when status in [:completed, :error] ->
# Already terminal — drain any notification sent between registration and check
receive do
{:event_completed, ^ref, _} -> :ok
after
0 ->
# Waiter was added after notify_waiters ran — clean it up
EventStore.remove_waiter(event.event_id, ref)
end
completed
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🔴 await early-return bypasses tree-completion check, returning before children complete

When await/2 is called on an event whose event_status is already :completed or :error, the early-return path at abxbus-elixir/lib/abxbus.ex:100-109 returns immediately without checking whether the event's children have also completed. This violates the documented contract: "Blocks the calling process until the event (and all its children) are complete."

How the bug gets triggered

The normal (non-early) path works correctly — waiters registered before the event completes only get notified via notify_waiters in do_mark_tree_complete (abxbus-elixir/lib/abxbus/bus_server.ex:702-704), which checks that all children are terminal before notifying. But the early-return path at line 100 only checks the event's own status, not its children.

Triggering scenario:

  1. Parent handler emits a child event without awaiting it (fire-and-forget)
  2. Parent handler returns → parent's event_status set to :completed
  3. maybe_mark_tree_complete finds children NOT complete → does NOT call notify_waiters
  4. External code calls await(parent) → reads :completed status → returns immediately
  5. Child is still running but the caller already got the "completed" event
Prompt for agents
The early-return path in Abxbus.await/2 (abxbus.ex:99-109) checks only the event's own status to decide whether to return immediately, but doesn't verify that all children have also completed. The tree-aware notification logic in BusServer.do_mark_tree_complete (bus_server.ex:686-714) correctly waits for children, but it only controls the notification-based path. The early-return path bypasses it entirely.

To fix this, the early-return check at line 100 needs to also verify that all children of the event are in a terminal state before returning immediately. One approach: add a helper like `tree_complete?(event_id)` that checks EventStore.children_of and recursively verifies all descendants are terminal, and include it in the guard condition. If the tree is NOT complete, fall through to the waiter-based path instead of returning early.

The same issue exists in `event_completed/2` at abxbus.ex:142-149 which has the same early-return pattern.
Open in Devin Review

Was this helpful? React with 👍 or 👎 to provide feedback.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants