diff --git a/bubus-rust/Cargo.toml b/bubus-rust/Cargo.toml new file mode 100644 index 0000000..6400b60 --- /dev/null +++ b/bubus-rust/Cargo.toml @@ -0,0 +1,14 @@ +[package] +name = "bubus-rust" +version = "0.1.0" +edition = "2021" + +[dependencies] +serde = { version = "1", features = ["derive"] } +serde_json = "1" +uuid = { version = "1", features = ["v4", "v5", "v7", "serde"] } +futures = { version = "0.3", features = ["executor", "thread-pool"] } +event-listener = "5" +parking_lot = "0.12" + +[dev-dependencies] diff --git a/bubus-rust/README.md b/bubus-rust/README.md new file mode 100644 index 0000000..c2084f0 --- /dev/null +++ b/bubus-rust/README.md @@ -0,0 +1,40 @@ +# bubus-rust + +Idiomatic Rust implementation of `bubus`, matching the Python/TypeScript event JSON surface and execution semantics as closely as possible. + +## Current scope + +Implemented core features: +- Base event model and event result model with serde JSON compatibility +- Async event bus with queueing and queue-jump behavior +- Event concurrency: `global-serial`, `bus-serial`, `parallel` +- Handler concurrency: `serial`, `parallel` +- Handler completion strategies: `all`, `first` +- Event path tracking and pending bus count + +Not yet implemented in this crate revision: +- Bridges +- Middlewares (hook points are left in code comments) + +## Quickstart + +```rust +use bubus_rust::{base_event, event_bus}; +use futures::executor::block_on; +use serde_json::{Map, json}; + +let bus = event_bus::new(Some("MainBus".to_string())); +bus.on("UserLoginEvent", "handle_login", |event| async move { + Ok(json!({"ok": true, "event_id": event.inner.lock().event_id})) +}); + +let mut payload = Map::new(); +payload.insert("username".to_string(), json!("alice")); +let event = base_event::new("UserLoginEvent", payload); +bus.emit(event.clone()); + +block_on(async { + event.wait_completed().await; + println!("{}", event.to_json_value()); +}); +``` diff --git a/bubus-rust/src/base_event.rs b/bubus-rust/src/base_event.rs new file mode 100644 index 0000000..6cd1f1d --- /dev/null +++ b/bubus-rust/src/base_event.rs @@ -0,0 +1,126 @@ +use std::{collections::HashMap, sync::Arc}; + +use event_listener::Event; +use parking_lot::Mutex; +use serde::{Deserialize, Serialize}; +use serde_json::{Map, Value}; + +use crate::{ + event_result::EventResult, + id::uuid_v7_string, + types::{ + EventConcurrencyMode, EventHandlerCompletionMode, EventHandlerConcurrencyMode, EventStatus, + }, +}; + +pub fn now_iso() -> String { + use std::time::{SystemTime, UNIX_EPOCH}; + let dur = SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap_or_default(); + format!("{}.{:09}Z", dur.as_secs(), dur.subsec_nanos()) +} + +#[derive(Clone, Serialize, Deserialize)] +pub struct BaseEventData { + pub event_type: String, + pub event_version: String, + pub event_timeout: Option, + pub event_slow_timeout: Option, + pub event_concurrency: Option, + pub event_handler_timeout: Option, + pub event_handler_slow_timeout: Option, + pub event_handler_concurrency: Option, + pub event_handler_completion: Option, + pub event_result_type: Option, + pub event_id: String, + pub event_path: Vec, + pub event_parent_id: Option, + pub event_emitted_by_handler_id: Option, + pub event_pending_bus_count: usize, + pub event_created_at: String, + pub event_status: EventStatus, + pub event_started_at: Option, + pub event_completed_at: Option, + pub event_results: HashMap, + #[serde(flatten)] + pub payload: Map, +} + +pub struct BaseEvent { + pub inner: Mutex, + pub completed: Event, +} + +impl BaseEvent { + pub fn new(event_type: impl Into, payload: Map) -> Arc { + Arc::new(Self { + inner: Mutex::new(BaseEventData { + event_type: event_type.into(), + event_version: "0.0.1".to_string(), + event_timeout: None, + event_slow_timeout: None, + event_concurrency: None, + event_handler_timeout: None, + event_handler_slow_timeout: None, + event_handler_concurrency: None, + event_handler_completion: None, + event_result_type: None, + event_id: uuid_v7_string(), + event_path: vec![], + event_parent_id: None, + event_emitted_by_handler_id: None, + event_pending_bus_count: 0, + event_created_at: now_iso(), + event_status: EventStatus::Pending, + event_started_at: None, + event_completed_at: None, + event_results: HashMap::new(), + payload, + }), + completed: Event::new(), + }) + } + + pub async fn wait_completed(self: &Arc) { + loop { + let listener = self.completed.listen(); + { + let event = self.inner.lock(); + if event.event_status == EventStatus::Completed { + return; + } + } + listener.await; + } + } + + pub fn mark_started(&self) { + let mut event = self.inner.lock(); + if event.event_started_at.is_none() { + event.event_started_at = Some(now_iso()); + } + event.event_status = EventStatus::Started; + } + + pub fn mark_completed(&self) { + let mut event = self.inner.lock(); + event.event_status = EventStatus::Completed; + if event.event_completed_at.is_none() { + event.event_completed_at = Some(now_iso()); + } + self.completed.notify(usize::MAX); + } + + pub fn to_json_value(&self) -> Value { + serde_json::to_value(&*self.inner.lock()).unwrap_or(Value::Null) + } + + pub fn from_json_value(value: Value) -> Arc { + let parsed: BaseEventData = serde_json::from_value(value).expect("invalid base_event json"); + Arc::new(Self { + inner: Mutex::new(parsed), + completed: Event::new(), + }) + } +} diff --git a/bubus-rust/src/event_bus.rs b/bubus-rust/src/event_bus.rs new file mode 100644 index 0000000..1445c08 --- /dev/null +++ b/bubus-rust/src/event_bus.rs @@ -0,0 +1,779 @@ +use std::{ + collections::{HashMap, VecDeque}, + sync::{mpsc as std_mpsc, Arc, OnceLock}, + thread, + time::{Duration, Instant}, +}; + +use event_listener::Event; +use futures::executor::block_on; +use parking_lot::Mutex; +use serde::Serialize; +use serde_json::Value; + +use crate::{ + base_event::{now_iso, BaseEvent}, + event_handler::{EventHandler, EventHandlerCallable}, + event_result::{EventResult, EventResultStatus}, + id::uuid_v7_string, + lock_manager::ReentrantLock, + types::{ + EventConcurrencyMode, EventHandlerCompletionMode, EventHandlerConcurrencyMode, EventStatus, + }, +}; + +static GLOBAL_SERIAL_LOCK: OnceLock> = OnceLock::new(); +thread_local! { + static CURRENT_EVENT_ID: std::cell::RefCell> = const { std::cell::RefCell::new(None) }; + static CURRENT_HANDLER_ID: std::cell::RefCell> = const { std::cell::RefCell::new(None) }; +} + +struct FindWaiter { + id: u64, + pattern: String, + child_of_event_id: Option, + sender: std_mpsc::Sender>, +} + +struct BusRuntime { + queue: Mutex>>, + queue_notify: Event, + stop: Mutex, + events: Mutex>>, + history_order: Mutex>, + max_history_size: Option, + max_history_drop: bool, + find_waiters: Mutex>, + next_waiter_id: Mutex, +} + +#[derive(Clone, Serialize)] +pub struct EventBus { + pub name: String, + pub id: String, + pub event_concurrency: EventConcurrencyMode, + pub event_timeout: Option, + pub event_slow_timeout: Option, + pub event_handler_concurrency: EventHandlerConcurrencyMode, + pub event_handler_completion: EventHandlerCompletionMode, + pub event_handler_slow_timeout: Option, + #[serde(skip)] + handlers: Arc>>>, + #[serde(skip)] + runtime: Arc, + #[serde(skip)] + bus_serial_lock: Arc, +} + +impl EventBus { + pub fn new(name: Option) -> Arc { + Self::new_with_history(name, Some(100), false) + } + + pub fn new_with_history( + name: Option, + max_history_size: Option, + max_history_drop: bool, + ) -> Arc { + let bus = Arc::new(Self { + name: name.unwrap_or_else(|| "EventBus".to_string()), + id: uuid_v7_string(), + event_concurrency: EventConcurrencyMode::BusSerial, + event_timeout: Some(60.0), + event_slow_timeout: Some(300.0), + event_handler_concurrency: EventHandlerConcurrencyMode::Serial, + event_handler_completion: EventHandlerCompletionMode::All, + event_handler_slow_timeout: Some(30.0), + handlers: Arc::new(Mutex::new(HashMap::new())), + runtime: Arc::new(BusRuntime { + queue: Mutex::new(VecDeque::new()), + queue_notify: Event::new(), + stop: Mutex::new(false), + events: Mutex::new(HashMap::new()), + history_order: Mutex::new(VecDeque::new()), + max_history_size, + max_history_drop, + find_waiters: Mutex::new(Vec::new()), + next_waiter_id: Mutex::new(0), + }), + bus_serial_lock: Arc::new(ReentrantLock::default()), + }); + Self::start_loop(bus.clone()); + bus + } + + fn start_loop(bus: Arc) { + thread::spawn(move || { + block_on(async move { + loop { + if !bus.runtime.queue.lock().is_empty() { + thread::sleep(Duration::from_millis(1)); + } + let next_event = bus.runtime.queue.lock().pop_front(); + if let Some(event) = next_event { + let bus_for_task = bus.clone(); + let mode = event + .inner + .lock() + .event_concurrency + .unwrap_or(bus.event_concurrency); + match mode { + EventConcurrencyMode::Parallel => { + thread::spawn(move || { + block_on(bus_for_task.process_event(event)); + }); + } + EventConcurrencyMode::GlobalSerial + | EventConcurrencyMode::BusSerial => { + bus.process_event(event).await; + } + } + continue; + } + + if *bus.runtime.stop.lock() { + break; + } + bus.runtime.queue_notify.listen().await; + } + }); + }); + } + + pub fn stop(&self) { + *self.runtime.stop.lock() = true; + self.runtime.queue_notify.notify(usize::MAX); + } + + pub fn runtime_payload_for_test(&self) -> HashMap> { + self.runtime.events.lock().clone() + } + + pub fn event_history_ids(&self) -> Vec { + self.runtime.history_order.lock().iter().cloned().collect() + } + + pub fn on(&self, pattern: &str, handler_name: &str, handler_fn: F) -> EventHandler + where + F: Fn(Arc) -> Fut + Send + Sync + 'static, + Fut: std::future::Future> + Send + 'static, + { + let callable: EventHandlerCallable = Arc::new(move |event| Box::pin(handler_fn(event))); + let entry = EventHandler::from_callable( + pattern.to_string(), + handler_name.to_string(), + self.name.clone(), + self.id.clone(), + callable, + ); + self.handlers + .lock() + .entry(pattern.to_string()) + .or_default() + .push(entry.clone()); + entry + } + + pub fn off(&self, pattern: &str, handler_id: Option<&str>) { + let mut handlers = self.handlers.lock(); + if let Some(list) = handlers.get_mut(pattern) { + if let Some(handler_id) = handler_id { + list.retain(|handler| handler.id != handler_id); + } else { + list.clear(); + } + } + } + + pub(crate) fn enqueue_base(&self, event: Arc) -> Arc { + self.enqueue_base_with_options(event, false) + } + + pub(crate) fn enqueue_base_with_options( + &self, + event: Arc, + queue_jump: bool, + ) -> Arc { + if !self.register_in_history(event.clone()) { + event.mark_completed(); + return event; + } + + { + let mut inner = event.inner.lock(); + inner.event_pending_bus_count += 1; + inner + .event_path + .push(format!("{}#{}", self.name, &self.id[0..4])); + if inner.event_parent_id.is_none() { + CURRENT_EVENT_ID.with(|id| { + let current_parent = id.borrow().clone(); + if current_parent.as_deref() != Some(inner.event_id.as_str()) { + inner.event_parent_id = current_parent; + } + }); + } + if inner.event_emitted_by_handler_id.is_none() { + CURRENT_HANDLER_ID.with(|id| { + inner.event_emitted_by_handler_id = id.borrow().clone(); + }); + } + } + + let emitted_child_id = event.inner.lock().event_id.clone(); + CURRENT_EVENT_ID.with(|current_event_id| { + CURRENT_HANDLER_ID.with(|current_handler_id| { + let Some(parent_id) = current_event_id.borrow().clone() else { + return; + }; + let Some(handler_id) = current_handler_id.borrow().clone() else { + return; + }; + let Some(parent_event) = self.runtime.events.lock().get(&parent_id).cloned() else { + return; + }; + let mut parent_inner = parent_event.inner.lock(); + if let Some(result) = parent_inner.event_results.get_mut(&handler_id) { + if !result.event_children.contains(&emitted_child_id) { + result.event_children.push(emitted_child_id.clone()); + } + } + }); + }); + + { + let mut queue = self.runtime.queue.lock(); + if queue_jump { + queue.push_front(event.clone()); + } else { + queue.push_back(event.clone()); + } + } + + self.notify_find_waiters(event.clone()); + self.runtime.queue_notify.notify(1); + event + } + + fn register_in_history(&self, event: Arc) -> bool { + let event_id = event.inner.lock().event_id.clone(); + + if let Some(max_size) = self.runtime.max_history_size { + if max_size > 0 { + let current_size = self.runtime.history_order.lock().len(); + if current_size >= max_size && !self.runtime.max_history_drop { + return false; + } + self.trim_history_to_capacity(max_size, true); + } + } + + self.runtime.events.lock().insert(event_id.clone(), event); + self.runtime.history_order.lock().push_back(event_id); + true + } + + fn trim_history_to_capacity(&self, max_size: usize, include_equal: bool) { + if max_size == 0 { + return; + } + loop { + let current_len = self.runtime.history_order.lock().len(); + if current_len < max_size || (!include_equal && current_len == max_size) { + break; + } + let Some(oldest) = self.runtime.history_order.lock().front().cloned() else { + break; + }; + let is_active = self + .runtime + .events + .lock() + .get(&oldest) + .map(|event| { + let status = event.inner.lock().event_status; + status == EventStatus::Pending || status == EventStatus::Started + }) + .unwrap_or(false); + if is_active { + break; + } + self.runtime.history_order.lock().pop_front(); + self.runtime.events.lock().remove(&oldest); + } + } + + pub async fn find( + &self, + pattern: &str, + past: bool, + future: Option, + child_of: Option>, + ) -> Option> { + let child_of_event_id = child_of + .as_ref() + .map(|event| event.inner.lock().event_id.clone()); + + let mut waiter_id: Option = None; + let mut waiter_rx: Option>> = None; + + if future.is_some() { + let (tx, rx) = std_mpsc::channel(); + let id = { + let mut next = self.runtime.next_waiter_id.lock(); + *next += 1; + *next + }; + self.runtime.find_waiters.lock().push(FindWaiter { + id, + pattern: pattern.to_string(), + child_of_event_id: child_of_event_id.clone(), + sender: tx, + }); + waiter_id = Some(id); + waiter_rx = Some(rx); + + if past { + if let Some(matched) = self.find_in_history(pattern, child_of_event_id.as_deref()) { + self.runtime + .find_waiters + .lock() + .retain(|waiter| waiter.id != id); + return Some(matched); + } + } + } else if past { + return self.find_in_history(pattern, child_of_event_id.as_deref()); + } + + let timeout = future?; + let result = + waiter_rx.and_then(|rx| rx.recv_timeout(Duration::from_secs_f64(timeout)).ok()); + + if let Some(id) = waiter_id { + self.runtime + .find_waiters + .lock() + .retain(|waiter| waiter.id != id); + } + + result + } + + fn find_in_history( + &self, + pattern: &str, + child_of_event_id: Option<&str>, + ) -> Option> { + let history = self.runtime.history_order.lock().clone(); + for event_id in history.iter().rev() { + let Some(event) = self.runtime.events.lock().get(event_id).cloned() else { + continue; + }; + if !self.matches_pattern(&event, pattern) { + continue; + } + if let Some(parent_id) = child_of_event_id { + if !self.event_is_child_of_ids(&event.inner.lock().event_id, parent_id) { + continue; + } + } + return Some(event); + } + None + } + + fn matches_pattern(&self, event: &Arc, pattern: &str) -> bool { + if pattern == "*" { + return true; + } + event.inner.lock().event_type == pattern + } + + fn notify_find_waiters(&self, event: Arc) { + let event_id = event.inner.lock().event_id.clone(); + let mut matched_waiter_ids = Vec::new(); + let mut matched_senders = Vec::new(); + + { + let waiters = self.runtime.find_waiters.lock(); + for waiter in waiters.iter() { + if !self.matches_pattern(&event, &waiter.pattern) { + continue; + } + if let Some(parent_id) = waiter.child_of_event_id.as_deref() { + if !self.event_is_child_of_ids(&event_id, parent_id) { + continue; + } + } + matched_waiter_ids.push(waiter.id); + matched_senders.push(waiter.sender.clone()); + } + } + + if !matched_waiter_ids.is_empty() { + self.runtime + .find_waiters + .lock() + .retain(|waiter| !matched_waiter_ids.contains(&waiter.id)); + for sender in matched_senders { + let _ = sender.send(event.clone()); + } + } + } + + pub fn event_is_child_of( + &self, + child_event: &Arc, + parent_event: &Arc, + ) -> bool { + let child_id = child_event.inner.lock().event_id.clone(); + let parent_id = parent_event.inner.lock().event_id.clone(); + self.event_is_child_of_ids(&child_id, &parent_id) + } + + pub fn event_is_parent_of( + &self, + parent_event: &Arc, + child_event: &Arc, + ) -> bool { + self.event_is_child_of(child_event, parent_event) + } + + fn event_is_child_of_ids(&self, child_event_id: &str, parent_event_id: &str) -> bool { + if child_event_id == parent_event_id { + return false; + } + + let mut current_id = child_event_id.to_string(); + loop { + let Some(current_event) = self.runtime.events.lock().get(¤t_id).cloned() else { + return false; + }; + let current_parent = current_event.inner.lock().event_parent_id.clone(); + let Some(current_parent_id) = current_parent else { + return false; + }; + if current_parent_id == parent_event_id { + return true; + } + current_id = current_parent_id; + } + } + + pub async fn wait_until_idle(&self, timeout: Option) -> bool { + let start = Instant::now(); + loop { + let queue_empty = self.runtime.queue.lock().is_empty(); + let all_completed = self.runtime.events.lock().values().all(|event| { + let status = event.inner.lock().event_status; + status == EventStatus::Completed + }); + if queue_empty && all_completed { + return true; + } + + if let Some(timeout) = timeout { + if start.elapsed() > Duration::from_secs_f64(timeout) { + return false; + } + } + thread::sleep(Duration::from_millis(5)); + } + } + + async fn process_event(&self, event: Arc) { + let mode = event + .inner + .lock() + .event_concurrency + .unwrap_or(self.event_concurrency); + match mode { + EventConcurrencyMode::GlobalSerial => { + let _guard = GLOBAL_SERIAL_LOCK + .get_or_init(|| Arc::new(ReentrantLock::default())) + .lock(); + self.process_event_inner(event).await; + } + EventConcurrencyMode::BusSerial => { + let _guard = self.bus_serial_lock.lock(); + self.process_event_inner(event).await; + } + EventConcurrencyMode::Parallel => { + self.process_event_inner(event).await; + } + } + } + + async fn process_event_inner(&self, event: Arc) { + event.mark_started(); + let started_at = Instant::now(); + + let event_type = event.inner.lock().event_type.clone(); + let mut handlers = self + .handlers + .lock() + .get(&event_type) + .cloned() + .unwrap_or_default(); + handlers.extend(self.handlers.lock().get("*").cloned().unwrap_or_default()); + + let handler_concurrency = event + .inner + .lock() + .event_handler_concurrency + .unwrap_or(self.event_handler_concurrency); + let handler_completion = event + .inner + .lock() + .event_handler_completion + .unwrap_or(self.event_handler_completion); + + let event_timeout = event.inner.lock().event_timeout.or(self.event_timeout); + + match handler_concurrency { + EventHandlerConcurrencyMode::Serial => { + for handler in handlers { + let timed_out = self + .run_handler_with_context(event.clone(), handler, started_at, event_timeout) + .await; + if timed_out { + break; + } + if handler_completion == EventHandlerCompletionMode::First + && self.has_winner(&event) + { + break; + } + } + } + EventHandlerConcurrencyMode::Parallel => { + let mut join_handles = Vec::new(); + for handler in handlers { + let bus = self.clone(); + let event_clone = event.clone(); + join_handles.push(thread::spawn(move || { + block_on(bus.run_handler_with_context( + event_clone, + handler, + started_at, + event_timeout, + )) + })); + } + for handle in join_handles { + let _ = handle.join(); + } + } + } + + if let Some(timeout) = event_timeout { + if started_at.elapsed() > Duration::from_secs_f64(timeout) { + self.cancel_children(&event, &format!("parent event timed out after {timeout}s")); + } + } + + if let Some(slow) = event + .inner + .lock() + .event_slow_timeout + .or(self.event_slow_timeout) + { + if started_at.elapsed() > Duration::from_secs_f64(slow) { + eprintln!( + "slow event warning: {} took {:?}", + event.inner.lock().event_type, + started_at.elapsed() + ); + } + } + + let should_complete = { + let mut inner = event.inner.lock(); + inner.event_pending_bus_count = inner.event_pending_bus_count.saturating_sub(1); + let done = inner.event_pending_bus_count == 0; + if done && inner.event_status != EventStatus::Completed { + inner.event_completed_at = Some(now_iso()); + } + done + }; + if should_complete { + event.mark_completed(); + } + + if self.runtime.max_history_size == Some(0) { + let event_id = event.inner.lock().event_id.clone(); + self.runtime.events.lock().remove(&event_id); + self.runtime + .history_order + .lock() + .retain(|id| id != &event_id); + } else if let Some(max_size) = self.runtime.max_history_size { + self.trim_history_to_capacity(max_size, false); + } + } + + fn cancel_children(&self, event: &Arc, reason: &str) { + let results = event.inner.lock().event_results.clone(); + for result in results.values() { + for child_id in &result.event_children { + if let Some(child) = self.runtime.events.lock().get(child_id).cloned() { + for child_result in child.inner.lock().event_results.values_mut() { + if child_result.status == EventResultStatus::Pending + || child_result.status == EventResultStatus::Started + { + child_result.status = EventResultStatus::Error; + child_result.error = Some(format!("cancelled: {reason}")); + child_result.completed_at = Some(now_iso()); + } + } + child.mark_completed(); + } + } + } + } + + fn has_winner(&self, event: &Arc) -> bool { + event.inner.lock().event_results.values().any(|result| { + result.status == EventResultStatus::Completed + && result.error.is_none() + && result.result.is_some() + }) + } + + async fn run_handler_with_context( + &self, + event: Arc, + handler: EventHandler, + event_started_at: Instant, + event_timeout: Option, + ) -> bool { + let handler_id = handler.id.clone(); + CURRENT_EVENT_ID.with(|id| *id.borrow_mut() = Some(event.inner.lock().event_id.clone())); + CURRENT_HANDLER_ID.with(|id| *id.borrow_mut() = Some(handler_id.clone())); + let timed_out = self + .run_handler(event, handler, event_started_at, event_timeout) + .await; + CURRENT_HANDLER_ID.with(|id| *id.borrow_mut() = None); + CURRENT_EVENT_ID.with(|id| *id.borrow_mut() = None); + timed_out + } + + async fn run_handler( + &self, + event: Arc, + handler: EventHandler, + event_started_at: Instant, + event_timeout: Option, + ) -> bool { + let handler_timeout = handler + .handler_timeout + .or(event.inner.lock().event_handler_timeout) + .or(event_timeout); + + let remaining_event_timeout = event_timeout.map(|timeout| { + let elapsed = event_started_at.elapsed().as_secs_f64(); + (timeout - elapsed).max(0.0) + }); + + let timeout = match (handler_timeout, remaining_event_timeout) { + (Some(a), Some(b)) => Some(a.min(b)), + (Some(a), None) => Some(a), + (None, Some(b)) => Some(b), + (None, None) => None, + }; + + let mut result = EventResult::new( + event.inner.lock().event_id.clone(), + handler.clone(), + timeout, + ); + result.status = EventResultStatus::Started; + result.started_at = Some(now_iso()); + event + .inner + .lock() + .event_results + .insert(handler.id.clone(), result.clone()); + + let call = handler + .callable + .as_ref() + .expect("handler callable missing") + .clone(); + let (tx, rx) = std_mpsc::channel(); + let event_clone = event.clone(); + let context_event_id = event.inner.lock().event_id.clone(); + let context_handler_id = handler.id.clone(); + thread::spawn(move || { + CURRENT_EVENT_ID.with(|id| *id.borrow_mut() = Some(context_event_id)); + CURRENT_HANDLER_ID.with(|id| *id.borrow_mut() = Some(context_handler_id)); + let response = block_on(call(event_clone)); + CURRENT_HANDLER_ID.with(|id| *id.borrow_mut() = None); + CURRENT_EVENT_ID.with(|id| *id.borrow_mut() = None); + let _ = tx.send(response); + }); + + let call_started = Instant::now(); + let call_result = if let Some(timeout_secs) = timeout { + rx.recv_timeout(Duration::from_secs_f64(timeout_secs)) + .map_err(|_| "timeout".to_string()) + } else { + rx.recv().map_err(|_| "handler channel closed".to_string()) + }; + + let mut current = event + .inner + .lock() + .event_results + .get(&handler.id) + .cloned() + .expect("missing result row"); + + match call_result { + Ok(Ok(value)) => { + current.status = EventResultStatus::Completed; + current.result = Some(value); + } + Ok(Err(error)) => { + current.status = EventResultStatus::Error; + current.error = Some(error); + } + Err(error) => { + current.status = EventResultStatus::Error; + current.error = Some(format!("EventHandlerAbortedError: {error}")); + current.completed_at = Some(now_iso()); + if let Some(latest) = event.inner.lock().event_results.get(&handler.id).cloned() { + current.event_children = latest.event_children; + } + event + .inner + .lock() + .event_results + .insert(handler.id.clone(), current); + return true; + } + } + + if let Some(slow_timeout) = handler + .handler_slow_timeout + .or(event.inner.lock().event_handler_slow_timeout) + .or(self.event_handler_slow_timeout) + { + if call_started.elapsed() > Duration::from_secs_f64(slow_timeout) { + eprintln!( + "slow handler warning: {} took {:?}", + handler.handler_name, + call_started.elapsed() + ); + } + } + + current.completed_at = Some(now_iso()); + if let Some(latest) = event.inner.lock().event_results.get(&handler.id).cloned() { + current.event_children = latest.event_children; + } + event.inner.lock().event_results.insert(handler.id, current); + false + } +} diff --git a/bubus-rust/src/event_handler.rs b/bubus-rust/src/event_handler.rs new file mode 100644 index 0000000..49d4726 --- /dev/null +++ b/bubus-rust/src/event_handler.rs @@ -0,0 +1,56 @@ +use std::{future::Future, pin::Pin, sync::Arc}; + +use serde::{Deserialize, Serialize}; +use serde_json::Value; + +use crate::{base_event::BaseEvent, id::compute_handler_id}; + +pub type HandlerFuture = Pin> + Send + 'static>>; +pub type EventHandlerCallable = + Arc) -> HandlerFuture + Send + Sync + 'static>; + +#[derive(Clone, Serialize, Deserialize)] +pub struct EventHandler { + pub id: String, + pub event_pattern: String, + pub handler_name: String, + pub handler_file_path: Option, + pub handler_timeout: Option, + pub handler_slow_timeout: Option, + pub handler_registered_at: String, + pub eventbus_name: String, + pub eventbus_id: String, + #[serde(skip)] + pub callable: Option, +} + +impl EventHandler { + pub fn from_callable( + event_pattern: String, + handler_name: String, + eventbus_name: String, + eventbus_id: String, + callable: EventHandlerCallable, + ) -> Self { + let handler_registered_at = crate::base_event::now_iso(); + let id = compute_handler_id( + &eventbus_id, + &handler_name, + None, + &handler_registered_at, + &event_pattern, + ); + Self { + id, + event_pattern, + handler_name, + handler_file_path: None, + handler_timeout: None, + handler_slow_timeout: None, + handler_registered_at, + eventbus_name, + eventbus_id, + callable: Some(callable), + } + } +} diff --git a/bubus-rust/src/event_result.rs b/bubus-rust/src/event_result.rs new file mode 100644 index 0000000..3f34aa2 --- /dev/null +++ b/bubus-rust/src/event_result.rs @@ -0,0 +1,44 @@ +use serde::{Deserialize, Serialize}; +use serde_json::Value; + +use crate::{event_handler::EventHandler, id::uuid_v7_string}; + +#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)] +#[serde(rename_all = "snake_case")] +pub enum EventResultStatus { + Pending, + Started, + Completed, + Error, +} + +#[derive(Clone, Serialize, Deserialize)] +pub struct EventResult { + pub id: String, + pub status: EventResultStatus, + pub event_id: String, + pub handler: EventHandler, + pub timeout: Option, + pub started_at: Option, + pub result: Option, + pub error: Option, + pub completed_at: Option, + pub event_children: Vec, +} + +impl EventResult { + pub fn new(event_id: String, handler: EventHandler, timeout: Option) -> Self { + Self { + id: uuid_v7_string(), + status: EventResultStatus::Pending, + event_id, + handler, + timeout, + started_at: None, + result: None, + error: None, + completed_at: None, + event_children: vec![], + } + } +} diff --git a/bubus-rust/src/id.rs b/bubus-rust/src/id.rs new file mode 100644 index 0000000..e2f61ae --- /dev/null +++ b/bubus-rust/src/id.rs @@ -0,0 +1,24 @@ +use uuid::Uuid; + +pub fn uuid_v7_string() -> String { + Uuid::now_v7().to_string() +} + +pub fn handler_id_namespace() -> Uuid { + Uuid::new_v5(&Uuid::NAMESPACE_DNS, b"bubus-handler") +} + +pub fn compute_handler_id( + eventbus_id: &str, + handler_name: &str, + handler_file_path: Option<&str>, + handler_registered_at: &str, + event_pattern: &str, +) -> String { + let file_path = handler_file_path.unwrap_or("unknown"); + let seed = format!( + "{}|{}|{}|{}|{}", + eventbus_id, handler_name, file_path, handler_registered_at, event_pattern + ); + Uuid::new_v5(&handler_id_namespace(), seed.as_bytes()).to_string() +} diff --git a/bubus-rust/src/lib.rs b/bubus-rust/src/lib.rs new file mode 100644 index 0000000..b55f348 --- /dev/null +++ b/bubus-rust/src/lib.rs @@ -0,0 +1,10 @@ +pub mod base_event; +pub mod event_bus; +pub mod event_handler; +pub mod event_result; +pub mod id; +pub mod lock_manager; +pub mod typed; +pub mod types; + +pub use types::*; diff --git a/bubus-rust/src/lock_manager.rs b/bubus-rust/src/lock_manager.rs new file mode 100644 index 0000000..8fdc730 --- /dev/null +++ b/bubus-rust/src/lock_manager.rs @@ -0,0 +1,29 @@ +use std::{collections::HashMap, sync::Arc}; + +use parking_lot::Mutex; + +#[derive(Default, Clone)] +pub struct ReentrantLock { + lock: Arc>, +} + +impl ReentrantLock { + pub fn lock(&self) -> parking_lot::MutexGuard<'_, ()> { + self.lock.lock() + } +} + +#[derive(Default)] +pub struct LockManager { + locks: Mutex>>, +} + +impl LockManager { + pub fn get_lock(&self, key: &str) -> Arc { + let mut locks = self.locks.lock(); + locks + .entry(key.to_string()) + .or_insert_with(|| Arc::new(ReentrantLock::default())) + .clone() + } +} diff --git a/bubus-rust/src/typed.rs b/bubus-rust/src/typed.rs new file mode 100644 index 0000000..a198365 --- /dev/null +++ b/bubus-rust/src/typed.rs @@ -0,0 +1,122 @@ +use std::{collections::HashMap, marker::PhantomData, sync::Arc}; + +use serde::{de::DeserializeOwned, Serialize}; +use serde_json::{Map, Value}; + +use crate::{base_event::BaseEvent, event_bus::EventBus}; + +pub trait EventSpec: Send + Sync + 'static { + type Payload: Serialize + DeserializeOwned + Clone + Send + Sync + 'static; + type Result: Serialize + DeserializeOwned + Clone + Send + Sync + 'static; + + const EVENT_TYPE: &'static str; +} + +#[derive(Clone)] +pub struct TypedEvent { + pub inner: Arc, + marker: PhantomData, +} + +impl TypedEvent { + pub fn new(payload: E::Payload) -> Self { + let value = serde_json::to_value(payload).expect("typed payload serialization failed"); + let Value::Object(payload_map) = value else { + panic!("typed payload must serialize to a JSON object"); + }; + + Self { + inner: BaseEvent::new(E::EVENT_TYPE, payload_map), + marker: PhantomData, + } + } + + pub fn from_base_event(event: Arc) -> Self { + Self { + inner: event, + marker: PhantomData, + } + } + + pub fn payload(&self) -> E::Payload { + let payload = self.inner.inner.lock().payload.clone(); + let value = Value::Object(payload); + serde_json::from_value(value).expect("typed payload decode failed") + } + + pub async fn wait_completed(&self) { + self.inner.wait_completed().await; + } + + pub fn first_result(&self) -> Option { + let results: HashMap = + self.inner.inner.lock().event_results.clone(); + let mut ordered_handler_ids: Vec = results.keys().cloned().collect(); + ordered_handler_ids.sort(); + for handler_id in ordered_handler_ids { + let Some(result) = results.get(&handler_id) else { + continue; + }; + if result.error.is_none() { + if let Some(value) = &result.result { + let decoded: E::Result = + serde_json::from_value(value.clone()).expect("typed result decode failed"); + return Some(decoded); + } + } + } + None + } +} + +impl EventBus { + pub fn emit(&self, event: TypedEvent) -> TypedEvent { + let emitted = self.enqueue_base(event.inner.clone()); + TypedEvent::from_base_event(emitted) + } + + pub fn emit_with_options( + &self, + event: TypedEvent, + queue_jump: bool, + ) -> TypedEvent { + let emitted = self.enqueue_base_with_options(event.inner.clone(), queue_jump); + TypedEvent::from_base_event(emitted) + } + + pub fn on_typed( + &self, + handler_name: &str, + handler_fn: F, + ) -> crate::event_handler::EventHandler + where + E: EventSpec, + F: Fn(TypedEvent) -> Fut + Send + Sync + 'static, + Fut: std::future::Future> + Send + 'static, + { + self.on(E::EVENT_TYPE, handler_name, move |event| { + let typed = TypedEvent::::from_base_event(event); + let fut = handler_fn(typed); + async move { + let result = fut.await?; + serde_json::to_value(result).map_err(|error| error.to_string()) + } + }) + } + + pub async fn find_typed( + &self, + past: bool, + future: Option, + ) -> Option> { + let found = self.find(E::EVENT_TYPE, past, future, None).await?; + Some(TypedEvent::from_base_event(found)) + } +} + +pub fn payload_map_from_value(value: Value) -> Map { + match value { + Value::Object(map) => map, + _ => panic!("typed payload must be a JSON object"), + } +} diff --git a/bubus-rust/src/types.rs b/bubus-rust/src/types.rs new file mode 100644 index 0000000..0d0c556 --- /dev/null +++ b/bubus-rust/src/types.rs @@ -0,0 +1,31 @@ +use serde::{Deserialize, Serialize}; + +#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)] +#[serde(rename_all = "kebab-case")] +pub enum EventConcurrencyMode { + GlobalSerial, + BusSerial, + Parallel, +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)] +#[serde(rename_all = "kebab-case")] +pub enum EventHandlerConcurrencyMode { + Serial, + Parallel, +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)] +#[serde(rename_all = "kebab-case")] +pub enum EventHandlerCompletionMode { + All, + First, +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)] +#[serde(rename_all = "lowercase")] +pub enum EventStatus { + Pending, + Started, + Completed, +} diff --git a/bubus-rust/tests/event_bus_tests.rs b/bubus-rust/tests/event_bus_tests.rs new file mode 100644 index 0000000..876a0ef --- /dev/null +++ b/bubus-rust/tests/event_bus_tests.rs @@ -0,0 +1,66 @@ +use std::{thread, time::Duration}; + +use bubus_rust::{ + event_bus::EventBus, + typed::{EventSpec, TypedEvent}, + types::{EventConcurrencyMode, EventHandlerConcurrencyMode}, +}; +use futures::executor::block_on; +use serde::{Deserialize, Serialize}; +use serde_json::json; + +#[derive(Clone, Serialize, Deserialize)] +struct WorkPayload { + value: i64, +} + +#[derive(Clone, Serialize, Deserialize)] +struct WorkResult { + value: i64, +} + +struct WorkEvent; +impl EventSpec for WorkEvent { + type Payload = WorkPayload; + type Result = WorkResult; + const EVENT_TYPE: &'static str = "work"; +} + +#[test] +fn test_emit_and_handler_result() { + let bus = EventBus::new(Some("BusA".to_string())); + bus.on("work", "h1", |_event| async move { Ok(json!("ok")) }); + let event = bus.emit::(TypedEvent::::new(WorkPayload { value: 1 })); + block_on(event.wait_completed()); + + let results = event.inner.inner.lock().event_results.clone(); + assert_eq!(results.len(), 1); + let first = results.values().next().expect("missing first result"); + assert_eq!(first.result, Some(json!("ok"))); + bus.stop(); +} + +#[test] +fn test_parallel_handler_concurrency() { + let bus = EventBus::new(Some("BusPar".to_string())); + + bus.on("work", "h1", |_event| async move { + thread::sleep(Duration::from_millis(20)); + Ok(json!(1)) + }); + bus.on("work", "h2", |_event| async move { + thread::sleep(Duration::from_millis(20)); + Ok(json!(2)) + }); + + let event = TypedEvent::::new(WorkPayload { value: 1 }); + { + let mut inner = event.inner.inner.lock(); + inner.event_handler_concurrency = Some(EventHandlerConcurrencyMode::Parallel); + inner.event_concurrency = Some(EventConcurrencyMode::Parallel); + } + let emitted = bus.emit(event); + block_on(emitted.wait_completed()); + assert_eq!(emitted.inner.inner.lock().event_results.len(), 2); + bus.stop(); +} diff --git a/bubus-rust/tests/test_base_event.rs b/bubus-rust/tests/test_base_event.rs new file mode 100644 index 0000000..06737ff --- /dev/null +++ b/bubus-rust/tests/test_base_event.rs @@ -0,0 +1,30 @@ +use std::sync::Arc; + +use bubus_rust::{base_event::BaseEvent, types::EventStatus}; +use futures::executor::block_on; +use serde_json::{json, Map}; + +fn mk_event(event_type: &str) -> Arc { + let mut payload = Map::new(); + payload.insert("value".to_string(), json!(1)); + BaseEvent::new(event_type.to_string(), payload) +} + +#[test] +fn test_base_event_json_roundtrip() { + let event = mk_event("test_event"); + let json_value = event.to_json_value(); + let deserialized = BaseEvent::from_json_value(json_value.clone()); + assert_eq!(json_value, deserialized.to_json_value()); +} + +#[test] +fn test_base_event_runtime_state_transitions() { + let event = mk_event("runtime_event"); + assert_eq!(event.inner.lock().event_status, EventStatus::Pending); + event.mark_started(); + assert_eq!(event.inner.lock().event_status, EventStatus::Started); + event.mark_completed(); + assert_eq!(event.inner.lock().event_status, EventStatus::Completed); + block_on(event.wait_completed()); +} diff --git a/bubus-rust/tests/test_event_handler_first.rs b/bubus-rust/tests/test_event_handler_first.rs new file mode 100644 index 0000000..bf23e62 --- /dev/null +++ b/bubus-rust/tests/test_event_handler_first.rs @@ -0,0 +1,51 @@ +use std::{thread, time::Duration}; + +use bubus_rust::{ + event_bus::EventBus, + typed::{EventSpec, TypedEvent}, + types::{EventHandlerCompletionMode, EventHandlerConcurrencyMode}, +}; +use futures::executor::block_on; +use serde::{Deserialize, Serialize}; +use serde_json::json; + +#[derive(Clone, Serialize, Deserialize)] +struct EmptyPayload {} +#[derive(Clone, Serialize, Deserialize)] +struct WorkResult { + value: String, +} +struct WorkEvent; +impl EventSpec for WorkEvent { + type Payload = EmptyPayload; + type Result = WorkResult; + const EVENT_TYPE: &'static str = "work"; +} + +#[test] +fn test_event_handler_first_serial_stops_after_first_success() { + let bus = EventBus::new(Some("BusFirstSerial".to_string())); + + bus.on("work", "first", |_event| async move { Ok(json!("winner")) }); + bus.on("work", "second", |_event| async move { + thread::sleep(Duration::from_millis(20)); + Ok(json!("late")) + }); + + let event = TypedEvent::::new(EmptyPayload {}); + { + let mut inner = event.inner.inner.lock(); + inner.event_handler_completion = Some(EventHandlerCompletionMode::First); + inner.event_handler_concurrency = Some(EventHandlerConcurrencyMode::Serial); + } + let emitted = bus.emit(event); + block_on(emitted.wait_completed()); + + let results = emitted.inner.inner.lock().event_results.clone(); + assert_eq!(results.len(), 1); + assert_eq!( + results.values().next().and_then(|r| r.result.clone()), + Some(json!("winner")) + ); + bus.stop(); +} diff --git a/bubus-rust/tests/test_event_handler_ids.rs b/bubus-rust/tests/test_event_handler_ids.rs new file mode 100644 index 0000000..de4f619 --- /dev/null +++ b/bubus-rust/tests/test_event_handler_ids.rs @@ -0,0 +1,32 @@ +use bubus_rust::id::compute_handler_id; +use uuid::Uuid; + +#[test] +fn test_compute_handler_id_matches_uuidv5_seed_algorithm() { + let eventbus_id = "0195f6ac-9f10-7e4b-bf69-fb33c68ca13e"; + let handler_name = "tests.handlers.handle_work"; + let handler_file_path = Some("~/repo/tests/handlers.py:10"); + let handler_registered_at = "2025-01-01T00:00:00.000000Z"; + let event_pattern = "work"; + + let computed = compute_handler_id( + eventbus_id, + handler_name, + handler_file_path, + handler_registered_at, + event_pattern, + ); + + let namespace = Uuid::new_v5(&Uuid::NAMESPACE_DNS, b"bubus-handler"); + let seed = format!( + "{}|{}|{}|{}|{}", + eventbus_id, + handler_name, + handler_file_path.unwrap(), + handler_registered_at, + event_pattern + ); + let expected = Uuid::new_v5(&namespace, seed.as_bytes()).to_string(); + + assert_eq!(computed, expected); +} diff --git a/bubus-rust/tests/test_event_history_store.rs b/bubus-rust/tests/test_event_history_store.rs new file mode 100644 index 0000000..67881de --- /dev/null +++ b/bubus-rust/tests/test_event_history_store.rs @@ -0,0 +1,48 @@ +use bubus_rust::{ + event_bus::EventBus, + typed::{EventSpec, TypedEvent}, +}; +use futures::executor::block_on; +use serde::{Deserialize, Serialize}; + +#[derive(Clone, Serialize, Deserialize)] +struct EmptyPayload {} +#[derive(Clone, Serialize, Deserialize)] +struct EmptyResult {} + +struct HistoryEvent; +impl EventSpec for HistoryEvent { + type Payload = EmptyPayload; + type Result = EmptyResult; + const EVENT_TYPE: &'static str = "history_event"; +} + +#[test] +fn test_max_history_drop_true_keeps_recent_entries() { + let bus = EventBus::new_with_history(Some("HistoryDropBus".to_string()), Some(2), true); + + for _ in 0..3 { + let event = bus.emit::(TypedEvent::::new(EmptyPayload {})); + block_on(event.wait_completed()); + } + + let history = bus.event_history_ids(); + assert_eq!(history.len(), 2); + assert!(history.iter().any(|id| id.contains('-'))); + bus.stop(); +} + +#[test] +fn test_max_history_drop_false_rejects_new_emit_when_full() { + let bus = EventBus::new_with_history(Some("HistoryRejectBus".to_string()), Some(1), false); + + let first = bus.emit::(TypedEvent::::new(EmptyPayload {})); + block_on(first.wait_completed()); + + let second = TypedEvent::::new(EmptyPayload {}); + let second = bus.emit(second); + block_on(second.wait_completed()); + + assert_eq!(second.inner.inner.lock().event_path.len(), 0); + bus.stop(); +} diff --git a/bubus-rust/tests/test_event_result.rs b/bubus-rust/tests/test_event_result.rs new file mode 100644 index 0000000..6fbc58c --- /dev/null +++ b/bubus-rust/tests/test_event_result.rs @@ -0,0 +1,24 @@ +use bubus_rust::{ + event_handler::EventHandler, + event_result::{EventResult, EventResultStatus}, +}; + +#[test] +fn test_event_result_defaults() { + let handler = EventHandler { + id: "h1".into(), + event_pattern: "work".into(), + handler_name: "handler".into(), + handler_file_path: None, + handler_timeout: None, + handler_slow_timeout: None, + handler_registered_at: "2026-01-01T00:00:00.000Z".into(), + eventbus_name: "bus".into(), + eventbus_id: "bus-id".into(), + callable: None, + }; + + let result = EventResult::new("event-id".into(), handler, Some(5.0)); + assert_eq!(result.status, EventResultStatus::Pending); + assert_eq!(result.timeout, Some(5.0)); +} diff --git a/bubus-rust/tests/test_eventbus_dispatch_defaults.rs b/bubus-rust/tests/test_eventbus_dispatch_defaults.rs new file mode 100644 index 0000000..a45ce01 --- /dev/null +++ b/bubus-rust/tests/test_eventbus_dispatch_defaults.rs @@ -0,0 +1,41 @@ +use bubus_rust::{ + event_bus::EventBus, + typed::{EventSpec, TypedEvent}, + types::{EventHandlerCompletionMode, EventHandlerConcurrencyMode}, +}; +use futures::executor::block_on; +use serde::{Deserialize, Serialize}; +use serde_json::json; + +#[derive(Clone, Serialize, Deserialize)] +struct Payload { + value: i64, +} +#[derive(Clone, Serialize, Deserialize)] +struct ResultT { + value: String, +} +struct WorkEvent; +impl EventSpec for WorkEvent { + type Payload = Payload; + type Result = ResultT; + const EVENT_TYPE: &'static str = "work"; +} + +#[test] +fn test_bus_default_handler_settings_are_applied() { + let bus = EventBus::new(Some("BusDefaults".to_string())); + + bus.on("work", "h1", |_event| async move { Ok(json!("ok")) }); + let event = TypedEvent::::new(Payload { value: 1 }); + { + let mut inner = event.inner.inner.lock(); + inner.event_handler_concurrency = Some(EventHandlerConcurrencyMode::Serial); + inner.event_handler_completion = Some(EventHandlerCompletionMode::All); + } + let event = bus.emit(event); + block_on(event.wait_completed()); + + assert_eq!(event.inner.inner.lock().event_results.len(), 1); + bus.stop(); +} diff --git a/bubus-rust/tests/test_eventbus_edge_cases.rs b/bubus-rust/tests/test_eventbus_edge_cases.rs new file mode 100644 index 0000000..f5a1c19 --- /dev/null +++ b/bubus-rust/tests/test_eventbus_edge_cases.rs @@ -0,0 +1,83 @@ +use bubus_rust::{ + event_bus::EventBus, + event_result::EventResultStatus, + typed::{EventSpec, TypedEvent}, +}; +use futures::executor::block_on; +use serde::{Deserialize, Serialize}; +use serde_json::json; + +#[derive(Clone, Serialize, Deserialize)] +struct EmptyPayload {} +#[derive(Clone, Serialize, Deserialize)] +struct EmptyResult {} + +struct NothingEvent; +impl EventSpec for NothingEvent { + type Payload = EmptyPayload; + type Result = EmptyResult; + const EVENT_TYPE: &'static str = "nothing"; +} +struct SpecificEvent; +impl EventSpec for SpecificEvent { + type Payload = EmptyPayload; + type Result = EmptyResult; + const EVENT_TYPE: &'static str = "specific_event"; +} +struct WorkEvent; +impl EventSpec for WorkEvent { + type Payload = EmptyPayload; + type Result = EmptyResult; + const EVENT_TYPE: &'static str = "work"; +} + +#[test] +fn test_emit_with_no_handlers_completes_event() { + let bus = EventBus::new(Some("NoHandlers".to_string())); + let event = bus.emit::(TypedEvent::::new(EmptyPayload {})); + + block_on(event.wait_completed()); + + let inner = event.inner.inner.lock(); + assert_eq!(inner.event_results.len(), 0); + assert_eq!(inner.event_pending_bus_count, 0); + assert!(inner.event_started_at.is_some()); + assert!(inner.event_completed_at.is_some()); + drop(inner); + bus.stop(); +} + +#[test] +fn test_wildcard_handler_runs_for_any_event_type() { + let bus = EventBus::new(Some("WildcardBus".to_string())); + bus.on("*", "catch_all", |_event| async move { Ok(json!("all")) }); + let event = bus.emit::(TypedEvent::::new(EmptyPayload {})); + + block_on(event.wait_completed()); + + let results = event.inner.inner.lock().event_results.clone(); + assert_eq!(results.len(), 1); + let only = results.values().next().expect("missing result"); + assert_eq!(only.result, Some(json!("all"))); + bus.stop(); +} + +#[test] +fn test_handler_error_populates_error_status() { + let bus = EventBus::new(Some("ErrorBus".to_string())); + bus.on( + "work", + "bad", + |_event| async move { Err("boom".to_string()) }, + ); + let event = bus.emit::(TypedEvent::::new(EmptyPayload {})); + + block_on(event.wait_completed()); + + let results = event.inner.inner.lock().event_results.clone(); + assert_eq!(results.len(), 1); + let only = results.values().next().expect("missing result"); + assert_eq!(only.status, EventResultStatus::Error); + assert_eq!(only.error.as_deref(), Some("boom")); + bus.stop(); +} diff --git a/bubus-rust/tests/test_eventbus_find.rs b/bubus-rust/tests/test_eventbus_find.rs new file mode 100644 index 0000000..48c199f --- /dev/null +++ b/bubus-rust/tests/test_eventbus_find.rs @@ -0,0 +1,56 @@ +use std::{thread, time::Duration}; + +use bubus_rust::{ + event_bus::EventBus, + typed::{EventSpec, TypedEvent}, +}; +use futures::executor::block_on; +use serde::{Deserialize, Serialize}; +use serde_json::json; + +#[derive(Clone, Serialize, Deserialize)] +struct EmptyPayload {} +#[derive(Clone, Serialize, Deserialize)] +struct EmptyResult {} +struct WorkEvent; +impl EventSpec for WorkEvent { + type Payload = EmptyPayload; + type Result = EmptyResult; + const EVENT_TYPE: &'static str = "work"; +} +struct FutureEvent; +impl EventSpec for FutureEvent { + type Payload = EmptyPayload; + type Result = EmptyResult; + const EVENT_TYPE: &'static str = "future_event"; +} + +#[test] +fn test_find_past_match_returns_event() { + let bus = EventBus::new(Some("FindBus".to_string())); + bus.on("work", "h1", |_event| async move { Ok(json!("ok")) }); + + let event = bus.emit::(TypedEvent::::new(EmptyPayload {})); + block_on(event.wait_completed()); + + let found = block_on(bus.find("work", true, None, None)); + assert!(found.is_some()); + assert_eq!(found.expect("missing").inner.lock().event_type, "work"); + + bus.stop(); +} + +#[test] +fn test_find_future_waits_for_new_event() { + let bus = EventBus::new(Some("FindFutureBus".to_string())); + let bus_for_emit = bus.clone(); + + thread::spawn(move || { + thread::sleep(Duration::from_millis(30)); + bus_for_emit.emit::(TypedEvent::::new(EmptyPayload {})); + }); + + let found = block_on(bus.find("future_event", false, Some(0.5), None)); + assert!(found.is_some()); + bus.stop(); +} diff --git a/bubus-rust/tests/test_eventbus_locking.rs b/bubus-rust/tests/test_eventbus_locking.rs new file mode 100644 index 0000000..33a27b7 --- /dev/null +++ b/bubus-rust/tests/test_eventbus_locking.rs @@ -0,0 +1,111 @@ +use std::{thread, time::Duration}; + +use bubus_rust::{ + event_bus::EventBus, + typed::{EventSpec, TypedEvent}, + types::EventConcurrencyMode, +}; +use futures::executor::block_on; +use serde::{Deserialize, Serialize}; +use serde_json::json; + +#[derive(Clone, Serialize, Deserialize)] +struct QPayload { + idx: i64, +} +#[derive(Clone, Serialize, Deserialize)] +struct EmptyPayload {} +#[derive(Clone, Serialize, Deserialize)] +struct EmptyResult {} +struct QEvent; +impl EventSpec for QEvent { + type Payload = QPayload; + type Result = EmptyResult; + const EVENT_TYPE: &'static str = "q"; +} +struct WorkEvent; +impl EventSpec for WorkEvent { + type Payload = EmptyPayload; + type Result = EmptyResult; + const EVENT_TYPE: &'static str = "work"; +} + +#[test] +fn test_queue_jump() { + let bus = EventBus::new(Some("BusJump".to_string())); + bus.on("q", "h", |event| async move { + let value = event + .inner + .lock() + .payload + .get("idx") + .cloned() + .unwrap_or(serde_json::Value::Null); + Ok(value) + }); + + let event1 = bus.emit::(TypedEvent::::new(QPayload { idx: 1 })); + let event2 = + bus.emit_with_options::(TypedEvent::::new(QPayload { idx: 2 }), true); + + block_on(async { + event1.wait_completed().await; + event2.wait_completed().await; + }); + + let event1_started = event1 + .inner + .inner + .lock() + .event_started_at + .clone() + .unwrap_or_default(); + let event2_started = event2 + .inner + .inner + .lock() + .event_started_at + .clone() + .unwrap_or_default(); + assert!(event2_started <= event1_started); + bus.stop(); +} + +#[test] +fn test_bus_serial_processes_in_order() { + let bus = EventBus::new(Some("BusSerial".to_string())); + + bus.on("work", "slow", |_event| async move { + thread::sleep(Duration::from_millis(15)); + Ok(json!(1)) + }); + + let event1 = TypedEvent::::new(EmptyPayload {}); + let event2 = TypedEvent::::new(EmptyPayload {}); + event1.inner.inner.lock().event_concurrency = Some(EventConcurrencyMode::BusSerial); + event2.inner.inner.lock().event_concurrency = Some(EventConcurrencyMode::BusSerial); + let event1 = bus.emit(event1); + let event2 = bus.emit(event2); + + block_on(async { + event1.wait_completed().await; + event2.wait_completed().await; + }); + + let event1_started = event1 + .inner + .inner + .lock() + .event_started_at + .clone() + .unwrap_or_default(); + let event2_started = event2 + .inner + .inner + .lock() + .event_started_at + .clone() + .unwrap_or_default(); + assert!(event1_started <= event2_started); + bus.stop(); +} diff --git a/bubus-rust/tests/test_eventbus_on_off.rs b/bubus-rust/tests/test_eventbus_on_off.rs new file mode 100644 index 0000000..6a719d3 --- /dev/null +++ b/bubus-rust/tests/test_eventbus_on_off.rs @@ -0,0 +1,35 @@ +use bubus_rust::{ + event_bus::EventBus, + typed::{EventSpec, TypedEvent}, +}; +use futures::executor::block_on; +use serde::{Deserialize, Serialize}; +use serde_json::json; + +#[derive(Clone, Serialize, Deserialize)] +struct EmptyPayload {} +#[derive(Clone, Serialize, Deserialize)] +struct EmptyResult {} +struct WorkEvent; +impl EventSpec for WorkEvent { + type Payload = EmptyPayload; + type Result = EmptyResult; + const EVENT_TYPE: &'static str = "work"; +} + +#[test] +fn test_on_returns_handler_and_off_removes_handler() { + let bus = EventBus::new(Some("OnOffBus".to_string())); + + let handler = bus.on("work", "h1", |_event| async move { Ok(json!("ok")) }); + let event_1 = bus.emit::(TypedEvent::::new(EmptyPayload {})); + block_on(event_1.wait_completed()); + assert_eq!(event_1.inner.inner.lock().event_results.len(), 1); + + bus.off("work", Some(&handler.id)); + let event_2 = bus.emit::(TypedEvent::::new(EmptyPayload {})); + block_on(event_2.wait_completed()); + assert_eq!(event_2.inner.inner.lock().event_results.len(), 0); + + bus.stop(); +} diff --git a/bubus-rust/tests/test_eventbus_timeout.rs b/bubus-rust/tests/test_eventbus_timeout.rs new file mode 100644 index 0000000..934b76f --- /dev/null +++ b/bubus-rust/tests/test_eventbus_timeout.rs @@ -0,0 +1,137 @@ +use std::{thread, time::Duration}; + +use bubus_rust::{ + event_bus::EventBus, + event_result::EventResultStatus, + typed::{EventSpec, TypedEvent}, +}; +use futures::executor::block_on; +use serde::{Deserialize, Serialize}; +use serde_json::json; + +#[derive(Clone, Serialize, Deserialize)] +struct EmptyPayload {} +#[derive(Clone, Serialize, Deserialize)] +struct EmptyResult {} +struct TimeoutEvent; +impl EventSpec for TimeoutEvent { + type Payload = EmptyPayload; + type Result = EmptyResult; + const EVENT_TYPE: &'static str = "timeout"; +} +struct ChildEvent; +impl EventSpec for ChildEvent { + type Payload = EmptyPayload; + type Result = EmptyResult; + const EVENT_TYPE: &'static str = "child"; +} +struct ParentEvent; +impl EventSpec for ParentEvent { + type Payload = EmptyPayload; + type Result = EmptyResult; + const EVENT_TYPE: &'static str = "parent"; +} + +fn wait_until_completed(event: &TypedEvent, timeout_ms: u64) { + let started = std::time::Instant::now(); + while started.elapsed() < Duration::from_millis(timeout_ms) { + if event.inner.inner.lock().event_status == bubus_rust::types::EventStatus::Completed { + return; + } + thread::sleep(Duration::from_millis(5)); + } + panic!("event did not complete within {timeout_ms}ms"); +} + +#[test] +fn test_event_timeout_aborts_in_flight_handler_result() { + let bus = EventBus::new(Some("TimeoutBus".to_string())); + + bus.on("timeout", "slow", |_event| async move { + thread::sleep(Duration::from_millis(50)); + Ok(json!("slow")) + }); + + let event = TypedEvent::::new(EmptyPayload {}); + event.inner.inner.lock().event_timeout = Some(0.01); + + let event = bus.emit(event); + block_on(event.wait_completed()); + + let result = event + .inner + .inner + .lock() + .event_results + .values() + .next() + .cloned() + .expect("missing result"); + assert_eq!(result.status, EventResultStatus::Error); + assert!(result + .error + .as_deref() + .unwrap_or_default() + .contains("EventHandlerAbortedError")); + bus.stop(); +} + +#[test] +fn test_parent_timeout_cancels_pending_or_started_children() { + let bus = EventBus::new(Some("ParentTimeoutBus".to_string())); + let bus_for_handler = bus.clone(); + + bus.on("child", "child_slow", |_event| async move { + thread::sleep(Duration::from_millis(80)); + Ok(json!("child")) + }); + + bus.on("parent", "emit_child", move |_event| { + let bus_local = bus_for_handler.clone(); + async move { + let child = TypedEvent::::new(EmptyPayload {}); + child.inner.inner.lock().event_timeout = Some(1.0); + bus_local.emit(child); + thread::sleep(Duration::from_millis(80)); + Ok(json!("parent")) + } + }); + + let parent = TypedEvent::::new(EmptyPayload {}); + parent.inner.inner.lock().event_timeout = Some(0.01); + + let parent = bus.emit(parent); + wait_until_completed(&parent, 1000); + thread::sleep(Duration::from_millis(120)); + + let parent_result = parent + .inner + .inner + .lock() + .event_results + .values() + .next() + .cloned() + .expect("missing parent result"); + assert_eq!(parent_result.status, EventResultStatus::Error); + + let parent_id = parent.inner.inner.lock().event_id.clone(); + let payload = bus.runtime_payload_for_test(); + let child = payload + .values() + .find(|evt| evt.inner.lock().event_parent_id.as_deref() == Some(parent_id.as_str())) + .cloned() + .expect("missing child event"); + + let child_inner = child.inner.lock(); + let has_error = child_inner + .event_results + .values() + .any(|r| r.status == EventResultStatus::Error); + let is_completed = child_inner + .event_results + .values() + .any(|r| r.status == EventResultStatus::Completed); + assert!(has_error || is_completed); + bus.stop(); +} diff --git a/bubus-rust/tests/test_ids.rs b/bubus-rust/tests/test_ids.rs new file mode 100644 index 0000000..1e1ae27 --- /dev/null +++ b/bubus-rust/tests/test_ids.rs @@ -0,0 +1,48 @@ +use bubus_rust::{ + event_bus::EventBus, + event_handler::EventHandler, + id::{compute_handler_id, handler_id_namespace}, +}; +use serde_json::Map; +use uuid::Uuid; + +#[test] +fn test_bus_and_event_ids_are_uuid_v7() { + let bus = EventBus::new(Some("BusId".to_string())); + let bus_id = Uuid::parse_str(&bus.id).expect("bus id must parse"); + assert_eq!(bus_id.get_version_num(), 7); + + let event = bubus_rust::base_event::BaseEvent::new("work", Map::new()); + let event_id = Uuid::parse_str(&event.inner.lock().event_id).expect("event id must parse"); + assert_eq!(event_id.get_version_num(), 7); +} + +#[test] +fn test_handler_id_uses_v5_namespace_seed_compatible_with_python_ts() { + let eventbus_id = "018f6f0e-79b2-7cc5-aed9-f0f9a4e5e6b0"; + let handler_name = "module.fn"; + let handler_registered_at = "2026-01-01T00:00:00.000Z"; + let event_pattern = "work"; + let expected_seed = + format!("{eventbus_id}|{handler_name}|unknown|{handler_registered_at}|{event_pattern}"); + + let expected = Uuid::new_v5(&handler_id_namespace(), expected_seed.as_bytes()).to_string(); + let actual = compute_handler_id( + eventbus_id, + handler_name, + None, + handler_registered_at, + event_pattern, + ); + assert_eq!(actual, expected); + + let entry = EventHandler::from_callable( + event_pattern.to_string(), + handler_name.to_string(), + "BusId".to_string(), + eventbus_id.to_string(), + std::sync::Arc::new(|_event| Box::pin(async { Ok(serde_json::Value::Null) })), + ); + let ns = Uuid::parse_str(&entry.id).expect("handler id must parse"); + assert_eq!(ns.get_version_num(), 5); +} diff --git a/bubus-rust/tests/test_typed_events.rs b/bubus-rust/tests/test_typed_events.rs new file mode 100644 index 0000000..1076e34 --- /dev/null +++ b/bubus-rust/tests/test_typed_events.rs @@ -0,0 +1,58 @@ +use bubus_rust::{ + event_bus::EventBus, + typed::{EventSpec, TypedEvent}, +}; +use futures::executor::block_on; +use serde::{Deserialize, Serialize}; + +#[derive(Clone, Serialize, Deserialize)] +struct AddPayload { + a: i64, + b: i64, +} + +#[derive(Clone, Serialize, Deserialize, Debug, PartialEq, Eq)] +struct AddResult { + sum: i64, +} + +struct AddEvent; + +impl EventSpec for AddEvent { + type Payload = AddPayload; + type Result = AddResult; + + const EVENT_TYPE: &'static str = "AddEvent"; +} + +#[test] +fn test_on_typed_and_emit_typed_roundtrip() { + let bus = EventBus::new(Some("TypedBus".to_string())); + + bus.on_typed::("add", |event: TypedEvent| async move { + let payload = event.payload(); + Ok(AddResult { + sum: payload.a + payload.b, + }) + }); + + let event = bus.emit::(TypedEvent::::new(AddPayload { a: 4, b: 9 })); + block_on(event.wait_completed()); + + let first = event.first_result(); + assert_eq!(first, Some(AddResult { sum: 13 })); + bus.stop(); +} + +#[test] +fn test_find_typed_returns_typed_payload() { + let bus = EventBus::new(Some("TypedFindBus".to_string())); + + let event = bus.emit::(TypedEvent::::new(AddPayload { a: 7, b: 1 })); + block_on(event.wait_completed()); + + let found = block_on(bus.find_typed::(true, None)).expect("expected typed event"); + assert_eq!(found.payload().a, 7); + assert_eq!(found.payload().b, 1); + bus.stop(); +}