From 44f134e256332d15c58591a9eed85bfb0d4a6792 Mon Sep 17 00:00:00 2001 From: Nick Sweeting Date: Fri, 20 Feb 2026 13:23:14 -0800 Subject: [PATCH 1/4] Add strict typed event/handler API for bubus-rust --- bubus-rust/Cargo.toml | 14 + bubus-rust/README.md | 40 + bubus-rust/src/base_event.rs | 126 ++++ bubus-rust/src/event_bus.rs | 699 ++++++++++++++++++ bubus-rust/src/event_handler.rs | 56 ++ bubus-rust/src/event_result.rs | 44 ++ bubus-rust/src/id.rs | 24 + bubus-rust/src/lib.rs | 10 + bubus-rust/src/lock_manager.rs | 29 + bubus-rust/src/typed.rs | 109 +++ bubus-rust/src/types.rs | 31 + bubus-rust/tests/event_bus_tests.rs | 55 ++ bubus-rust/tests/test_base_event.rs | 30 + bubus-rust/tests/test_event_handler_first.rs | 41 + bubus-rust/tests/test_event_handler_ids.rs | 32 + bubus-rust/tests/test_event_history_store.rs | 35 + bubus-rust/tests/test_event_result.rs | 24 + .../tests/test_eventbus_dispatch_defaults.rs | 33 + bubus-rust/tests/test_eventbus_edge_cases.rs | 57 ++ bubus-rust/tests/test_eventbus_find.rs | 37 + bubus-rust/tests/test_eventbus_locking.rs | 87 +++ bubus-rust/tests/test_eventbus_on_off.rs | 22 + bubus-rust/tests/test_eventbus_timeout.rs | 107 +++ bubus-rust/tests/test_ids.rs | 48 ++ bubus-rust/tests/test_typed_events.rs | 58 ++ 25 files changed, 1848 insertions(+) create mode 100644 bubus-rust/Cargo.toml create mode 100644 bubus-rust/README.md create mode 100644 bubus-rust/src/base_event.rs create mode 100644 bubus-rust/src/event_bus.rs create mode 100644 bubus-rust/src/event_handler.rs create mode 100644 bubus-rust/src/event_result.rs create mode 100644 bubus-rust/src/id.rs create mode 100644 bubus-rust/src/lib.rs create mode 100644 bubus-rust/src/lock_manager.rs create mode 100644 bubus-rust/src/typed.rs create mode 100644 bubus-rust/src/types.rs create mode 100644 bubus-rust/tests/event_bus_tests.rs create mode 100644 bubus-rust/tests/test_base_event.rs create mode 100644 bubus-rust/tests/test_event_handler_first.rs create mode 100644 bubus-rust/tests/test_event_handler_ids.rs create mode 100644 bubus-rust/tests/test_event_history_store.rs create mode 100644 bubus-rust/tests/test_event_result.rs create mode 100644 bubus-rust/tests/test_eventbus_dispatch_defaults.rs create mode 100644 bubus-rust/tests/test_eventbus_edge_cases.rs create mode 100644 bubus-rust/tests/test_eventbus_find.rs create mode 100644 bubus-rust/tests/test_eventbus_locking.rs create mode 100644 bubus-rust/tests/test_eventbus_on_off.rs create mode 100644 bubus-rust/tests/test_eventbus_timeout.rs create mode 100644 bubus-rust/tests/test_ids.rs create mode 100644 bubus-rust/tests/test_typed_events.rs diff --git a/bubus-rust/Cargo.toml b/bubus-rust/Cargo.toml new file mode 100644 index 0000000..6400b60 --- /dev/null +++ b/bubus-rust/Cargo.toml @@ -0,0 +1,14 @@ +[package] +name = "bubus-rust" +version = "0.1.0" +edition = "2021" + +[dependencies] +serde = { version = "1", features = ["derive"] } +serde_json = "1" +uuid = { version = "1", features = ["v4", "v5", "v7", "serde"] } +futures = { version = "0.3", features = ["executor", "thread-pool"] } +event-listener = "5" +parking_lot = "0.12" + +[dev-dependencies] diff --git a/bubus-rust/README.md b/bubus-rust/README.md new file mode 100644 index 0000000..c2084f0 --- /dev/null +++ b/bubus-rust/README.md @@ -0,0 +1,40 @@ +# bubus-rust + +Idiomatic Rust implementation of `bubus`, matching the Python/TypeScript event JSON surface and execution semantics as closely as possible. + +## Current scope + +Implemented core features: +- Base event model and event result model with serde JSON compatibility +- Async event bus with queueing and queue-jump behavior +- Event concurrency: `global-serial`, `bus-serial`, `parallel` +- Handler concurrency: `serial`, `parallel` +- Handler completion strategies: `all`, `first` +- Event path tracking and pending bus count + +Not yet implemented in this crate revision: +- Bridges +- Middlewares (hook points are left in code comments) + +## Quickstart + +```rust +use bubus_rust::{base_event, event_bus}; +use futures::executor::block_on; +use serde_json::{Map, json}; + +let bus = event_bus::new(Some("MainBus".to_string())); +bus.on("UserLoginEvent", "handle_login", |event| async move { + Ok(json!({"ok": true, "event_id": event.inner.lock().event_id})) +}); + +let mut payload = Map::new(); +payload.insert("username".to_string(), json!("alice")); +let event = base_event::new("UserLoginEvent", payload); +bus.emit(event.clone()); + +block_on(async { + event.wait_completed().await; + println!("{}", event.to_json_value()); +}); +``` diff --git a/bubus-rust/src/base_event.rs b/bubus-rust/src/base_event.rs new file mode 100644 index 0000000..6cd1f1d --- /dev/null +++ b/bubus-rust/src/base_event.rs @@ -0,0 +1,126 @@ +use std::{collections::HashMap, sync::Arc}; + +use event_listener::Event; +use parking_lot::Mutex; +use serde::{Deserialize, Serialize}; +use serde_json::{Map, Value}; + +use crate::{ + event_result::EventResult, + id::uuid_v7_string, + types::{ + EventConcurrencyMode, EventHandlerCompletionMode, EventHandlerConcurrencyMode, EventStatus, + }, +}; + +pub fn now_iso() -> String { + use std::time::{SystemTime, UNIX_EPOCH}; + let dur = SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap_or_default(); + format!("{}.{:09}Z", dur.as_secs(), dur.subsec_nanos()) +} + +#[derive(Clone, Serialize, Deserialize)] +pub struct BaseEventData { + pub event_type: String, + pub event_version: String, + pub event_timeout: Option, + pub event_slow_timeout: Option, + pub event_concurrency: Option, + pub event_handler_timeout: Option, + pub event_handler_slow_timeout: Option, + pub event_handler_concurrency: Option, + pub event_handler_completion: Option, + pub event_result_type: Option, + pub event_id: String, + pub event_path: Vec, + pub event_parent_id: Option, + pub event_emitted_by_handler_id: Option, + pub event_pending_bus_count: usize, + pub event_created_at: String, + pub event_status: EventStatus, + pub event_started_at: Option, + pub event_completed_at: Option, + pub event_results: HashMap, + #[serde(flatten)] + pub payload: Map, +} + +pub struct BaseEvent { + pub inner: Mutex, + pub completed: Event, +} + +impl BaseEvent { + pub fn new(event_type: impl Into, payload: Map) -> Arc { + Arc::new(Self { + inner: Mutex::new(BaseEventData { + event_type: event_type.into(), + event_version: "0.0.1".to_string(), + event_timeout: None, + event_slow_timeout: None, + event_concurrency: None, + event_handler_timeout: None, + event_handler_slow_timeout: None, + event_handler_concurrency: None, + event_handler_completion: None, + event_result_type: None, + event_id: uuid_v7_string(), + event_path: vec![], + event_parent_id: None, + event_emitted_by_handler_id: None, + event_pending_bus_count: 0, + event_created_at: now_iso(), + event_status: EventStatus::Pending, + event_started_at: None, + event_completed_at: None, + event_results: HashMap::new(), + payload, + }), + completed: Event::new(), + }) + } + + pub async fn wait_completed(self: &Arc) { + loop { + let listener = self.completed.listen(); + { + let event = self.inner.lock(); + if event.event_status == EventStatus::Completed { + return; + } + } + listener.await; + } + } + + pub fn mark_started(&self) { + let mut event = self.inner.lock(); + if event.event_started_at.is_none() { + event.event_started_at = Some(now_iso()); + } + event.event_status = EventStatus::Started; + } + + pub fn mark_completed(&self) { + let mut event = self.inner.lock(); + event.event_status = EventStatus::Completed; + if event.event_completed_at.is_none() { + event.event_completed_at = Some(now_iso()); + } + self.completed.notify(usize::MAX); + } + + pub fn to_json_value(&self) -> Value { + serde_json::to_value(&*self.inner.lock()).unwrap_or(Value::Null) + } + + pub fn from_json_value(value: Value) -> Arc { + let parsed: BaseEventData = serde_json::from_value(value).expect("invalid base_event json"); + Arc::new(Self { + inner: Mutex::new(parsed), + completed: Event::new(), + }) + } +} diff --git a/bubus-rust/src/event_bus.rs b/bubus-rust/src/event_bus.rs new file mode 100644 index 0000000..14251f6 --- /dev/null +++ b/bubus-rust/src/event_bus.rs @@ -0,0 +1,699 @@ +use std::{ + collections::{HashMap, VecDeque}, + sync::{mpsc as std_mpsc, Arc, OnceLock}, + thread, + time::{Duration, Instant}, +}; + +use event_listener::Event; +use futures::executor::block_on; +use parking_lot::Mutex; +use serde::Serialize; +use serde_json::Value; + +use crate::{ + base_event::{now_iso, BaseEvent}, + event_handler::{EventHandler, EventHandlerCallable}, + event_result::{EventResult, EventResultStatus}, + id::uuid_v7_string, + lock_manager::ReentrantLock, + types::{ + EventConcurrencyMode, EventHandlerCompletionMode, EventHandlerConcurrencyMode, EventStatus, + }, +}; + +static GLOBAL_SERIAL_LOCK: OnceLock> = OnceLock::new(); +thread_local! { + static CURRENT_EVENT_ID: std::cell::RefCell> = const { std::cell::RefCell::new(None) }; + static CURRENT_HANDLER_ID: std::cell::RefCell> = const { std::cell::RefCell::new(None) }; +} + +struct FindWaiter { + id: u64, + pattern: String, + child_of_event_id: Option, + sender: std_mpsc::Sender>, +} + +struct BusRuntime { + queue: Mutex>>, + queue_notify: Event, + stop: Mutex, + events: Mutex>>, + history_order: Mutex>, + max_history_size: Option, + max_history_drop: bool, + find_waiters: Mutex>, + next_waiter_id: Mutex, +} + +#[derive(Clone, Serialize)] +pub struct EventBus { + pub name: String, + pub id: String, + pub event_concurrency: EventConcurrencyMode, + pub event_timeout: Option, + pub event_slow_timeout: Option, + pub event_handler_concurrency: EventHandlerConcurrencyMode, + pub event_handler_completion: EventHandlerCompletionMode, + pub event_handler_slow_timeout: Option, + #[serde(skip)] + handlers: Arc>>>, + #[serde(skip)] + runtime: Arc, + #[serde(skip)] + bus_serial_lock: Arc, +} + +impl EventBus { + pub fn new(name: Option) -> Arc { + Self::new_with_history(name, Some(100), false) + } + + pub fn new_with_history( + name: Option, + max_history_size: Option, + max_history_drop: bool, + ) -> Arc { + let bus = Arc::new(Self { + name: name.unwrap_or_else(|| "EventBus".to_string()), + id: uuid_v7_string(), + event_concurrency: EventConcurrencyMode::BusSerial, + event_timeout: Some(60.0), + event_slow_timeout: Some(300.0), + event_handler_concurrency: EventHandlerConcurrencyMode::Serial, + event_handler_completion: EventHandlerCompletionMode::All, + event_handler_slow_timeout: Some(30.0), + handlers: Arc::new(Mutex::new(HashMap::new())), + runtime: Arc::new(BusRuntime { + queue: Mutex::new(VecDeque::new()), + queue_notify: Event::new(), + stop: Mutex::new(false), + events: Mutex::new(HashMap::new()), + history_order: Mutex::new(VecDeque::new()), + max_history_size, + max_history_drop, + find_waiters: Mutex::new(Vec::new()), + next_waiter_id: Mutex::new(0), + }), + bus_serial_lock: Arc::new(ReentrantLock::default()), + }); + Self::start_loop(bus.clone()); + bus + } + + fn start_loop(bus: Arc) { + thread::spawn(move || { + block_on(async move { + loop { + if !bus.runtime.queue.lock().is_empty() { + thread::sleep(Duration::from_millis(1)); + } + let next_event = bus.runtime.queue.lock().pop_front(); + if let Some(event) = next_event { + let bus_for_task = bus.clone(); + let mode = event + .inner + .lock() + .event_concurrency + .unwrap_or(bus.event_concurrency); + match mode { + EventConcurrencyMode::Parallel => { + thread::spawn(move || { + block_on(bus_for_task.process_event(event)); + }); + } + EventConcurrencyMode::GlobalSerial + | EventConcurrencyMode::BusSerial => { + bus.process_event(event).await; + } + } + continue; + } + + if *bus.runtime.stop.lock() { + break; + } + bus.runtime.queue_notify.listen().await; + } + }); + }); + } + + pub fn stop(&self) { + *self.runtime.stop.lock() = true; + self.runtime.queue_notify.notify(usize::MAX); + } + + pub fn runtime_payload_for_test(&self) -> HashMap> { + self.runtime.events.lock().clone() + } + + pub fn event_history_ids(&self) -> Vec { + self.runtime.history_order.lock().iter().cloned().collect() + } + + pub fn on(&self, pattern: &str, handler_name: &str, handler_fn: F) -> EventHandler + where + F: Fn(Arc) -> Fut + Send + Sync + 'static, + Fut: std::future::Future> + Send + 'static, + { + let callable: EventHandlerCallable = Arc::new(move |event| Box::pin(handler_fn(event))); + let entry = EventHandler::from_callable( + pattern.to_string(), + handler_name.to_string(), + self.name.clone(), + self.id.clone(), + callable, + ); + self.handlers + .lock() + .entry(pattern.to_string()) + .or_default() + .push(entry.clone()); + entry + } + + pub fn off(&self, pattern: &str, handler_id: Option<&str>) { + let mut handlers = self.handlers.lock(); + if let Some(list) = handlers.get_mut(pattern) { + if let Some(handler_id) = handler_id { + list.retain(|handler| handler.id != handler_id); + } else { + list.clear(); + } + } + } + + pub fn emit(&self, event: Arc) -> Arc { + self.emit_with_options(event, false) + } + + pub fn emit_with_options(&self, event: Arc, queue_jump: bool) -> Arc { + if !self.register_in_history(event.clone()) { + event.mark_completed(); + return event; + } + + { + let mut inner = event.inner.lock(); + inner.event_pending_bus_count += 1; + inner + .event_path + .push(format!("{}#{}", self.name, &self.id[0..4])); + CURRENT_EVENT_ID.with(|id| inner.event_parent_id = id.borrow().clone()); + CURRENT_HANDLER_ID.with(|id| inner.event_emitted_by_handler_id = id.borrow().clone()); + } + + { + let mut queue = self.runtime.queue.lock(); + if queue_jump { + queue.push_front(event.clone()); + } else { + queue.push_back(event.clone()); + } + } + + self.notify_find_waiters(event.clone()); + self.runtime.queue_notify.notify(1); + event + } + + fn register_in_history(&self, event: Arc) -> bool { + let event_id = event.inner.lock().event_id.clone(); + + if let Some(max_size) = self.runtime.max_history_size { + let current_size = self.runtime.history_order.lock().len(); + if current_size >= max_size { + if self.runtime.max_history_drop { + while self.runtime.history_order.lock().len() >= max_size { + if let Some(oldest) = self.runtime.history_order.lock().pop_front() { + self.runtime.events.lock().remove(&oldest); + } else { + break; + } + } + } else { + return false; + } + } + } + + self.runtime.events.lock().insert(event_id.clone(), event); + self.runtime.history_order.lock().push_back(event_id); + true + } + + pub async fn find( + &self, + pattern: &str, + past: bool, + future: Option, + child_of: Option>, + ) -> Option> { + if past { + let child_of_event_id = child_of + .as_ref() + .map(|event| event.inner.lock().event_id.clone()); + if let Some(matched) = self.find_in_history(pattern, child_of_event_id.as_deref()) { + return Some(matched); + } + } + + let future = future?; + let (tx, rx) = std_mpsc::channel(); + let waiter_id = { + let mut next = self.runtime.next_waiter_id.lock(); + *next += 1; + *next + }; + + self.runtime.find_waiters.lock().push(FindWaiter { + id: waiter_id, + pattern: pattern.to_string(), + child_of_event_id: child_of.map(|event| event.inner.lock().event_id.clone()), + sender: tx, + }); + + let result = rx.recv_timeout(Duration::from_secs_f64(future)).ok(); + self.runtime + .find_waiters + .lock() + .retain(|waiter| waiter.id != waiter_id); + result + } + + fn find_in_history( + &self, + pattern: &str, + child_of_event_id: Option<&str>, + ) -> Option> { + let history = self.runtime.history_order.lock().clone(); + for event_id in history.iter().rev() { + let event = self.runtime.events.lock().get(event_id).cloned()?; + if !self.matches_pattern(&event, pattern) { + continue; + } + if let Some(parent_id) = child_of_event_id { + if !self.event_is_child_of_ids(&event.inner.lock().event_id, parent_id) { + continue; + } + } + return Some(event); + } + None + } + + fn matches_pattern(&self, event: &Arc, pattern: &str) -> bool { + if pattern == "*" { + return true; + } + event.inner.lock().event_type == pattern + } + + fn notify_find_waiters(&self, event: Arc) { + let event_id = event.inner.lock().event_id.clone(); + let mut matched_waiter_ids = Vec::new(); + let mut matched_senders = Vec::new(); + + { + let waiters = self.runtime.find_waiters.lock(); + for waiter in waiters.iter() { + if !self.matches_pattern(&event, &waiter.pattern) { + continue; + } + if let Some(parent_id) = waiter.child_of_event_id.as_deref() { + if !self.event_is_child_of_ids(&event_id, parent_id) { + continue; + } + } + matched_waiter_ids.push(waiter.id); + matched_senders.push(waiter.sender.clone()); + } + } + + if !matched_waiter_ids.is_empty() { + self.runtime + .find_waiters + .lock() + .retain(|waiter| !matched_waiter_ids.contains(&waiter.id)); + for sender in matched_senders { + let _ = sender.send(event.clone()); + } + } + } + + pub fn event_is_child_of( + &self, + child_event: &Arc, + parent_event: &Arc, + ) -> bool { + let child_id = child_event.inner.lock().event_id.clone(); + let parent_id = parent_event.inner.lock().event_id.clone(); + self.event_is_child_of_ids(&child_id, &parent_id) + } + + pub fn event_is_parent_of( + &self, + parent_event: &Arc, + child_event: &Arc, + ) -> bool { + self.event_is_child_of(child_event, parent_event) + } + + fn event_is_child_of_ids(&self, child_event_id: &str, parent_event_id: &str) -> bool { + if child_event_id == parent_event_id { + return false; + } + + let mut current_id = child_event_id.to_string(); + loop { + let Some(current_event) = self.runtime.events.lock().get(¤t_id).cloned() else { + return false; + }; + let current_parent = current_event.inner.lock().event_parent_id.clone(); + let Some(current_parent_id) = current_parent else { + return false; + }; + if current_parent_id == parent_event_id { + return true; + } + current_id = current_parent_id; + } + } + + pub async fn wait_until_idle(&self, timeout: Option) -> bool { + let start = Instant::now(); + loop { + let queue_empty = self.runtime.queue.lock().is_empty(); + let all_completed = self.runtime.events.lock().values().all(|event| { + let status = event.inner.lock().event_status; + status == EventStatus::Completed + }); + if queue_empty && all_completed { + return true; + } + + if let Some(timeout) = timeout { + if start.elapsed() > Duration::from_secs_f64(timeout) { + return false; + } + } + thread::sleep(Duration::from_millis(5)); + } + } + + async fn process_event(&self, event: Arc) { + let mode = event + .inner + .lock() + .event_concurrency + .unwrap_or(self.event_concurrency); + match mode { + EventConcurrencyMode::GlobalSerial => { + let _guard = GLOBAL_SERIAL_LOCK + .get_or_init(|| Arc::new(ReentrantLock::default())) + .lock(); + self.process_event_inner(event).await; + } + EventConcurrencyMode::BusSerial => { + let _guard = self.bus_serial_lock.lock(); + self.process_event_inner(event).await; + } + EventConcurrencyMode::Parallel => { + self.process_event_inner(event).await; + } + } + } + + async fn process_event_inner(&self, event: Arc) { + event.mark_started(); + let started_at = Instant::now(); + + let event_type = event.inner.lock().event_type.clone(); + let mut handlers = self + .handlers + .lock() + .get(&event_type) + .cloned() + .unwrap_or_default(); + handlers.extend(self.handlers.lock().get("*").cloned().unwrap_or_default()); + + let handler_concurrency = event + .inner + .lock() + .event_handler_concurrency + .unwrap_or(self.event_handler_concurrency); + let handler_completion = event + .inner + .lock() + .event_handler_completion + .unwrap_or(self.event_handler_completion); + + let event_timeout = event.inner.lock().event_timeout.or(self.event_timeout); + + match handler_concurrency { + EventHandlerConcurrencyMode::Serial => { + for handler in handlers { + let timed_out = self + .run_handler_with_context(event.clone(), handler, started_at, event_timeout) + .await; + if timed_out { + break; + } + if handler_completion == EventHandlerCompletionMode::First + && self.has_winner(&event) + { + break; + } + } + } + EventHandlerConcurrencyMode::Parallel => { + let mut join_handles = Vec::new(); + for handler in handlers { + let bus = self.clone(); + let event_clone = event.clone(); + join_handles.push(thread::spawn(move || { + block_on(bus.run_handler_with_context( + event_clone, + handler, + started_at, + event_timeout, + )) + })); + } + for handle in join_handles { + let _ = handle.join(); + if handler_completion == EventHandlerCompletionMode::First + && self.has_winner(&event) + { + break; + } + } + } + } + + if let Some(timeout) = event_timeout { + if started_at.elapsed() > Duration::from_secs_f64(timeout) { + self.cancel_children(&event, &format!("parent event timed out after {timeout}s")); + } + } + + if let Some(slow) = event + .inner + .lock() + .event_slow_timeout + .or(self.event_slow_timeout) + { + if started_at.elapsed() > Duration::from_secs_f64(slow) { + eprintln!( + "slow event warning: {} took {:?}", + event.inner.lock().event_type, + started_at.elapsed() + ); + } + } + + { + let mut inner = event.inner.lock(); + inner.event_pending_bus_count = inner.event_pending_bus_count.saturating_sub(1); + if inner.event_status != EventStatus::Completed { + inner.event_completed_at = Some(now_iso()); + } + } + event.mark_completed(); + + if self.runtime.max_history_size == Some(0) { + let event_id = event.inner.lock().event_id.clone(); + self.runtime.events.lock().remove(&event_id); + self.runtime + .history_order + .lock() + .retain(|id| id != &event_id); + } + } + + fn cancel_children(&self, event: &Arc, reason: &str) { + let results = event.inner.lock().event_results.clone(); + for result in results.values() { + for child_id in &result.event_children { + if let Some(child) = self.runtime.events.lock().get(child_id).cloned() { + for child_result in child.inner.lock().event_results.values_mut() { + if child_result.status == EventResultStatus::Pending + || child_result.status == EventResultStatus::Started + { + child_result.status = EventResultStatus::Error; + child_result.error = Some(format!("cancelled: {reason}")); + child_result.completed_at = Some(now_iso()); + } + } + child.mark_completed(); + } + } + } + } + + fn has_winner(&self, event: &Arc) -> bool { + event.inner.lock().event_results.values().any(|result| { + result.status == EventResultStatus::Completed + && result.error.is_none() + && result.result.is_some() + }) + } + + async fn run_handler_with_context( + &self, + event: Arc, + handler: EventHandler, + event_started_at: Instant, + event_timeout: Option, + ) -> bool { + let handler_id = handler.id.clone(); + CURRENT_EVENT_ID.with(|id| *id.borrow_mut() = Some(event.inner.lock().event_id.clone())); + CURRENT_HANDLER_ID.with(|id| *id.borrow_mut() = Some(handler_id.clone())); + let timed_out = self + .run_handler(event, handler, event_started_at, event_timeout) + .await; + CURRENT_HANDLER_ID.with(|id| *id.borrow_mut() = None); + CURRENT_EVENT_ID.with(|id| *id.borrow_mut() = None); + timed_out + } + + async fn run_handler( + &self, + event: Arc, + handler: EventHandler, + event_started_at: Instant, + event_timeout: Option, + ) -> bool { + let handler_timeout = handler + .handler_timeout + .or(event.inner.lock().event_handler_timeout) + .or(event_timeout); + + let remaining_event_timeout = event_timeout.map(|timeout| { + let elapsed = event_started_at.elapsed().as_secs_f64(); + (timeout - elapsed).max(0.0) + }); + + let timeout = match (handler_timeout, remaining_event_timeout) { + (Some(a), Some(b)) => Some(a.min(b)), + (Some(a), None) => Some(a), + (None, Some(b)) => Some(b), + (None, None) => None, + }; + + let mut result = EventResult::new( + event.inner.lock().event_id.clone(), + handler.clone(), + timeout, + ); + result.status = EventResultStatus::Started; + result.started_at = Some(now_iso()); + event + .inner + .lock() + .event_results + .insert(handler.id.clone(), result.clone()); + + let call = handler + .callable + .as_ref() + .expect("handler callable missing") + .clone(); + let (tx, rx) = std_mpsc::channel(); + let event_clone = event.clone(); + let context_event_id = event.inner.lock().event_id.clone(); + let context_handler_id = handler.id.clone(); + thread::spawn(move || { + CURRENT_EVENT_ID.with(|id| *id.borrow_mut() = Some(context_event_id)); + CURRENT_HANDLER_ID.with(|id| *id.borrow_mut() = Some(context_handler_id)); + let response = block_on(call(event_clone)); + CURRENT_HANDLER_ID.with(|id| *id.borrow_mut() = None); + CURRENT_EVENT_ID.with(|id| *id.borrow_mut() = None); + let _ = tx.send(response); + }); + + let call_started = Instant::now(); + let call_result = if let Some(timeout_secs) = timeout { + rx.recv_timeout(Duration::from_secs_f64(timeout_secs)) + .map_err(|_| "timeout".to_string()) + } else { + rx.recv().map_err(|_| "handler channel closed".to_string()) + }; + + let mut current = event + .inner + .lock() + .event_results + .get(&handler.id) + .cloned() + .expect("missing result row"); + + match call_result { + Ok(Ok(value)) => { + current.status = EventResultStatus::Completed; + current.result = Some(value); + } + Ok(Err(error)) => { + current.status = EventResultStatus::Error; + current.error = Some(error); + } + Err(error) => { + current.status = EventResultStatus::Error; + current.error = Some(format!("EventHandlerAbortedError: {error}")); + current.completed_at = Some(now_iso()); + if let Some(latest) = event.inner.lock().event_results.get(&handler.id).cloned() { + current.event_children = latest.event_children; + } + event + .inner + .lock() + .event_results + .insert(handler.id.clone(), current); + return true; + } + } + + if let Some(slow_timeout) = handler + .handler_slow_timeout + .or(event.inner.lock().event_handler_slow_timeout) + .or(self.event_handler_slow_timeout) + { + if call_started.elapsed() > Duration::from_secs_f64(slow_timeout) { + eprintln!( + "slow handler warning: {} took {:?}", + handler.handler_name, + call_started.elapsed() + ); + } + } + + current.completed_at = Some(now_iso()); + if let Some(latest) = event.inner.lock().event_results.get(&handler.id).cloned() { + current.event_children = latest.event_children; + } + event.inner.lock().event_results.insert(handler.id, current); + false + } +} diff --git a/bubus-rust/src/event_handler.rs b/bubus-rust/src/event_handler.rs new file mode 100644 index 0000000..49d4726 --- /dev/null +++ b/bubus-rust/src/event_handler.rs @@ -0,0 +1,56 @@ +use std::{future::Future, pin::Pin, sync::Arc}; + +use serde::{Deserialize, Serialize}; +use serde_json::Value; + +use crate::{base_event::BaseEvent, id::compute_handler_id}; + +pub type HandlerFuture = Pin> + Send + 'static>>; +pub type EventHandlerCallable = + Arc) -> HandlerFuture + Send + Sync + 'static>; + +#[derive(Clone, Serialize, Deserialize)] +pub struct EventHandler { + pub id: String, + pub event_pattern: String, + pub handler_name: String, + pub handler_file_path: Option, + pub handler_timeout: Option, + pub handler_slow_timeout: Option, + pub handler_registered_at: String, + pub eventbus_name: String, + pub eventbus_id: String, + #[serde(skip)] + pub callable: Option, +} + +impl EventHandler { + pub fn from_callable( + event_pattern: String, + handler_name: String, + eventbus_name: String, + eventbus_id: String, + callable: EventHandlerCallable, + ) -> Self { + let handler_registered_at = crate::base_event::now_iso(); + let id = compute_handler_id( + &eventbus_id, + &handler_name, + None, + &handler_registered_at, + &event_pattern, + ); + Self { + id, + event_pattern, + handler_name, + handler_file_path: None, + handler_timeout: None, + handler_slow_timeout: None, + handler_registered_at, + eventbus_name, + eventbus_id, + callable: Some(callable), + } + } +} diff --git a/bubus-rust/src/event_result.rs b/bubus-rust/src/event_result.rs new file mode 100644 index 0000000..3f34aa2 --- /dev/null +++ b/bubus-rust/src/event_result.rs @@ -0,0 +1,44 @@ +use serde::{Deserialize, Serialize}; +use serde_json::Value; + +use crate::{event_handler::EventHandler, id::uuid_v7_string}; + +#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)] +#[serde(rename_all = "snake_case")] +pub enum EventResultStatus { + Pending, + Started, + Completed, + Error, +} + +#[derive(Clone, Serialize, Deserialize)] +pub struct EventResult { + pub id: String, + pub status: EventResultStatus, + pub event_id: String, + pub handler: EventHandler, + pub timeout: Option, + pub started_at: Option, + pub result: Option, + pub error: Option, + pub completed_at: Option, + pub event_children: Vec, +} + +impl EventResult { + pub fn new(event_id: String, handler: EventHandler, timeout: Option) -> Self { + Self { + id: uuid_v7_string(), + status: EventResultStatus::Pending, + event_id, + handler, + timeout, + started_at: None, + result: None, + error: None, + completed_at: None, + event_children: vec![], + } + } +} diff --git a/bubus-rust/src/id.rs b/bubus-rust/src/id.rs new file mode 100644 index 0000000..e2f61ae --- /dev/null +++ b/bubus-rust/src/id.rs @@ -0,0 +1,24 @@ +use uuid::Uuid; + +pub fn uuid_v7_string() -> String { + Uuid::now_v7().to_string() +} + +pub fn handler_id_namespace() -> Uuid { + Uuid::new_v5(&Uuid::NAMESPACE_DNS, b"bubus-handler") +} + +pub fn compute_handler_id( + eventbus_id: &str, + handler_name: &str, + handler_file_path: Option<&str>, + handler_registered_at: &str, + event_pattern: &str, +) -> String { + let file_path = handler_file_path.unwrap_or("unknown"); + let seed = format!( + "{}|{}|{}|{}|{}", + eventbus_id, handler_name, file_path, handler_registered_at, event_pattern + ); + Uuid::new_v5(&handler_id_namespace(), seed.as_bytes()).to_string() +} diff --git a/bubus-rust/src/lib.rs b/bubus-rust/src/lib.rs new file mode 100644 index 0000000..b55f348 --- /dev/null +++ b/bubus-rust/src/lib.rs @@ -0,0 +1,10 @@ +pub mod base_event; +pub mod event_bus; +pub mod event_handler; +pub mod event_result; +pub mod id; +pub mod lock_manager; +pub mod typed; +pub mod types; + +pub use types::*; diff --git a/bubus-rust/src/lock_manager.rs b/bubus-rust/src/lock_manager.rs new file mode 100644 index 0000000..8fdc730 --- /dev/null +++ b/bubus-rust/src/lock_manager.rs @@ -0,0 +1,29 @@ +use std::{collections::HashMap, sync::Arc}; + +use parking_lot::Mutex; + +#[derive(Default, Clone)] +pub struct ReentrantLock { + lock: Arc>, +} + +impl ReentrantLock { + pub fn lock(&self) -> parking_lot::MutexGuard<'_, ()> { + self.lock.lock() + } +} + +#[derive(Default)] +pub struct LockManager { + locks: Mutex>>, +} + +impl LockManager { + pub fn get_lock(&self, key: &str) -> Arc { + let mut locks = self.locks.lock(); + locks + .entry(key.to_string()) + .or_insert_with(|| Arc::new(ReentrantLock::default())) + .clone() + } +} diff --git a/bubus-rust/src/typed.rs b/bubus-rust/src/typed.rs new file mode 100644 index 0000000..97eaea2 --- /dev/null +++ b/bubus-rust/src/typed.rs @@ -0,0 +1,109 @@ +use std::{collections::HashMap, marker::PhantomData, sync::Arc}; + +use serde::{de::DeserializeOwned, Serialize}; +use serde_json::{Map, Value}; + +use crate::{base_event::BaseEvent, event_bus::EventBus}; + +pub trait EventSpec: Send + Sync + 'static { + type Payload: Serialize + DeserializeOwned + Clone + Send + Sync + 'static; + type Result: Serialize + DeserializeOwned + Clone + Send + Sync + 'static; + + const EVENT_TYPE: &'static str; +} + +#[derive(Clone)] +pub struct TypedEvent { + pub inner: Arc, + marker: PhantomData, +} + +impl TypedEvent { + pub fn new(payload: E::Payload) -> Self { + let value = serde_json::to_value(payload).expect("typed payload serialization failed"); + let Value::Object(payload_map) = value else { + panic!("typed payload must serialize to a JSON object"); + }; + + Self { + inner: BaseEvent::new(E::EVENT_TYPE, payload_map), + marker: PhantomData, + } + } + + pub fn from_base_event(event: Arc) -> Self { + Self { + inner: event, + marker: PhantomData, + } + } + + pub fn payload(&self) -> E::Payload { + let payload = self.inner.inner.lock().payload.clone(); + let value = Value::Object(payload); + serde_json::from_value(value).expect("typed payload decode failed") + } + + pub async fn wait_completed(&self) { + self.inner.wait_completed().await; + } + + pub fn first_result(&self) -> Option { + let results: HashMap = + self.inner.inner.lock().event_results.clone(); + for result in results.values() { + if result.error.is_none() { + if let Some(value) = &result.result { + let decoded: E::Result = + serde_json::from_value(value.clone()).expect("typed result decode failed"); + return Some(decoded); + } + } + } + None + } +} + +impl EventBus { + pub fn emit_typed(&self, payload: E::Payload) -> TypedEvent { + let typed_event = TypedEvent::::new(payload); + let emitted = self.emit(typed_event.inner.clone()); + TypedEvent::from_base_event(emitted) + } + + pub fn on_typed( + &self, + handler_name: &str, + handler_fn: F, + ) -> crate::event_handler::EventHandler + where + E: EventSpec, + F: Fn(TypedEvent) -> Fut + Send + Sync + 'static, + Fut: std::future::Future> + Send + 'static, + { + self.on(E::EVENT_TYPE, handler_name, move |event| { + let typed = TypedEvent::::from_base_event(event); + let fut = handler_fn(typed); + async move { + let result = fut.await?; + serde_json::to_value(result).map_err(|error| error.to_string()) + } + }) + } + + pub async fn find_typed( + &self, + past: bool, + future: Option, + ) -> Option> { + let found = self.find(E::EVENT_TYPE, past, future, None).await?; + Some(TypedEvent::from_base_event(found)) + } +} + +pub fn payload_map_from_value(value: Value) -> Map { + match value { + Value::Object(map) => map, + _ => panic!("typed payload must be a JSON object"), + } +} diff --git a/bubus-rust/src/types.rs b/bubus-rust/src/types.rs new file mode 100644 index 0000000..0d0c556 --- /dev/null +++ b/bubus-rust/src/types.rs @@ -0,0 +1,31 @@ +use serde::{Deserialize, Serialize}; + +#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)] +#[serde(rename_all = "kebab-case")] +pub enum EventConcurrencyMode { + GlobalSerial, + BusSerial, + Parallel, +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)] +#[serde(rename_all = "kebab-case")] +pub enum EventHandlerConcurrencyMode { + Serial, + Parallel, +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)] +#[serde(rename_all = "kebab-case")] +pub enum EventHandlerCompletionMode { + All, + First, +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)] +#[serde(rename_all = "lowercase")] +pub enum EventStatus { + Pending, + Started, + Completed, +} diff --git a/bubus-rust/tests/event_bus_tests.rs b/bubus-rust/tests/event_bus_tests.rs new file mode 100644 index 0000000..eeca391 --- /dev/null +++ b/bubus-rust/tests/event_bus_tests.rs @@ -0,0 +1,55 @@ +use std::{sync::Arc, thread, time::Duration}; + +use bubus_rust::{ + base_event::BaseEvent, + event_bus::EventBus, + types::{EventConcurrencyMode, EventHandlerConcurrencyMode}, +}; +use futures::executor::block_on; +use serde_json::{json, Map}; + +fn mk_event(event_type: &str) -> Arc { + let mut payload = Map::new(); + payload.insert("value".to_string(), json!(1)); + BaseEvent::new(event_type.to_string(), payload) +} + +#[test] +fn test_emit_and_handler_result() { + let bus = EventBus::new(Some("BusA".to_string())); + bus.on("work", "h1", |_event| async move { Ok(json!("ok")) }); + let event = mk_event("work"); + bus.emit(event.clone()); + block_on(event.wait_completed()); + + let results = event.inner.lock().event_results.clone(); + assert_eq!(results.len(), 1); + let first = results.values().next().expect("missing first result"); + assert_eq!(first.result, Some(json!("ok"))); + bus.stop(); +} + +#[test] +fn test_parallel_handler_concurrency() { + let bus = EventBus::new(Some("BusPar".to_string())); + + bus.on("work", "h1", |_event| async move { + thread::sleep(Duration::from_millis(20)); + Ok(json!(1)) + }); + bus.on("work", "h2", |_event| async move { + thread::sleep(Duration::from_millis(20)); + Ok(json!(2)) + }); + + let event = mk_event("work"); + { + let mut inner = event.inner.lock(); + inner.event_handler_concurrency = Some(EventHandlerConcurrencyMode::Parallel); + inner.event_concurrency = Some(EventConcurrencyMode::Parallel); + } + bus.emit(event.clone()); + block_on(event.wait_completed()); + assert_eq!(event.inner.lock().event_results.len(), 2); + bus.stop(); +} diff --git a/bubus-rust/tests/test_base_event.rs b/bubus-rust/tests/test_base_event.rs new file mode 100644 index 0000000..06737ff --- /dev/null +++ b/bubus-rust/tests/test_base_event.rs @@ -0,0 +1,30 @@ +use std::sync::Arc; + +use bubus_rust::{base_event::BaseEvent, types::EventStatus}; +use futures::executor::block_on; +use serde_json::{json, Map}; + +fn mk_event(event_type: &str) -> Arc { + let mut payload = Map::new(); + payload.insert("value".to_string(), json!(1)); + BaseEvent::new(event_type.to_string(), payload) +} + +#[test] +fn test_base_event_json_roundtrip() { + let event = mk_event("test_event"); + let json_value = event.to_json_value(); + let deserialized = BaseEvent::from_json_value(json_value.clone()); + assert_eq!(json_value, deserialized.to_json_value()); +} + +#[test] +fn test_base_event_runtime_state_transitions() { + let event = mk_event("runtime_event"); + assert_eq!(event.inner.lock().event_status, EventStatus::Pending); + event.mark_started(); + assert_eq!(event.inner.lock().event_status, EventStatus::Started); + event.mark_completed(); + assert_eq!(event.inner.lock().event_status, EventStatus::Completed); + block_on(event.wait_completed()); +} diff --git a/bubus-rust/tests/test_event_handler_first.rs b/bubus-rust/tests/test_event_handler_first.rs new file mode 100644 index 0000000..b8d9613 --- /dev/null +++ b/bubus-rust/tests/test_event_handler_first.rs @@ -0,0 +1,41 @@ +use std::{sync::Arc, thread, time::Duration}; + +use bubus_rust::{ + base_event::BaseEvent, + event_bus::EventBus, + types::{EventHandlerCompletionMode, EventHandlerConcurrencyMode}, +}; +use futures::executor::block_on; +use serde_json::{json, Map}; + +fn mk_event(event_type: &str) -> Arc { + BaseEvent::new(event_type.to_string(), Map::new()) +} + +#[test] +fn test_event_handler_first_serial_stops_after_first_success() { + let bus = EventBus::new(Some("BusFirstSerial".to_string())); + + bus.on("work", "first", |_event| async move { Ok(json!("winner")) }); + bus.on("work", "second", |_event| async move { + thread::sleep(Duration::from_millis(20)); + Ok(json!("late")) + }); + + let event = mk_event("work"); + { + let mut inner = event.inner.lock(); + inner.event_handler_completion = Some(EventHandlerCompletionMode::First); + inner.event_handler_concurrency = Some(EventHandlerConcurrencyMode::Serial); + } + bus.emit(event.clone()); + block_on(event.wait_completed()); + + let results = event.inner.lock().event_results.clone(); + assert_eq!(results.len(), 1); + assert_eq!( + results.values().next().and_then(|r| r.result.clone()), + Some(json!("winner")) + ); + bus.stop(); +} diff --git a/bubus-rust/tests/test_event_handler_ids.rs b/bubus-rust/tests/test_event_handler_ids.rs new file mode 100644 index 0000000..de4f619 --- /dev/null +++ b/bubus-rust/tests/test_event_handler_ids.rs @@ -0,0 +1,32 @@ +use bubus_rust::id::compute_handler_id; +use uuid::Uuid; + +#[test] +fn test_compute_handler_id_matches_uuidv5_seed_algorithm() { + let eventbus_id = "0195f6ac-9f10-7e4b-bf69-fb33c68ca13e"; + let handler_name = "tests.handlers.handle_work"; + let handler_file_path = Some("~/repo/tests/handlers.py:10"); + let handler_registered_at = "2025-01-01T00:00:00.000000Z"; + let event_pattern = "work"; + + let computed = compute_handler_id( + eventbus_id, + handler_name, + handler_file_path, + handler_registered_at, + event_pattern, + ); + + let namespace = Uuid::new_v5(&Uuid::NAMESPACE_DNS, b"bubus-handler"); + let seed = format!( + "{}|{}|{}|{}|{}", + eventbus_id, + handler_name, + handler_file_path.unwrap(), + handler_registered_at, + event_pattern + ); + let expected = Uuid::new_v5(&namespace, seed.as_bytes()).to_string(); + + assert_eq!(computed, expected); +} diff --git a/bubus-rust/tests/test_event_history_store.rs b/bubus-rust/tests/test_event_history_store.rs new file mode 100644 index 0000000..18ff758 --- /dev/null +++ b/bubus-rust/tests/test_event_history_store.rs @@ -0,0 +1,35 @@ +use bubus_rust::{base_event::BaseEvent, event_bus::EventBus}; +use futures::executor::block_on; +use serde_json::Map; + +#[test] +fn test_max_history_drop_true_keeps_recent_entries() { + let bus = EventBus::new_with_history(Some("HistoryDropBus".to_string()), Some(2), true); + + for i in 0..3 { + let event = BaseEvent::new(format!("evt_{i}"), Map::new()); + bus.emit(event.clone()); + block_on(event.wait_completed()); + } + + let history = bus.event_history_ids(); + assert_eq!(history.len(), 2); + assert!(history.iter().any(|id| id.contains('-'))); + bus.stop(); +} + +#[test] +fn test_max_history_drop_false_rejects_new_emit_when_full() { + let bus = EventBus::new_with_history(Some("HistoryRejectBus".to_string()), Some(1), false); + + let first = BaseEvent::new("first", Map::new()); + bus.emit(first.clone()); + block_on(first.wait_completed()); + + let second = BaseEvent::new("second", Map::new()); + bus.emit(second.clone()); + block_on(second.wait_completed()); + + assert_eq!(second.inner.lock().event_path.len(), 0); + bus.stop(); +} diff --git a/bubus-rust/tests/test_event_result.rs b/bubus-rust/tests/test_event_result.rs new file mode 100644 index 0000000..6fbc58c --- /dev/null +++ b/bubus-rust/tests/test_event_result.rs @@ -0,0 +1,24 @@ +use bubus_rust::{ + event_handler::EventHandler, + event_result::{EventResult, EventResultStatus}, +}; + +#[test] +fn test_event_result_defaults() { + let handler = EventHandler { + id: "h1".into(), + event_pattern: "work".into(), + handler_name: "handler".into(), + handler_file_path: None, + handler_timeout: None, + handler_slow_timeout: None, + handler_registered_at: "2026-01-01T00:00:00.000Z".into(), + eventbus_name: "bus".into(), + eventbus_id: "bus-id".into(), + callable: None, + }; + + let result = EventResult::new("event-id".into(), handler, Some(5.0)); + assert_eq!(result.status, EventResultStatus::Pending); + assert_eq!(result.timeout, Some(5.0)); +} diff --git a/bubus-rust/tests/test_eventbus_dispatch_defaults.rs b/bubus-rust/tests/test_eventbus_dispatch_defaults.rs new file mode 100644 index 0000000..7956f58 --- /dev/null +++ b/bubus-rust/tests/test_eventbus_dispatch_defaults.rs @@ -0,0 +1,33 @@ +use std::sync::Arc; + +use bubus_rust::{ + base_event::BaseEvent, + event_bus::EventBus, + types::{EventHandlerCompletionMode, EventHandlerConcurrencyMode}, +}; +use futures::executor::block_on; +use serde_json::{json, Map}; + +fn mk_event(event_type: &str) -> Arc { + let mut payload = Map::new(); + payload.insert("value".to_string(), json!(1)); + BaseEvent::new(event_type.to_string(), payload) +} + +#[test] +fn test_bus_default_handler_settings_are_applied() { + let bus = EventBus::new(Some("BusDefaults".to_string())); + + bus.on("work", "h1", |_event| async move { Ok(json!("ok")) }); + let event = mk_event("work"); + { + let mut inner = event.inner.lock(); + inner.event_handler_concurrency = Some(EventHandlerConcurrencyMode::Serial); + inner.event_handler_completion = Some(EventHandlerCompletionMode::All); + } + bus.emit(event.clone()); + block_on(event.wait_completed()); + + assert_eq!(event.inner.lock().event_results.len(), 1); + bus.stop(); +} diff --git a/bubus-rust/tests/test_eventbus_edge_cases.rs b/bubus-rust/tests/test_eventbus_edge_cases.rs new file mode 100644 index 0000000..7daab06 --- /dev/null +++ b/bubus-rust/tests/test_eventbus_edge_cases.rs @@ -0,0 +1,57 @@ +use bubus_rust::{base_event::BaseEvent, event_bus::EventBus, event_result::EventResultStatus}; +use futures::executor::block_on; +use serde_json::{json, Map}; + +#[test] +fn test_emit_with_no_handlers_completes_event() { + let bus = EventBus::new(Some("NoHandlers".to_string())); + let event = BaseEvent::new("nothing", Map::new()); + + bus.emit(event.clone()); + block_on(event.wait_completed()); + + let inner = event.inner.lock(); + assert_eq!(inner.event_results.len(), 0); + assert_eq!(inner.event_pending_bus_count, 0); + assert!(inner.event_started_at.is_some()); + assert!(inner.event_completed_at.is_some()); + drop(inner); + bus.stop(); +} + +#[test] +fn test_wildcard_handler_runs_for_any_event_type() { + let bus = EventBus::new(Some("WildcardBus".to_string())); + bus.on("*", "catch_all", |_event| async move { Ok(json!("all")) }); + let event = BaseEvent::new("specific_event", Map::new()); + + bus.emit(event.clone()); + block_on(event.wait_completed()); + + let results = event.inner.lock().event_results.clone(); + assert_eq!(results.len(), 1); + let only = results.values().next().expect("missing result"); + assert_eq!(only.result, Some(json!("all"))); + bus.stop(); +} + +#[test] +fn test_handler_error_populates_error_status() { + let bus = EventBus::new(Some("ErrorBus".to_string())); + bus.on( + "work", + "bad", + |_event| async move { Err("boom".to_string()) }, + ); + let event = BaseEvent::new("work", Map::new()); + + bus.emit(event.clone()); + block_on(event.wait_completed()); + + let results = event.inner.lock().event_results.clone(); + assert_eq!(results.len(), 1); + let only = results.values().next().expect("missing result"); + assert_eq!(only.status, EventResultStatus::Error); + assert_eq!(only.error.as_deref(), Some("boom")); + bus.stop(); +} diff --git a/bubus-rust/tests/test_eventbus_find.rs b/bubus-rust/tests/test_eventbus_find.rs new file mode 100644 index 0000000..afe3756 --- /dev/null +++ b/bubus-rust/tests/test_eventbus_find.rs @@ -0,0 +1,37 @@ +use std::{thread, time::Duration}; + +use bubus_rust::{base_event::BaseEvent, event_bus::EventBus}; +use futures::executor::block_on; +use serde_json::{json, Map}; + +#[test] +fn test_find_past_match_returns_event() { + let bus = EventBus::new(Some("FindBus".to_string())); + bus.on("work", "h1", |_event| async move { Ok(json!("ok")) }); + + let event = BaseEvent::new("work", Map::new()); + bus.emit(event.clone()); + block_on(event.wait_completed()); + + let found = block_on(bus.find("work", true, None, None)); + assert!(found.is_some()); + assert_eq!(found.expect("missing").inner.lock().event_type, "work"); + + bus.stop(); +} + +#[test] +fn test_find_future_waits_for_new_event() { + let bus = EventBus::new(Some("FindFutureBus".to_string())); + let bus_for_emit = bus.clone(); + + thread::spawn(move || { + thread::sleep(Duration::from_millis(30)); + let event = BaseEvent::new("future_event", Map::new()); + bus_for_emit.emit(event); + }); + + let found = block_on(bus.find("future_event", false, Some(0.5), None)); + assert!(found.is_some()); + bus.stop(); +} diff --git a/bubus-rust/tests/test_eventbus_locking.rs b/bubus-rust/tests/test_eventbus_locking.rs new file mode 100644 index 0000000..2bb00f0 --- /dev/null +++ b/bubus-rust/tests/test_eventbus_locking.rs @@ -0,0 +1,87 @@ +use std::{thread, time::Duration}; + +use bubus_rust::{base_event::BaseEvent, event_bus::EventBus, types::EventConcurrencyMode}; +use futures::executor::block_on; +use serde_json::{json, Map, Value}; + +#[test] +fn test_queue_jump() { + let bus = EventBus::new(Some("BusJump".to_string())); + bus.on("q", "h", |event| async move { + let value = event + .inner + .lock() + .payload + .get("idx") + .cloned() + .unwrap_or(Value::Null); + Ok(value) + }); + + let mut p1 = Map::new(); + p1.insert("idx".into(), json!(1)); + let event1 = BaseEvent::new("q", p1); + let mut p2 = Map::new(); + p2.insert("idx".into(), json!(2)); + let event2 = BaseEvent::new("q", p2); + + bus.emit(event1.clone()); + bus.emit_with_options(event2.clone(), true); + + block_on(async { + event1.wait_completed().await; + event2.wait_completed().await; + }); + + let event1_started = event1 + .inner + .lock() + .event_started_at + .clone() + .unwrap_or_default(); + let event2_started = event2 + .inner + .lock() + .event_started_at + .clone() + .unwrap_or_default(); + assert!(event2_started <= event1_started); + bus.stop(); +} + +#[test] +fn test_bus_serial_processes_in_order() { + let bus = EventBus::new(Some("BusSerial".to_string())); + + bus.on("work", "slow", |_event| async move { + thread::sleep(Duration::from_millis(15)); + Ok(json!(1)) + }); + + let event1 = BaseEvent::new("work", Map::new()); + let event2 = BaseEvent::new("work", Map::new()); + event1.inner.lock().event_concurrency = Some(EventConcurrencyMode::BusSerial); + event2.inner.lock().event_concurrency = Some(EventConcurrencyMode::BusSerial); + bus.emit(event1.clone()); + bus.emit(event2.clone()); + + block_on(async { + event1.wait_completed().await; + event2.wait_completed().await; + }); + + let event1_started = event1 + .inner + .lock() + .event_started_at + .clone() + .unwrap_or_default(); + let event2_started = event2 + .inner + .lock() + .event_started_at + .clone() + .unwrap_or_default(); + assert!(event1_started <= event2_started); + bus.stop(); +} diff --git a/bubus-rust/tests/test_eventbus_on_off.rs b/bubus-rust/tests/test_eventbus_on_off.rs new file mode 100644 index 0000000..f6f4614 --- /dev/null +++ b/bubus-rust/tests/test_eventbus_on_off.rs @@ -0,0 +1,22 @@ +use bubus_rust::{base_event::BaseEvent, event_bus::EventBus}; +use futures::executor::block_on; +use serde_json::{json, Map}; + +#[test] +fn test_on_returns_handler_and_off_removes_handler() { + let bus = EventBus::new(Some("OnOffBus".to_string())); + + let handler = bus.on("work", "h1", |_event| async move { Ok(json!("ok")) }); + let event_1 = BaseEvent::new("work", Map::new()); + bus.emit(event_1.clone()); + block_on(event_1.wait_completed()); + assert_eq!(event_1.inner.lock().event_results.len(), 1); + + bus.off("work", Some(&handler.id)); + let event_2 = BaseEvent::new("work", Map::new()); + bus.emit(event_2.clone()); + block_on(event_2.wait_completed()); + assert_eq!(event_2.inner.lock().event_results.len(), 0); + + bus.stop(); +} diff --git a/bubus-rust/tests/test_eventbus_timeout.rs b/bubus-rust/tests/test_eventbus_timeout.rs new file mode 100644 index 0000000..5950e8b --- /dev/null +++ b/bubus-rust/tests/test_eventbus_timeout.rs @@ -0,0 +1,107 @@ +use std::{thread, time::Duration}; + +use bubus_rust::{base_event::BaseEvent, event_bus::EventBus, event_result::EventResultStatus}; +use futures::executor::block_on; +use serde_json::{json, Map}; + +fn wait_until_completed(event: &std::sync::Arc, timeout_ms: u64) { + let started = std::time::Instant::now(); + while started.elapsed() < Duration::from_millis(timeout_ms) { + if event.inner.lock().event_status == bubus_rust::types::EventStatus::Completed { + return; + } + thread::sleep(Duration::from_millis(5)); + } + panic!("event did not complete within {timeout_ms}ms"); +} + +#[test] +fn test_event_timeout_aborts_in_flight_handler_result() { + let bus = EventBus::new(Some("TimeoutBus".to_string())); + + bus.on("timeout", "slow", |_event| async move { + thread::sleep(Duration::from_millis(50)); + Ok(json!("slow")) + }); + + let event = BaseEvent::new("timeout", Map::new()); + event.inner.lock().event_timeout = Some(0.01); + + bus.emit(event.clone()); + block_on(event.wait_completed()); + + let result = event + .inner + .lock() + .event_results + .values() + .next() + .cloned() + .expect("missing result"); + assert_eq!(result.status, EventResultStatus::Error); + assert!(result + .error + .as_deref() + .unwrap_or_default() + .contains("EventHandlerAbortedError")); + bus.stop(); +} + +#[test] +fn test_parent_timeout_cancels_pending_or_started_children() { + let bus = EventBus::new(Some("ParentTimeoutBus".to_string())); + let bus_for_handler = bus.clone(); + + bus.on("child", "child_slow", |_event| async move { + thread::sleep(Duration::from_millis(80)); + Ok(json!("child")) + }); + + bus.on("parent", "emit_child", move |_event| { + let bus_local = bus_for_handler.clone(); + async move { + let child = BaseEvent::new("child", Map::new()); + child.inner.lock().event_timeout = Some(1.0); + bus_local.emit(child); + thread::sleep(Duration::from_millis(80)); + Ok(json!("parent")) + } + }); + + let parent = BaseEvent::new("parent", Map::new()); + parent.inner.lock().event_timeout = Some(0.01); + + bus.emit(parent.clone()); + wait_until_completed(&parent, 1000); + thread::sleep(Duration::from_millis(120)); + + let parent_result = parent + .inner + .lock() + .event_results + .values() + .next() + .cloned() + .expect("missing parent result"); + assert_eq!(parent_result.status, EventResultStatus::Error); + + let parent_id = parent.inner.lock().event_id.clone(); + let payload = bus.runtime_payload_for_test(); + let child = payload + .values() + .find(|evt| evt.inner.lock().event_parent_id.as_deref() == Some(parent_id.as_str())) + .cloned() + .expect("missing child event"); + + let child_inner = child.inner.lock(); + let has_error = child_inner + .event_results + .values() + .any(|r| r.status == EventResultStatus::Error); + let is_completed = child_inner + .event_results + .values() + .any(|r| r.status == EventResultStatus::Completed); + assert!(has_error || is_completed); + bus.stop(); +} diff --git a/bubus-rust/tests/test_ids.rs b/bubus-rust/tests/test_ids.rs new file mode 100644 index 0000000..1e1ae27 --- /dev/null +++ b/bubus-rust/tests/test_ids.rs @@ -0,0 +1,48 @@ +use bubus_rust::{ + event_bus::EventBus, + event_handler::EventHandler, + id::{compute_handler_id, handler_id_namespace}, +}; +use serde_json::Map; +use uuid::Uuid; + +#[test] +fn test_bus_and_event_ids_are_uuid_v7() { + let bus = EventBus::new(Some("BusId".to_string())); + let bus_id = Uuid::parse_str(&bus.id).expect("bus id must parse"); + assert_eq!(bus_id.get_version_num(), 7); + + let event = bubus_rust::base_event::BaseEvent::new("work", Map::new()); + let event_id = Uuid::parse_str(&event.inner.lock().event_id).expect("event id must parse"); + assert_eq!(event_id.get_version_num(), 7); +} + +#[test] +fn test_handler_id_uses_v5_namespace_seed_compatible_with_python_ts() { + let eventbus_id = "018f6f0e-79b2-7cc5-aed9-f0f9a4e5e6b0"; + let handler_name = "module.fn"; + let handler_registered_at = "2026-01-01T00:00:00.000Z"; + let event_pattern = "work"; + let expected_seed = + format!("{eventbus_id}|{handler_name}|unknown|{handler_registered_at}|{event_pattern}"); + + let expected = Uuid::new_v5(&handler_id_namespace(), expected_seed.as_bytes()).to_string(); + let actual = compute_handler_id( + eventbus_id, + handler_name, + None, + handler_registered_at, + event_pattern, + ); + assert_eq!(actual, expected); + + let entry = EventHandler::from_callable( + event_pattern.to_string(), + handler_name.to_string(), + "BusId".to_string(), + eventbus_id.to_string(), + std::sync::Arc::new(|_event| Box::pin(async { Ok(serde_json::Value::Null) })), + ); + let ns = Uuid::parse_str(&entry.id).expect("handler id must parse"); + assert_eq!(ns.get_version_num(), 5); +} diff --git a/bubus-rust/tests/test_typed_events.rs b/bubus-rust/tests/test_typed_events.rs new file mode 100644 index 0000000..b1b89f9 --- /dev/null +++ b/bubus-rust/tests/test_typed_events.rs @@ -0,0 +1,58 @@ +use bubus_rust::{ + event_bus::EventBus, + typed::{EventSpec, TypedEvent}, +}; +use futures::executor::block_on; +use serde::{Deserialize, Serialize}; + +#[derive(Clone, Serialize, Deserialize)] +struct AddPayload { + a: i64, + b: i64, +} + +#[derive(Clone, Serialize, Deserialize, Debug, PartialEq, Eq)] +struct AddResult { + sum: i64, +} + +struct AddEvent; + +impl EventSpec for AddEvent { + type Payload = AddPayload; + type Result = AddResult; + + const EVENT_TYPE: &'static str = "AddEvent"; +} + +#[test] +fn test_on_typed_and_emit_typed_roundtrip() { + let bus = EventBus::new(Some("TypedBus".to_string())); + + bus.on_typed::("add", |event: TypedEvent| async move { + let payload = event.payload(); + Ok(AddResult { + sum: payload.a + payload.b, + }) + }); + + let event = bus.emit_typed::(AddPayload { a: 4, b: 9 }); + block_on(event.wait_completed()); + + let first = event.first_result(); + assert_eq!(first, Some(AddResult { sum: 13 })); + bus.stop(); +} + +#[test] +fn test_find_typed_returns_typed_payload() { + let bus = EventBus::new(Some("TypedFindBus".to_string())); + + let event = bus.emit_typed::(AddPayload { a: 7, b: 1 }); + block_on(event.wait_completed()); + + let found = block_on(bus.find_typed::(true, None)).expect("expected typed event"); + assert_eq!(found.payload().a, 7); + assert_eq!(found.payload().b, 1); + bus.stop(); +} From 40f46eab2b0a3f1ec13d27bc4c6af19bed27a9ca Mon Sep 17 00:00:00 2001 From: Nick Sweeting Date: Fri, 20 Feb 2026 13:28:19 -0800 Subject: [PATCH 2/4] Make EventBus emit API typed by default in bubus-rust --- bubus-rust/src/event_bus.rs | 6 +++--- bubus-rust/src/typed.rs | 4 ++-- bubus-rust/tests/event_bus_tests.rs | 4 ++-- bubus-rust/tests/test_event_handler_first.rs | 2 +- bubus-rust/tests/test_event_history_store.rs | 6 +++--- bubus-rust/tests/test_eventbus_dispatch_defaults.rs | 2 +- bubus-rust/tests/test_eventbus_edge_cases.rs | 6 +++--- bubus-rust/tests/test_eventbus_find.rs | 4 ++-- bubus-rust/tests/test_eventbus_locking.rs | 8 ++++---- bubus-rust/tests/test_eventbus_on_off.rs | 4 ++-- bubus-rust/tests/test_eventbus_timeout.rs | 6 +++--- bubus-rust/tests/test_typed_events.rs | 4 ++-- 12 files changed, 28 insertions(+), 28 deletions(-) diff --git a/bubus-rust/src/event_bus.rs b/bubus-rust/src/event_bus.rs index 14251f6..65af3d0 100644 --- a/bubus-rust/src/event_bus.rs +++ b/bubus-rust/src/event_bus.rs @@ -185,11 +185,11 @@ impl EventBus { } } - pub fn emit(&self, event: Arc) -> Arc { - self.emit_with_options(event, false) + pub fn emit_raw(&self, event: Arc) -> Arc { + self.emit_raw_with_options(event, false) } - pub fn emit_with_options(&self, event: Arc, queue_jump: bool) -> Arc { + pub fn emit_raw_with_options(&self, event: Arc, queue_jump: bool) -> Arc { if !self.register_in_history(event.clone()) { event.mark_completed(); return event; diff --git a/bubus-rust/src/typed.rs b/bubus-rust/src/typed.rs index 97eaea2..ddac0d5 100644 --- a/bubus-rust/src/typed.rs +++ b/bubus-rust/src/typed.rs @@ -65,9 +65,9 @@ impl TypedEvent { } impl EventBus { - pub fn emit_typed(&self, payload: E::Payload) -> TypedEvent { + pub fn emit(&self, payload: E::Payload) -> TypedEvent { let typed_event = TypedEvent::::new(payload); - let emitted = self.emit(typed_event.inner.clone()); + let emitted = self.emit_raw(typed_event.inner.clone()); TypedEvent::from_base_event(emitted) } diff --git a/bubus-rust/tests/event_bus_tests.rs b/bubus-rust/tests/event_bus_tests.rs index eeca391..e199ac6 100644 --- a/bubus-rust/tests/event_bus_tests.rs +++ b/bubus-rust/tests/event_bus_tests.rs @@ -19,7 +19,7 @@ fn test_emit_and_handler_result() { let bus = EventBus::new(Some("BusA".to_string())); bus.on("work", "h1", |_event| async move { Ok(json!("ok")) }); let event = mk_event("work"); - bus.emit(event.clone()); + bus.emit_raw(event.clone()); block_on(event.wait_completed()); let results = event.inner.lock().event_results.clone(); @@ -48,7 +48,7 @@ fn test_parallel_handler_concurrency() { inner.event_handler_concurrency = Some(EventHandlerConcurrencyMode::Parallel); inner.event_concurrency = Some(EventConcurrencyMode::Parallel); } - bus.emit(event.clone()); + bus.emit_raw(event.clone()); block_on(event.wait_completed()); assert_eq!(event.inner.lock().event_results.len(), 2); bus.stop(); diff --git a/bubus-rust/tests/test_event_handler_first.rs b/bubus-rust/tests/test_event_handler_first.rs index b8d9613..fa349da 100644 --- a/bubus-rust/tests/test_event_handler_first.rs +++ b/bubus-rust/tests/test_event_handler_first.rs @@ -28,7 +28,7 @@ fn test_event_handler_first_serial_stops_after_first_success() { inner.event_handler_completion = Some(EventHandlerCompletionMode::First); inner.event_handler_concurrency = Some(EventHandlerConcurrencyMode::Serial); } - bus.emit(event.clone()); + bus.emit_raw(event.clone()); block_on(event.wait_completed()); let results = event.inner.lock().event_results.clone(); diff --git a/bubus-rust/tests/test_event_history_store.rs b/bubus-rust/tests/test_event_history_store.rs index 18ff758..3325b98 100644 --- a/bubus-rust/tests/test_event_history_store.rs +++ b/bubus-rust/tests/test_event_history_store.rs @@ -8,7 +8,7 @@ fn test_max_history_drop_true_keeps_recent_entries() { for i in 0..3 { let event = BaseEvent::new(format!("evt_{i}"), Map::new()); - bus.emit(event.clone()); + bus.emit_raw(event.clone()); block_on(event.wait_completed()); } @@ -23,11 +23,11 @@ fn test_max_history_drop_false_rejects_new_emit_when_full() { let bus = EventBus::new_with_history(Some("HistoryRejectBus".to_string()), Some(1), false); let first = BaseEvent::new("first", Map::new()); - bus.emit(first.clone()); + bus.emit_raw(first.clone()); block_on(first.wait_completed()); let second = BaseEvent::new("second", Map::new()); - bus.emit(second.clone()); + bus.emit_raw(second.clone()); block_on(second.wait_completed()); assert_eq!(second.inner.lock().event_path.len(), 0); diff --git a/bubus-rust/tests/test_eventbus_dispatch_defaults.rs b/bubus-rust/tests/test_eventbus_dispatch_defaults.rs index 7956f58..4dad206 100644 --- a/bubus-rust/tests/test_eventbus_dispatch_defaults.rs +++ b/bubus-rust/tests/test_eventbus_dispatch_defaults.rs @@ -25,7 +25,7 @@ fn test_bus_default_handler_settings_are_applied() { inner.event_handler_concurrency = Some(EventHandlerConcurrencyMode::Serial); inner.event_handler_completion = Some(EventHandlerCompletionMode::All); } - bus.emit(event.clone()); + bus.emit_raw(event.clone()); block_on(event.wait_completed()); assert_eq!(event.inner.lock().event_results.len(), 1); diff --git a/bubus-rust/tests/test_eventbus_edge_cases.rs b/bubus-rust/tests/test_eventbus_edge_cases.rs index 7daab06..77342b8 100644 --- a/bubus-rust/tests/test_eventbus_edge_cases.rs +++ b/bubus-rust/tests/test_eventbus_edge_cases.rs @@ -7,7 +7,7 @@ fn test_emit_with_no_handlers_completes_event() { let bus = EventBus::new(Some("NoHandlers".to_string())); let event = BaseEvent::new("nothing", Map::new()); - bus.emit(event.clone()); + bus.emit_raw(event.clone()); block_on(event.wait_completed()); let inner = event.inner.lock(); @@ -25,7 +25,7 @@ fn test_wildcard_handler_runs_for_any_event_type() { bus.on("*", "catch_all", |_event| async move { Ok(json!("all")) }); let event = BaseEvent::new("specific_event", Map::new()); - bus.emit(event.clone()); + bus.emit_raw(event.clone()); block_on(event.wait_completed()); let results = event.inner.lock().event_results.clone(); @@ -45,7 +45,7 @@ fn test_handler_error_populates_error_status() { ); let event = BaseEvent::new("work", Map::new()); - bus.emit(event.clone()); + bus.emit_raw(event.clone()); block_on(event.wait_completed()); let results = event.inner.lock().event_results.clone(); diff --git a/bubus-rust/tests/test_eventbus_find.rs b/bubus-rust/tests/test_eventbus_find.rs index afe3756..67504f9 100644 --- a/bubus-rust/tests/test_eventbus_find.rs +++ b/bubus-rust/tests/test_eventbus_find.rs @@ -10,7 +10,7 @@ fn test_find_past_match_returns_event() { bus.on("work", "h1", |_event| async move { Ok(json!("ok")) }); let event = BaseEvent::new("work", Map::new()); - bus.emit(event.clone()); + bus.emit_raw(event.clone()); block_on(event.wait_completed()); let found = block_on(bus.find("work", true, None, None)); @@ -28,7 +28,7 @@ fn test_find_future_waits_for_new_event() { thread::spawn(move || { thread::sleep(Duration::from_millis(30)); let event = BaseEvent::new("future_event", Map::new()); - bus_for_emit.emit(event); + bus_for_emit.emit_raw(event); }); let found = block_on(bus.find("future_event", false, Some(0.5), None)); diff --git a/bubus-rust/tests/test_eventbus_locking.rs b/bubus-rust/tests/test_eventbus_locking.rs index 2bb00f0..dd7ac2c 100644 --- a/bubus-rust/tests/test_eventbus_locking.rs +++ b/bubus-rust/tests/test_eventbus_locking.rs @@ -25,8 +25,8 @@ fn test_queue_jump() { p2.insert("idx".into(), json!(2)); let event2 = BaseEvent::new("q", p2); - bus.emit(event1.clone()); - bus.emit_with_options(event2.clone(), true); + bus.emit_raw(event1.clone()); + bus.emit_raw_with_options(event2.clone(), true); block_on(async { event1.wait_completed().await; @@ -62,8 +62,8 @@ fn test_bus_serial_processes_in_order() { let event2 = BaseEvent::new("work", Map::new()); event1.inner.lock().event_concurrency = Some(EventConcurrencyMode::BusSerial); event2.inner.lock().event_concurrency = Some(EventConcurrencyMode::BusSerial); - bus.emit(event1.clone()); - bus.emit(event2.clone()); + bus.emit_raw(event1.clone()); + bus.emit_raw(event2.clone()); block_on(async { event1.wait_completed().await; diff --git a/bubus-rust/tests/test_eventbus_on_off.rs b/bubus-rust/tests/test_eventbus_on_off.rs index f6f4614..19062a8 100644 --- a/bubus-rust/tests/test_eventbus_on_off.rs +++ b/bubus-rust/tests/test_eventbus_on_off.rs @@ -8,13 +8,13 @@ fn test_on_returns_handler_and_off_removes_handler() { let handler = bus.on("work", "h1", |_event| async move { Ok(json!("ok")) }); let event_1 = BaseEvent::new("work", Map::new()); - bus.emit(event_1.clone()); + bus.emit_raw(event_1.clone()); block_on(event_1.wait_completed()); assert_eq!(event_1.inner.lock().event_results.len(), 1); bus.off("work", Some(&handler.id)); let event_2 = BaseEvent::new("work", Map::new()); - bus.emit(event_2.clone()); + bus.emit_raw(event_2.clone()); block_on(event_2.wait_completed()); assert_eq!(event_2.inner.lock().event_results.len(), 0); diff --git a/bubus-rust/tests/test_eventbus_timeout.rs b/bubus-rust/tests/test_eventbus_timeout.rs index 5950e8b..d4c8ae5 100644 --- a/bubus-rust/tests/test_eventbus_timeout.rs +++ b/bubus-rust/tests/test_eventbus_timeout.rs @@ -27,7 +27,7 @@ fn test_event_timeout_aborts_in_flight_handler_result() { let event = BaseEvent::new("timeout", Map::new()); event.inner.lock().event_timeout = Some(0.01); - bus.emit(event.clone()); + bus.emit_raw(event.clone()); block_on(event.wait_completed()); let result = event @@ -62,7 +62,7 @@ fn test_parent_timeout_cancels_pending_or_started_children() { async move { let child = BaseEvent::new("child", Map::new()); child.inner.lock().event_timeout = Some(1.0); - bus_local.emit(child); + bus_local.emit_raw(child); thread::sleep(Duration::from_millis(80)); Ok(json!("parent")) } @@ -71,7 +71,7 @@ fn test_parent_timeout_cancels_pending_or_started_children() { let parent = BaseEvent::new("parent", Map::new()); parent.inner.lock().event_timeout = Some(0.01); - bus.emit(parent.clone()); + bus.emit_raw(parent.clone()); wait_until_completed(&parent, 1000); thread::sleep(Duration::from_millis(120)); diff --git a/bubus-rust/tests/test_typed_events.rs b/bubus-rust/tests/test_typed_events.rs index b1b89f9..84bd5ac 100644 --- a/bubus-rust/tests/test_typed_events.rs +++ b/bubus-rust/tests/test_typed_events.rs @@ -36,7 +36,7 @@ fn test_on_typed_and_emit_typed_roundtrip() { }) }); - let event = bus.emit_typed::(AddPayload { a: 4, b: 9 }); + let event = bus.emit::(AddPayload { a: 4, b: 9 }); block_on(event.wait_completed()); let first = event.first_result(); @@ -48,7 +48,7 @@ fn test_on_typed_and_emit_typed_roundtrip() { fn test_find_typed_returns_typed_payload() { let bus = EventBus::new(Some("TypedFindBus".to_string())); - let event = bus.emit_typed::(AddPayload { a: 7, b: 1 }); + let event = bus.emit::(AddPayload { a: 7, b: 1 }); block_on(event.wait_completed()); let found = block_on(bus.find_typed::(true, None)).expect("expected typed event"); From 187bc68cf192de26353c710259d1f61aeeee660d Mon Sep 17 00:00:00 2001 From: Nick Sweeting Date: Fri, 20 Feb 2026 13:49:51 -0800 Subject: [PATCH 3/4] Remove raw emit API and require typed emission paths --- bubus-rust/src/event_bus.rs | 10 +++- bubus-rust/src/typed.rs | 25 +++++++- bubus-rust/tests/event_bus_tests.rs | 41 ++++++++----- bubus-rust/tests/test_event_handler_first.rs | 30 ++++++---- bubus-rust/tests/test_event_history_store.rs | 33 +++++++---- .../tests/test_eventbus_dispatch_defaults.rs | 32 ++++++---- bubus-rust/tests/test_eventbus_edge_cases.rs | 44 ++++++++++---- bubus-rust/tests/test_eventbus_find.rs | 28 +++++++-- bubus-rust/tests/test_eventbus_locking.rs | 59 +++++++++++++------ bubus-rust/tests/test_eventbus_on_off.rs | 26 +++++--- bubus-rust/tests/test_eventbus_timeout.rs | 58 +++++++++++++----- 11 files changed, 278 insertions(+), 108 deletions(-) diff --git a/bubus-rust/src/event_bus.rs b/bubus-rust/src/event_bus.rs index 65af3d0..9ae1ff1 100644 --- a/bubus-rust/src/event_bus.rs +++ b/bubus-rust/src/event_bus.rs @@ -185,11 +185,15 @@ impl EventBus { } } - pub fn emit_raw(&self, event: Arc) -> Arc { - self.emit_raw_with_options(event, false) + pub(crate) fn enqueue_base(&self, event: Arc) -> Arc { + self.enqueue_base_with_options(event, false) } - pub fn emit_raw_with_options(&self, event: Arc, queue_jump: bool) -> Arc { + pub(crate) fn enqueue_base_with_options( + &self, + event: Arc, + queue_jump: bool, + ) -> Arc { if !self.register_in_history(event.clone()) { event.mark_completed(); return event; diff --git a/bubus-rust/src/typed.rs b/bubus-rust/src/typed.rs index ddac0d5..8593c0b 100644 --- a/bubus-rust/src/typed.rs +++ b/bubus-rust/src/typed.rs @@ -67,10 +67,33 @@ impl TypedEvent { impl EventBus { pub fn emit(&self, payload: E::Payload) -> TypedEvent { let typed_event = TypedEvent::::new(payload); - let emitted = self.emit_raw(typed_event.inner.clone()); + let emitted = self.enqueue_base(typed_event.inner.clone()); TypedEvent::from_base_event(emitted) } + pub fn emit_with_options( + &self, + payload: E::Payload, + queue_jump: bool, + ) -> TypedEvent { + let typed_event = TypedEvent::::new(payload); + let emitted = self.enqueue_base_with_options(typed_event.inner.clone(), queue_jump); + TypedEvent::from_base_event(emitted) + } + + pub fn emit_existing(&self, event: TypedEvent) -> TypedEvent { + let emitted = self.enqueue_base(event.inner.clone()); + TypedEvent::from_base_event(emitted) + } + + pub fn emit_existing_with_options( + &self, + event: TypedEvent, + queue_jump: bool, + ) -> TypedEvent { + let emitted = self.enqueue_base_with_options(event.inner.clone(), queue_jump); + TypedEvent::from_base_event(emitted) + } pub fn on_typed( &self, handler_name: &str, diff --git a/bubus-rust/tests/event_bus_tests.rs b/bubus-rust/tests/event_bus_tests.rs index e199ac6..cebf653 100644 --- a/bubus-rust/tests/event_bus_tests.rs +++ b/bubus-rust/tests/event_bus_tests.rs @@ -1,28 +1,39 @@ -use std::{sync::Arc, thread, time::Duration}; +use std::{thread, time::Duration}; use bubus_rust::{ - base_event::BaseEvent, event_bus::EventBus, + typed::{EventSpec, TypedEvent}, types::{EventConcurrencyMode, EventHandlerConcurrencyMode}, }; use futures::executor::block_on; -use serde_json::{json, Map}; +use serde::{Deserialize, Serialize}; +use serde_json::json; -fn mk_event(event_type: &str) -> Arc { - let mut payload = Map::new(); - payload.insert("value".to_string(), json!(1)); - BaseEvent::new(event_type.to_string(), payload) +#[derive(Clone, Serialize, Deserialize)] +struct WorkPayload { + value: i64, +} + +#[derive(Clone, Serialize, Deserialize)] +struct WorkResult { + value: i64, +} + +struct WorkEvent; +impl EventSpec for WorkEvent { + type Payload = WorkPayload; + type Result = WorkResult; + const EVENT_TYPE: &'static str = "work"; } #[test] fn test_emit_and_handler_result() { let bus = EventBus::new(Some("BusA".to_string())); bus.on("work", "h1", |_event| async move { Ok(json!("ok")) }); - let event = mk_event("work"); - bus.emit_raw(event.clone()); + let event = bus.emit::(WorkPayload { value: 1 }); block_on(event.wait_completed()); - let results = event.inner.lock().event_results.clone(); + let results = event.inner.inner.lock().event_results.clone(); assert_eq!(results.len(), 1); let first = results.values().next().expect("missing first result"); assert_eq!(first.result, Some(json!("ok"))); @@ -42,14 +53,14 @@ fn test_parallel_handler_concurrency() { Ok(json!(2)) }); - let event = mk_event("work"); + let event = TypedEvent::::new(WorkPayload { value: 1 }); { - let mut inner = event.inner.lock(); + let mut inner = event.inner.inner.lock(); inner.event_handler_concurrency = Some(EventHandlerConcurrencyMode::Parallel); inner.event_concurrency = Some(EventConcurrencyMode::Parallel); } - bus.emit_raw(event.clone()); - block_on(event.wait_completed()); - assert_eq!(event.inner.lock().event_results.len(), 2); + let emitted = bus.emit_existing(event); + block_on(emitted.wait_completed()); + assert_eq!(emitted.inner.inner.lock().event_results.len(), 2); bus.stop(); } diff --git a/bubus-rust/tests/test_event_handler_first.rs b/bubus-rust/tests/test_event_handler_first.rs index fa349da..474bc88 100644 --- a/bubus-rust/tests/test_event_handler_first.rs +++ b/bubus-rust/tests/test_event_handler_first.rs @@ -1,15 +1,25 @@ -use std::{sync::Arc, thread, time::Duration}; +use std::{thread, time::Duration}; use bubus_rust::{ - base_event::BaseEvent, event_bus::EventBus, + typed::{EventSpec, TypedEvent}, types::{EventHandlerCompletionMode, EventHandlerConcurrencyMode}, }; use futures::executor::block_on; -use serde_json::{json, Map}; +use serde::{Deserialize, Serialize}; +use serde_json::json; -fn mk_event(event_type: &str) -> Arc { - BaseEvent::new(event_type.to_string(), Map::new()) +#[derive(Clone, Serialize, Deserialize)] +struct EmptyPayload {} +#[derive(Clone, Serialize, Deserialize)] +struct WorkResult { + value: String, +} +struct WorkEvent; +impl EventSpec for WorkEvent { + type Payload = EmptyPayload; + type Result = WorkResult; + const EVENT_TYPE: &'static str = "work"; } #[test] @@ -22,16 +32,16 @@ fn test_event_handler_first_serial_stops_after_first_success() { Ok(json!("late")) }); - let event = mk_event("work"); + let event = TypedEvent::::new(EmptyPayload {}); { - let mut inner = event.inner.lock(); + let mut inner = event.inner.inner.lock(); inner.event_handler_completion = Some(EventHandlerCompletionMode::First); inner.event_handler_concurrency = Some(EventHandlerConcurrencyMode::Serial); } - bus.emit_raw(event.clone()); - block_on(event.wait_completed()); + let emitted = bus.emit_existing(event); + block_on(emitted.wait_completed()); - let results = event.inner.lock().event_results.clone(); + let results = emitted.inner.inner.lock().event_results.clone(); assert_eq!(results.len(), 1); assert_eq!( results.values().next().and_then(|r| r.result.clone()), diff --git a/bubus-rust/tests/test_event_history_store.rs b/bubus-rust/tests/test_event_history_store.rs index 3325b98..124490c 100644 --- a/bubus-rust/tests/test_event_history_store.rs +++ b/bubus-rust/tests/test_event_history_store.rs @@ -1,14 +1,28 @@ -use bubus_rust::{base_event::BaseEvent, event_bus::EventBus}; +use bubus_rust::{ + event_bus::EventBus, + typed::{EventSpec, TypedEvent}, +}; use futures::executor::block_on; -use serde_json::Map; +use serde::{Deserialize, Serialize}; + +#[derive(Clone, Serialize, Deserialize)] +struct EmptyPayload {} +#[derive(Clone, Serialize, Deserialize)] +struct EmptyResult {} + +struct HistoryEvent; +impl EventSpec for HistoryEvent { + type Payload = EmptyPayload; + type Result = EmptyResult; + const EVENT_TYPE: &'static str = "history_event"; +} #[test] fn test_max_history_drop_true_keeps_recent_entries() { let bus = EventBus::new_with_history(Some("HistoryDropBus".to_string()), Some(2), true); - for i in 0..3 { - let event = BaseEvent::new(format!("evt_{i}"), Map::new()); - bus.emit_raw(event.clone()); + for _ in 0..3 { + let event = bus.emit::(EmptyPayload {}); block_on(event.wait_completed()); } @@ -22,14 +36,13 @@ fn test_max_history_drop_true_keeps_recent_entries() { fn test_max_history_drop_false_rejects_new_emit_when_full() { let bus = EventBus::new_with_history(Some("HistoryRejectBus".to_string()), Some(1), false); - let first = BaseEvent::new("first", Map::new()); - bus.emit_raw(first.clone()); + let first = bus.emit::(EmptyPayload {}); block_on(first.wait_completed()); - let second = BaseEvent::new("second", Map::new()); - bus.emit_raw(second.clone()); + let second = TypedEvent::::new(EmptyPayload {}); + let second = bus.emit_existing(second); block_on(second.wait_completed()); - assert_eq!(second.inner.lock().event_path.len(), 0); + assert_eq!(second.inner.inner.lock().event_path.len(), 0); bus.stop(); } diff --git a/bubus-rust/tests/test_eventbus_dispatch_defaults.rs b/bubus-rust/tests/test_eventbus_dispatch_defaults.rs index 4dad206..f994075 100644 --- a/bubus-rust/tests/test_eventbus_dispatch_defaults.rs +++ b/bubus-rust/tests/test_eventbus_dispatch_defaults.rs @@ -1,17 +1,25 @@ -use std::sync::Arc; - use bubus_rust::{ - base_event::BaseEvent, event_bus::EventBus, + typed::{EventSpec, TypedEvent}, types::{EventHandlerCompletionMode, EventHandlerConcurrencyMode}, }; use futures::executor::block_on; -use serde_json::{json, Map}; +use serde::{Deserialize, Serialize}; +use serde_json::json; -fn mk_event(event_type: &str) -> Arc { - let mut payload = Map::new(); - payload.insert("value".to_string(), json!(1)); - BaseEvent::new(event_type.to_string(), payload) +#[derive(Clone, Serialize, Deserialize)] +struct Payload { + value: i64, +} +#[derive(Clone, Serialize, Deserialize)] +struct ResultT { + value: String, +} +struct WorkEvent; +impl EventSpec for WorkEvent { + type Payload = Payload; + type Result = ResultT; + const EVENT_TYPE: &'static str = "work"; } #[test] @@ -19,15 +27,15 @@ fn test_bus_default_handler_settings_are_applied() { let bus = EventBus::new(Some("BusDefaults".to_string())); bus.on("work", "h1", |_event| async move { Ok(json!("ok")) }); - let event = mk_event("work"); + let event = TypedEvent::::new(Payload { value: 1 }); { - let mut inner = event.inner.lock(); + let mut inner = event.inner.inner.lock(); inner.event_handler_concurrency = Some(EventHandlerConcurrencyMode::Serial); inner.event_handler_completion = Some(EventHandlerCompletionMode::All); } - bus.emit_raw(event.clone()); + let event = bus.emit_existing(event); block_on(event.wait_completed()); - assert_eq!(event.inner.lock().event_results.len(), 1); + assert_eq!(event.inner.inner.lock().event_results.len(), 1); bus.stop(); } diff --git a/bubus-rust/tests/test_eventbus_edge_cases.rs b/bubus-rust/tests/test_eventbus_edge_cases.rs index 77342b8..dcc6152 100644 --- a/bubus-rust/tests/test_eventbus_edge_cases.rs +++ b/bubus-rust/tests/test_eventbus_edge_cases.rs @@ -1,16 +1,40 @@ -use bubus_rust::{base_event::BaseEvent, event_bus::EventBus, event_result::EventResultStatus}; +use bubus_rust::{event_bus::EventBus, event_result::EventResultStatus, typed::EventSpec}; use futures::executor::block_on; -use serde_json::{json, Map}; +use serde::{Deserialize, Serialize}; +use serde_json::json; + +#[derive(Clone, Serialize, Deserialize)] +struct EmptyPayload {} +#[derive(Clone, Serialize, Deserialize)] +struct EmptyResult {} + +struct NothingEvent; +impl EventSpec for NothingEvent { + type Payload = EmptyPayload; + type Result = EmptyResult; + const EVENT_TYPE: &'static str = "nothing"; +} +struct SpecificEvent; +impl EventSpec for SpecificEvent { + type Payload = EmptyPayload; + type Result = EmptyResult; + const EVENT_TYPE: &'static str = "specific_event"; +} +struct WorkEvent; +impl EventSpec for WorkEvent { + type Payload = EmptyPayload; + type Result = EmptyResult; + const EVENT_TYPE: &'static str = "work"; +} #[test] fn test_emit_with_no_handlers_completes_event() { let bus = EventBus::new(Some("NoHandlers".to_string())); - let event = BaseEvent::new("nothing", Map::new()); + let event = bus.emit::(EmptyPayload {}); - bus.emit_raw(event.clone()); block_on(event.wait_completed()); - let inner = event.inner.lock(); + let inner = event.inner.inner.lock(); assert_eq!(inner.event_results.len(), 0); assert_eq!(inner.event_pending_bus_count, 0); assert!(inner.event_started_at.is_some()); @@ -23,12 +47,11 @@ fn test_emit_with_no_handlers_completes_event() { fn test_wildcard_handler_runs_for_any_event_type() { let bus = EventBus::new(Some("WildcardBus".to_string())); bus.on("*", "catch_all", |_event| async move { Ok(json!("all")) }); - let event = BaseEvent::new("specific_event", Map::new()); + let event = bus.emit::(EmptyPayload {}); - bus.emit_raw(event.clone()); block_on(event.wait_completed()); - let results = event.inner.lock().event_results.clone(); + let results = event.inner.inner.lock().event_results.clone(); assert_eq!(results.len(), 1); let only = results.values().next().expect("missing result"); assert_eq!(only.result, Some(json!("all"))); @@ -43,12 +66,11 @@ fn test_handler_error_populates_error_status() { "bad", |_event| async move { Err("boom".to_string()) }, ); - let event = BaseEvent::new("work", Map::new()); + let event = bus.emit::(EmptyPayload {}); - bus.emit_raw(event.clone()); block_on(event.wait_completed()); - let results = event.inner.lock().event_results.clone(); + let results = event.inner.inner.lock().event_results.clone(); assert_eq!(results.len(), 1); let only = results.values().next().expect("missing result"); assert_eq!(only.status, EventResultStatus::Error); diff --git a/bubus-rust/tests/test_eventbus_find.rs b/bubus-rust/tests/test_eventbus_find.rs index 67504f9..4f771b6 100644 --- a/bubus-rust/tests/test_eventbus_find.rs +++ b/bubus-rust/tests/test_eventbus_find.rs @@ -1,16 +1,33 @@ use std::{thread, time::Duration}; -use bubus_rust::{base_event::BaseEvent, event_bus::EventBus}; +use bubus_rust::{event_bus::EventBus, typed::EventSpec}; use futures::executor::block_on; -use serde_json::{json, Map}; +use serde::{Deserialize, Serialize}; +use serde_json::json; + +#[derive(Clone, Serialize, Deserialize)] +struct EmptyPayload {} +#[derive(Clone, Serialize, Deserialize)] +struct EmptyResult {} +struct WorkEvent; +impl EventSpec for WorkEvent { + type Payload = EmptyPayload; + type Result = EmptyResult; + const EVENT_TYPE: &'static str = "work"; +} +struct FutureEvent; +impl EventSpec for FutureEvent { + type Payload = EmptyPayload; + type Result = EmptyResult; + const EVENT_TYPE: &'static str = "future_event"; +} #[test] fn test_find_past_match_returns_event() { let bus = EventBus::new(Some("FindBus".to_string())); bus.on("work", "h1", |_event| async move { Ok(json!("ok")) }); - let event = BaseEvent::new("work", Map::new()); - bus.emit_raw(event.clone()); + let event = bus.emit::(EmptyPayload {}); block_on(event.wait_completed()); let found = block_on(bus.find("work", true, None, None)); @@ -27,8 +44,7 @@ fn test_find_future_waits_for_new_event() { thread::spawn(move || { thread::sleep(Duration::from_millis(30)); - let event = BaseEvent::new("future_event", Map::new()); - bus_for_emit.emit_raw(event); + bus_for_emit.emit::(EmptyPayload {}); }); let found = block_on(bus.find("future_event", false, Some(0.5), None)); diff --git a/bubus-rust/tests/test_eventbus_locking.rs b/bubus-rust/tests/test_eventbus_locking.rs index dd7ac2c..9dff8eb 100644 --- a/bubus-rust/tests/test_eventbus_locking.rs +++ b/bubus-rust/tests/test_eventbus_locking.rs @@ -1,8 +1,34 @@ use std::{thread, time::Duration}; -use bubus_rust::{base_event::BaseEvent, event_bus::EventBus, types::EventConcurrencyMode}; +use bubus_rust::{ + event_bus::EventBus, + typed::{EventSpec, TypedEvent}, + types::EventConcurrencyMode, +}; use futures::executor::block_on; -use serde_json::{json, Map, Value}; +use serde::{Deserialize, Serialize}; +use serde_json::json; + +#[derive(Clone, Serialize, Deserialize)] +struct QPayload { + idx: i64, +} +#[derive(Clone, Serialize, Deserialize)] +struct EmptyPayload {} +#[derive(Clone, Serialize, Deserialize)] +struct EmptyResult {} +struct QEvent; +impl EventSpec for QEvent { + type Payload = QPayload; + type Result = EmptyResult; + const EVENT_TYPE: &'static str = "q"; +} +struct WorkEvent; +impl EventSpec for WorkEvent { + type Payload = EmptyPayload; + type Result = EmptyResult; + const EVENT_TYPE: &'static str = "work"; +} #[test] fn test_queue_jump() { @@ -14,19 +40,12 @@ fn test_queue_jump() { .payload .get("idx") .cloned() - .unwrap_or(Value::Null); + .unwrap_or(serde_json::Value::Null); Ok(value) }); - let mut p1 = Map::new(); - p1.insert("idx".into(), json!(1)); - let event1 = BaseEvent::new("q", p1); - let mut p2 = Map::new(); - p2.insert("idx".into(), json!(2)); - let event2 = BaseEvent::new("q", p2); - - bus.emit_raw(event1.clone()); - bus.emit_raw_with_options(event2.clone(), true); + let event1 = bus.emit::(QPayload { idx: 1 }); + let event2 = bus.emit_with_options::(QPayload { idx: 2 }, true); block_on(async { event1.wait_completed().await; @@ -34,12 +53,14 @@ fn test_queue_jump() { }); let event1_started = event1 + .inner .inner .lock() .event_started_at .clone() .unwrap_or_default(); let event2_started = event2 + .inner .inner .lock() .event_started_at @@ -58,12 +79,12 @@ fn test_bus_serial_processes_in_order() { Ok(json!(1)) }); - let event1 = BaseEvent::new("work", Map::new()); - let event2 = BaseEvent::new("work", Map::new()); - event1.inner.lock().event_concurrency = Some(EventConcurrencyMode::BusSerial); - event2.inner.lock().event_concurrency = Some(EventConcurrencyMode::BusSerial); - bus.emit_raw(event1.clone()); - bus.emit_raw(event2.clone()); + let event1 = TypedEvent::::new(EmptyPayload {}); + let event2 = TypedEvent::::new(EmptyPayload {}); + event1.inner.inner.lock().event_concurrency = Some(EventConcurrencyMode::BusSerial); + event2.inner.inner.lock().event_concurrency = Some(EventConcurrencyMode::BusSerial); + let event1 = bus.emit_existing(event1); + let event2 = bus.emit_existing(event2); block_on(async { event1.wait_completed().await; @@ -71,12 +92,14 @@ fn test_bus_serial_processes_in_order() { }); let event1_started = event1 + .inner .inner .lock() .event_started_at .clone() .unwrap_or_default(); let event2_started = event2 + .inner .inner .lock() .event_started_at diff --git a/bubus-rust/tests/test_eventbus_on_off.rs b/bubus-rust/tests/test_eventbus_on_off.rs index 19062a8..7406d0c 100644 --- a/bubus-rust/tests/test_eventbus_on_off.rs +++ b/bubus-rust/tests/test_eventbus_on_off.rs @@ -1,22 +1,32 @@ -use bubus_rust::{base_event::BaseEvent, event_bus::EventBus}; +use bubus_rust::{event_bus::EventBus, typed::EventSpec}; use futures::executor::block_on; -use serde_json::{json, Map}; +use serde::{Deserialize, Serialize}; +use serde_json::json; + +#[derive(Clone, Serialize, Deserialize)] +struct EmptyPayload {} +#[derive(Clone, Serialize, Deserialize)] +struct EmptyResult {} +struct WorkEvent; +impl EventSpec for WorkEvent { + type Payload = EmptyPayload; + type Result = EmptyResult; + const EVENT_TYPE: &'static str = "work"; +} #[test] fn test_on_returns_handler_and_off_removes_handler() { let bus = EventBus::new(Some("OnOffBus".to_string())); let handler = bus.on("work", "h1", |_event| async move { Ok(json!("ok")) }); - let event_1 = BaseEvent::new("work", Map::new()); - bus.emit_raw(event_1.clone()); + let event_1 = bus.emit::(EmptyPayload {}); block_on(event_1.wait_completed()); - assert_eq!(event_1.inner.lock().event_results.len(), 1); + assert_eq!(event_1.inner.inner.lock().event_results.len(), 1); bus.off("work", Some(&handler.id)); - let event_2 = BaseEvent::new("work", Map::new()); - bus.emit_raw(event_2.clone()); + let event_2 = bus.emit::(EmptyPayload {}); block_on(event_2.wait_completed()); - assert_eq!(event_2.inner.lock().event_results.len(), 0); + assert_eq!(event_2.inner.inner.lock().event_results.len(), 0); bus.stop(); } diff --git a/bubus-rust/tests/test_eventbus_timeout.rs b/bubus-rust/tests/test_eventbus_timeout.rs index d4c8ae5..062bb77 100644 --- a/bubus-rust/tests/test_eventbus_timeout.rs +++ b/bubus-rust/tests/test_eventbus_timeout.rs @@ -1,13 +1,41 @@ use std::{thread, time::Duration}; -use bubus_rust::{base_event::BaseEvent, event_bus::EventBus, event_result::EventResultStatus}; +use bubus_rust::{ + event_bus::EventBus, + event_result::EventResultStatus, + typed::{EventSpec, TypedEvent}, +}; use futures::executor::block_on; -use serde_json::{json, Map}; +use serde::{Deserialize, Serialize}; +use serde_json::json; -fn wait_until_completed(event: &std::sync::Arc, timeout_ms: u64) { +#[derive(Clone, Serialize, Deserialize)] +struct EmptyPayload {} +#[derive(Clone, Serialize, Deserialize)] +struct EmptyResult {} +struct TimeoutEvent; +impl EventSpec for TimeoutEvent { + type Payload = EmptyPayload; + type Result = EmptyResult; + const EVENT_TYPE: &'static str = "timeout"; +} +struct ChildEvent; +impl EventSpec for ChildEvent { + type Payload = EmptyPayload; + type Result = EmptyResult; + const EVENT_TYPE: &'static str = "child"; +} +struct ParentEvent; +impl EventSpec for ParentEvent { + type Payload = EmptyPayload; + type Result = EmptyResult; + const EVENT_TYPE: &'static str = "parent"; +} + +fn wait_until_completed(event: &TypedEvent, timeout_ms: u64) { let started = std::time::Instant::now(); while started.elapsed() < Duration::from_millis(timeout_ms) { - if event.inner.lock().event_status == bubus_rust::types::EventStatus::Completed { + if event.inner.inner.lock().event_status == bubus_rust::types::EventStatus::Completed { return; } thread::sleep(Duration::from_millis(5)); @@ -24,13 +52,14 @@ fn test_event_timeout_aborts_in_flight_handler_result() { Ok(json!("slow")) }); - let event = BaseEvent::new("timeout", Map::new()); - event.inner.lock().event_timeout = Some(0.01); + let event = TypedEvent::::new(EmptyPayload {}); + event.inner.inner.lock().event_timeout = Some(0.01); - bus.emit_raw(event.clone()); + let event = bus.emit_existing(event); block_on(event.wait_completed()); let result = event + .inner .inner .lock() .event_results @@ -60,22 +89,23 @@ fn test_parent_timeout_cancels_pending_or_started_children() { bus.on("parent", "emit_child", move |_event| { let bus_local = bus_for_handler.clone(); async move { - let child = BaseEvent::new("child", Map::new()); - child.inner.lock().event_timeout = Some(1.0); - bus_local.emit_raw(child); + let child = TypedEvent::::new(EmptyPayload {}); + child.inner.inner.lock().event_timeout = Some(1.0); + bus_local.emit_existing(child); thread::sleep(Duration::from_millis(80)); Ok(json!("parent")) } }); - let parent = BaseEvent::new("parent", Map::new()); - parent.inner.lock().event_timeout = Some(0.01); + let parent = TypedEvent::::new(EmptyPayload {}); + parent.inner.inner.lock().event_timeout = Some(0.01); - bus.emit_raw(parent.clone()); + let parent = bus.emit_existing(parent); wait_until_completed(&parent, 1000); thread::sleep(Duration::from_millis(120)); let parent_result = parent + .inner .inner .lock() .event_results @@ -85,7 +115,7 @@ fn test_parent_timeout_cancels_pending_or_started_children() { .expect("missing parent result"); assert_eq!(parent_result.status, EventResultStatus::Error); - let parent_id = parent.inner.lock().event_id.clone(); + let parent_id = parent.inner.inner.lock().event_id.clone(); let payload = bus.runtime_payload_for_test(); let child = payload .values() From 73d10ed5a626ddde891e4193c97cc6c9d0aa9f9e Mon Sep 17 00:00:00 2001 From: Nick Sweeting Date: Fri, 20 Feb 2026 14:32:28 -0800 Subject: [PATCH 4/4] Address PR review issues in typed emit and runtime coordination --- bubus-rust/src/event_bus.rs | 168 +++++++++++++----- bubus-rust/src/typed.rs | 28 +-- bubus-rust/tests/event_bus_tests.rs | 4 +- bubus-rust/tests/test_event_handler_first.rs | 2 +- bubus-rust/tests/test_event_history_store.rs | 6 +- .../tests/test_eventbus_dispatch_defaults.rs | 2 +- bubus-rust/tests/test_eventbus_edge_cases.rs | 12 +- bubus-rust/tests/test_eventbus_find.rs | 9 +- bubus-rust/tests/test_eventbus_locking.rs | 9 +- bubus-rust/tests/test_eventbus_on_off.rs | 9 +- bubus-rust/tests/test_eventbus_timeout.rs | 6 +- bubus-rust/tests/test_typed_events.rs | 4 +- 12 files changed, 168 insertions(+), 91 deletions(-) diff --git a/bubus-rust/src/event_bus.rs b/bubus-rust/src/event_bus.rs index 9ae1ff1..1445c08 100644 --- a/bubus-rust/src/event_bus.rs +++ b/bubus-rust/src/event_bus.rs @@ -205,10 +205,42 @@ impl EventBus { inner .event_path .push(format!("{}#{}", self.name, &self.id[0..4])); - CURRENT_EVENT_ID.with(|id| inner.event_parent_id = id.borrow().clone()); - CURRENT_HANDLER_ID.with(|id| inner.event_emitted_by_handler_id = id.borrow().clone()); + if inner.event_parent_id.is_none() { + CURRENT_EVENT_ID.with(|id| { + let current_parent = id.borrow().clone(); + if current_parent.as_deref() != Some(inner.event_id.as_str()) { + inner.event_parent_id = current_parent; + } + }); + } + if inner.event_emitted_by_handler_id.is_none() { + CURRENT_HANDLER_ID.with(|id| { + inner.event_emitted_by_handler_id = id.borrow().clone(); + }); + } } + let emitted_child_id = event.inner.lock().event_id.clone(); + CURRENT_EVENT_ID.with(|current_event_id| { + CURRENT_HANDLER_ID.with(|current_handler_id| { + let Some(parent_id) = current_event_id.borrow().clone() else { + return; + }; + let Some(handler_id) = current_handler_id.borrow().clone() else { + return; + }; + let Some(parent_event) = self.runtime.events.lock().get(&parent_id).cloned() else { + return; + }; + let mut parent_inner = parent_event.inner.lock(); + if let Some(result) = parent_inner.event_results.get_mut(&handler_id) { + if !result.event_children.contains(&emitted_child_id) { + result.event_children.push(emitted_child_id.clone()); + } + } + }); + }); + { let mut queue = self.runtime.queue.lock(); if queue_jump { @@ -227,19 +259,12 @@ impl EventBus { let event_id = event.inner.lock().event_id.clone(); if let Some(max_size) = self.runtime.max_history_size { - let current_size = self.runtime.history_order.lock().len(); - if current_size >= max_size { - if self.runtime.max_history_drop { - while self.runtime.history_order.lock().len() >= max_size { - if let Some(oldest) = self.runtime.history_order.lock().pop_front() { - self.runtime.events.lock().remove(&oldest); - } else { - break; - } - } - } else { + if max_size > 0 { + let current_size = self.runtime.history_order.lock().len(); + if current_size >= max_size && !self.runtime.max_history_drop { return false; } + self.trim_history_to_capacity(max_size, true); } } @@ -248,6 +273,36 @@ impl EventBus { true } + fn trim_history_to_capacity(&self, max_size: usize, include_equal: bool) { + if max_size == 0 { + return; + } + loop { + let current_len = self.runtime.history_order.lock().len(); + if current_len < max_size || (!include_equal && current_len == max_size) { + break; + } + let Some(oldest) = self.runtime.history_order.lock().front().cloned() else { + break; + }; + let is_active = self + .runtime + .events + .lock() + .get(&oldest) + .map(|event| { + let status = event.inner.lock().event_status; + status == EventStatus::Pending || status == EventStatus::Started + }) + .unwrap_or(false); + if is_active { + break; + } + self.runtime.history_order.lock().pop_front(); + self.runtime.events.lock().remove(&oldest); + } + } + pub async fn find( &self, pattern: &str, @@ -255,35 +310,53 @@ impl EventBus { future: Option, child_of: Option>, ) -> Option> { - if past { - let child_of_event_id = child_of - .as_ref() - .map(|event| event.inner.lock().event_id.clone()); - if let Some(matched) = self.find_in_history(pattern, child_of_event_id.as_deref()) { - return Some(matched); + let child_of_event_id = child_of + .as_ref() + .map(|event| event.inner.lock().event_id.clone()); + + let mut waiter_id: Option = None; + let mut waiter_rx: Option>> = None; + + if future.is_some() { + let (tx, rx) = std_mpsc::channel(); + let id = { + let mut next = self.runtime.next_waiter_id.lock(); + *next += 1; + *next + }; + self.runtime.find_waiters.lock().push(FindWaiter { + id, + pattern: pattern.to_string(), + child_of_event_id: child_of_event_id.clone(), + sender: tx, + }); + waiter_id = Some(id); + waiter_rx = Some(rx); + + if past { + if let Some(matched) = self.find_in_history(pattern, child_of_event_id.as_deref()) { + self.runtime + .find_waiters + .lock() + .retain(|waiter| waiter.id != id); + return Some(matched); + } } + } else if past { + return self.find_in_history(pattern, child_of_event_id.as_deref()); } - let future = future?; - let (tx, rx) = std_mpsc::channel(); - let waiter_id = { - let mut next = self.runtime.next_waiter_id.lock(); - *next += 1; - *next - }; + let timeout = future?; + let result = + waiter_rx.and_then(|rx| rx.recv_timeout(Duration::from_secs_f64(timeout)).ok()); - self.runtime.find_waiters.lock().push(FindWaiter { - id: waiter_id, - pattern: pattern.to_string(), - child_of_event_id: child_of.map(|event| event.inner.lock().event_id.clone()), - sender: tx, - }); + if let Some(id) = waiter_id { + self.runtime + .find_waiters + .lock() + .retain(|waiter| waiter.id != id); + } - let result = rx.recv_timeout(Duration::from_secs_f64(future)).ok(); - self.runtime - .find_waiters - .lock() - .retain(|waiter| waiter.id != waiter_id); result } @@ -294,7 +367,9 @@ impl EventBus { ) -> Option> { let history = self.runtime.history_order.lock().clone(); for event_id in history.iter().rev() { - let event = self.runtime.events.lock().get(event_id).cloned()?; + let Some(event) = self.runtime.events.lock().get(event_id).cloned() else { + continue; + }; if !self.matches_pattern(&event, pattern) { continue; } @@ -488,11 +563,6 @@ impl EventBus { } for handle in join_handles { let _ = handle.join(); - if handler_completion == EventHandlerCompletionMode::First - && self.has_winner(&event) - { - break; - } } } } @@ -518,14 +588,18 @@ impl EventBus { } } - { + let should_complete = { let mut inner = event.inner.lock(); inner.event_pending_bus_count = inner.event_pending_bus_count.saturating_sub(1); - if inner.event_status != EventStatus::Completed { + let done = inner.event_pending_bus_count == 0; + if done && inner.event_status != EventStatus::Completed { inner.event_completed_at = Some(now_iso()); } + done + }; + if should_complete { + event.mark_completed(); } - event.mark_completed(); if self.runtime.max_history_size == Some(0) { let event_id = event.inner.lock().event_id.clone(); @@ -534,6 +608,8 @@ impl EventBus { .history_order .lock() .retain(|id| id != &event_id); + } else if let Some(max_size) = self.runtime.max_history_size { + self.trim_history_to_capacity(max_size, false); } } diff --git a/bubus-rust/src/typed.rs b/bubus-rust/src/typed.rs index 8593c0b..a198365 100644 --- a/bubus-rust/src/typed.rs +++ b/bubus-rust/src/typed.rs @@ -51,7 +51,12 @@ impl TypedEvent { pub fn first_result(&self) -> Option { let results: HashMap = self.inner.inner.lock().event_results.clone(); - for result in results.values() { + let mut ordered_handler_ids: Vec = results.keys().cloned().collect(); + ordered_handler_ids.sort(); + for handler_id in ordered_handler_ids { + let Some(result) = results.get(&handler_id) else { + continue; + }; if result.error.is_none() { if let Some(value) = &result.result { let decoded: E::Result = @@ -65,28 +70,12 @@ impl TypedEvent { } impl EventBus { - pub fn emit(&self, payload: E::Payload) -> TypedEvent { - let typed_event = TypedEvent::::new(payload); - let emitted = self.enqueue_base(typed_event.inner.clone()); - TypedEvent::from_base_event(emitted) - } - - pub fn emit_with_options( - &self, - payload: E::Payload, - queue_jump: bool, - ) -> TypedEvent { - let typed_event = TypedEvent::::new(payload); - let emitted = self.enqueue_base_with_options(typed_event.inner.clone(), queue_jump); - TypedEvent::from_base_event(emitted) - } - - pub fn emit_existing(&self, event: TypedEvent) -> TypedEvent { + pub fn emit(&self, event: TypedEvent) -> TypedEvent { let emitted = self.enqueue_base(event.inner.clone()); TypedEvent::from_base_event(emitted) } - pub fn emit_existing_with_options( + pub fn emit_with_options( &self, event: TypedEvent, queue_jump: bool, @@ -94,6 +83,7 @@ impl EventBus { let emitted = self.enqueue_base_with_options(event.inner.clone(), queue_jump); TypedEvent::from_base_event(emitted) } + pub fn on_typed( &self, handler_name: &str, diff --git a/bubus-rust/tests/event_bus_tests.rs b/bubus-rust/tests/event_bus_tests.rs index cebf653..876a0ef 100644 --- a/bubus-rust/tests/event_bus_tests.rs +++ b/bubus-rust/tests/event_bus_tests.rs @@ -30,7 +30,7 @@ impl EventSpec for WorkEvent { fn test_emit_and_handler_result() { let bus = EventBus::new(Some("BusA".to_string())); bus.on("work", "h1", |_event| async move { Ok(json!("ok")) }); - let event = bus.emit::(WorkPayload { value: 1 }); + let event = bus.emit::(TypedEvent::::new(WorkPayload { value: 1 })); block_on(event.wait_completed()); let results = event.inner.inner.lock().event_results.clone(); @@ -59,7 +59,7 @@ fn test_parallel_handler_concurrency() { inner.event_handler_concurrency = Some(EventHandlerConcurrencyMode::Parallel); inner.event_concurrency = Some(EventConcurrencyMode::Parallel); } - let emitted = bus.emit_existing(event); + let emitted = bus.emit(event); block_on(emitted.wait_completed()); assert_eq!(emitted.inner.inner.lock().event_results.len(), 2); bus.stop(); diff --git a/bubus-rust/tests/test_event_handler_first.rs b/bubus-rust/tests/test_event_handler_first.rs index 474bc88..bf23e62 100644 --- a/bubus-rust/tests/test_event_handler_first.rs +++ b/bubus-rust/tests/test_event_handler_first.rs @@ -38,7 +38,7 @@ fn test_event_handler_first_serial_stops_after_first_success() { inner.event_handler_completion = Some(EventHandlerCompletionMode::First); inner.event_handler_concurrency = Some(EventHandlerConcurrencyMode::Serial); } - let emitted = bus.emit_existing(event); + let emitted = bus.emit(event); block_on(emitted.wait_completed()); let results = emitted.inner.inner.lock().event_results.clone(); diff --git a/bubus-rust/tests/test_event_history_store.rs b/bubus-rust/tests/test_event_history_store.rs index 124490c..67881de 100644 --- a/bubus-rust/tests/test_event_history_store.rs +++ b/bubus-rust/tests/test_event_history_store.rs @@ -22,7 +22,7 @@ fn test_max_history_drop_true_keeps_recent_entries() { let bus = EventBus::new_with_history(Some("HistoryDropBus".to_string()), Some(2), true); for _ in 0..3 { - let event = bus.emit::(EmptyPayload {}); + let event = bus.emit::(TypedEvent::::new(EmptyPayload {})); block_on(event.wait_completed()); } @@ -36,11 +36,11 @@ fn test_max_history_drop_true_keeps_recent_entries() { fn test_max_history_drop_false_rejects_new_emit_when_full() { let bus = EventBus::new_with_history(Some("HistoryRejectBus".to_string()), Some(1), false); - let first = bus.emit::(EmptyPayload {}); + let first = bus.emit::(TypedEvent::::new(EmptyPayload {})); block_on(first.wait_completed()); let second = TypedEvent::::new(EmptyPayload {}); - let second = bus.emit_existing(second); + let second = bus.emit(second); block_on(second.wait_completed()); assert_eq!(second.inner.inner.lock().event_path.len(), 0); diff --git a/bubus-rust/tests/test_eventbus_dispatch_defaults.rs b/bubus-rust/tests/test_eventbus_dispatch_defaults.rs index f994075..a45ce01 100644 --- a/bubus-rust/tests/test_eventbus_dispatch_defaults.rs +++ b/bubus-rust/tests/test_eventbus_dispatch_defaults.rs @@ -33,7 +33,7 @@ fn test_bus_default_handler_settings_are_applied() { inner.event_handler_concurrency = Some(EventHandlerConcurrencyMode::Serial); inner.event_handler_completion = Some(EventHandlerCompletionMode::All); } - let event = bus.emit_existing(event); + let event = bus.emit(event); block_on(event.wait_completed()); assert_eq!(event.inner.inner.lock().event_results.len(), 1); diff --git a/bubus-rust/tests/test_eventbus_edge_cases.rs b/bubus-rust/tests/test_eventbus_edge_cases.rs index dcc6152..f5a1c19 100644 --- a/bubus-rust/tests/test_eventbus_edge_cases.rs +++ b/bubus-rust/tests/test_eventbus_edge_cases.rs @@ -1,4 +1,8 @@ -use bubus_rust::{event_bus::EventBus, event_result::EventResultStatus, typed::EventSpec}; +use bubus_rust::{ + event_bus::EventBus, + event_result::EventResultStatus, + typed::{EventSpec, TypedEvent}, +}; use futures::executor::block_on; use serde::{Deserialize, Serialize}; use serde_json::json; @@ -30,7 +34,7 @@ impl EventSpec for WorkEvent { #[test] fn test_emit_with_no_handlers_completes_event() { let bus = EventBus::new(Some("NoHandlers".to_string())); - let event = bus.emit::(EmptyPayload {}); + let event = bus.emit::(TypedEvent::::new(EmptyPayload {})); block_on(event.wait_completed()); @@ -47,7 +51,7 @@ fn test_emit_with_no_handlers_completes_event() { fn test_wildcard_handler_runs_for_any_event_type() { let bus = EventBus::new(Some("WildcardBus".to_string())); bus.on("*", "catch_all", |_event| async move { Ok(json!("all")) }); - let event = bus.emit::(EmptyPayload {}); + let event = bus.emit::(TypedEvent::::new(EmptyPayload {})); block_on(event.wait_completed()); @@ -66,7 +70,7 @@ fn test_handler_error_populates_error_status() { "bad", |_event| async move { Err("boom".to_string()) }, ); - let event = bus.emit::(EmptyPayload {}); + let event = bus.emit::(TypedEvent::::new(EmptyPayload {})); block_on(event.wait_completed()); diff --git a/bubus-rust/tests/test_eventbus_find.rs b/bubus-rust/tests/test_eventbus_find.rs index 4f771b6..48c199f 100644 --- a/bubus-rust/tests/test_eventbus_find.rs +++ b/bubus-rust/tests/test_eventbus_find.rs @@ -1,6 +1,9 @@ use std::{thread, time::Duration}; -use bubus_rust::{event_bus::EventBus, typed::EventSpec}; +use bubus_rust::{ + event_bus::EventBus, + typed::{EventSpec, TypedEvent}, +}; use futures::executor::block_on; use serde::{Deserialize, Serialize}; use serde_json::json; @@ -27,7 +30,7 @@ fn test_find_past_match_returns_event() { let bus = EventBus::new(Some("FindBus".to_string())); bus.on("work", "h1", |_event| async move { Ok(json!("ok")) }); - let event = bus.emit::(EmptyPayload {}); + let event = bus.emit::(TypedEvent::::new(EmptyPayload {})); block_on(event.wait_completed()); let found = block_on(bus.find("work", true, None, None)); @@ -44,7 +47,7 @@ fn test_find_future_waits_for_new_event() { thread::spawn(move || { thread::sleep(Duration::from_millis(30)); - bus_for_emit.emit::(EmptyPayload {}); + bus_for_emit.emit::(TypedEvent::::new(EmptyPayload {})); }); let found = block_on(bus.find("future_event", false, Some(0.5), None)); diff --git a/bubus-rust/tests/test_eventbus_locking.rs b/bubus-rust/tests/test_eventbus_locking.rs index 9dff8eb..33a27b7 100644 --- a/bubus-rust/tests/test_eventbus_locking.rs +++ b/bubus-rust/tests/test_eventbus_locking.rs @@ -44,8 +44,9 @@ fn test_queue_jump() { Ok(value) }); - let event1 = bus.emit::(QPayload { idx: 1 }); - let event2 = bus.emit_with_options::(QPayload { idx: 2 }, true); + let event1 = bus.emit::(TypedEvent::::new(QPayload { idx: 1 })); + let event2 = + bus.emit_with_options::(TypedEvent::::new(QPayload { idx: 2 }), true); block_on(async { event1.wait_completed().await; @@ -83,8 +84,8 @@ fn test_bus_serial_processes_in_order() { let event2 = TypedEvent::::new(EmptyPayload {}); event1.inner.inner.lock().event_concurrency = Some(EventConcurrencyMode::BusSerial); event2.inner.inner.lock().event_concurrency = Some(EventConcurrencyMode::BusSerial); - let event1 = bus.emit_existing(event1); - let event2 = bus.emit_existing(event2); + let event1 = bus.emit(event1); + let event2 = bus.emit(event2); block_on(async { event1.wait_completed().await; diff --git a/bubus-rust/tests/test_eventbus_on_off.rs b/bubus-rust/tests/test_eventbus_on_off.rs index 7406d0c..6a719d3 100644 --- a/bubus-rust/tests/test_eventbus_on_off.rs +++ b/bubus-rust/tests/test_eventbus_on_off.rs @@ -1,4 +1,7 @@ -use bubus_rust::{event_bus::EventBus, typed::EventSpec}; +use bubus_rust::{ + event_bus::EventBus, + typed::{EventSpec, TypedEvent}, +}; use futures::executor::block_on; use serde::{Deserialize, Serialize}; use serde_json::json; @@ -19,12 +22,12 @@ fn test_on_returns_handler_and_off_removes_handler() { let bus = EventBus::new(Some("OnOffBus".to_string())); let handler = bus.on("work", "h1", |_event| async move { Ok(json!("ok")) }); - let event_1 = bus.emit::(EmptyPayload {}); + let event_1 = bus.emit::(TypedEvent::::new(EmptyPayload {})); block_on(event_1.wait_completed()); assert_eq!(event_1.inner.inner.lock().event_results.len(), 1); bus.off("work", Some(&handler.id)); - let event_2 = bus.emit::(EmptyPayload {}); + let event_2 = bus.emit::(TypedEvent::::new(EmptyPayload {})); block_on(event_2.wait_completed()); assert_eq!(event_2.inner.inner.lock().event_results.len(), 0); diff --git a/bubus-rust/tests/test_eventbus_timeout.rs b/bubus-rust/tests/test_eventbus_timeout.rs index 062bb77..934b76f 100644 --- a/bubus-rust/tests/test_eventbus_timeout.rs +++ b/bubus-rust/tests/test_eventbus_timeout.rs @@ -55,7 +55,7 @@ fn test_event_timeout_aborts_in_flight_handler_result() { let event = TypedEvent::::new(EmptyPayload {}); event.inner.inner.lock().event_timeout = Some(0.01); - let event = bus.emit_existing(event); + let event = bus.emit(event); block_on(event.wait_completed()); let result = event @@ -91,7 +91,7 @@ fn test_parent_timeout_cancels_pending_or_started_children() { async move { let child = TypedEvent::::new(EmptyPayload {}); child.inner.inner.lock().event_timeout = Some(1.0); - bus_local.emit_existing(child); + bus_local.emit(child); thread::sleep(Duration::from_millis(80)); Ok(json!("parent")) } @@ -100,7 +100,7 @@ fn test_parent_timeout_cancels_pending_or_started_children() { let parent = TypedEvent::::new(EmptyPayload {}); parent.inner.inner.lock().event_timeout = Some(0.01); - let parent = bus.emit_existing(parent); + let parent = bus.emit(parent); wait_until_completed(&parent, 1000); thread::sleep(Duration::from_millis(120)); diff --git a/bubus-rust/tests/test_typed_events.rs b/bubus-rust/tests/test_typed_events.rs index 84bd5ac..1076e34 100644 --- a/bubus-rust/tests/test_typed_events.rs +++ b/bubus-rust/tests/test_typed_events.rs @@ -36,7 +36,7 @@ fn test_on_typed_and_emit_typed_roundtrip() { }) }); - let event = bus.emit::(AddPayload { a: 4, b: 9 }); + let event = bus.emit::(TypedEvent::::new(AddPayload { a: 4, b: 9 })); block_on(event.wait_completed()); let first = event.first_result(); @@ -48,7 +48,7 @@ fn test_on_typed_and_emit_typed_roundtrip() { fn test_find_typed_returns_typed_payload() { let bus = EventBus::new(Some("TypedFindBus".to_string())); - let event = bus.emit::(AddPayload { a: 7, b: 1 }); + let event = bus.emit::(TypedEvent::::new(AddPayload { a: 7, b: 1 })); block_on(event.wait_completed()); let found = block_on(bus.find_typed::(true, None)).expect("expected typed event");