Skip to content

Commit 9280839

Browse files
committed
Merge M1.3.2 state machine implementation to main (PR #210)
2 parents 11cd5f0 + efd100d commit 9280839

11 files changed

Lines changed: 2491 additions & 24 deletions

File tree

crates/mimi-core/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ uuid.workspace = true
1919
chrono.workspace = true
2020
log.workspace = true
2121
num_cpus = "1.16"
22+
rand = "0.8"
2223

2324
[dev-dependencies]
2425
tokio-test = "0.4"
Lines changed: 328 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,328 @@
1+
//! Health Monitoring System
2+
//!
3+
//! Extends basic health checks with metric tracking, auto-publishing to Pandora,
4+
//! and auto-escalation on threshold breaches.
5+
6+
use anyhow::Result;
7+
use chrono::{DateTime, Utc};
8+
use serde::{Deserialize, Serialize};
9+
use std::collections::VecDeque;
10+
use std::sync::{Arc, Mutex};
11+
use tracing::{debug, info, warn};
12+
13+
use crate::pandora_client::PandoraClient;
14+
use crate::state_machine::{ComponentHealthCheck, MimiState};
15+
use crate::zenoh_bus::ZenohBusAdapter;
16+
17+
/// Maximum number of metrics kept in the in-memory history; once the history
/// holds this many entries, the oldest one is dropped before a new one is stored.
const MAX_METRIC_HISTORY: usize = 1000;
18+
/// Number of unhealthy metrics recorded for a component (with no healthy metric
/// in between, since a healthy metric clears the counter) at which the failure
/// is escalated.
const FAILURE_THRESHOLD: usize = 5;
19+
20+
/// One health observation for a named component.
#[derive(Debug, Clone, Serialize, Deserialize)]
21+
pub struct HealthMetric {
22+
/// Time associated with the observation; used for time-window queries and
/// forwarded when the metric is published.
pub timestamp: DateTime<Utc>,
23+
/// Name of the component the observation belongs to; failure counters are
/// keyed by this name.
pub component_name: String,
24+
/// Kind of measurement this observation carries.
pub metric_type: HealthMetricType,
25+
/// Observed value of the metric.
pub value: f64,
26+
/// Threshold reported alongside the value.
/// NOTE(review): this module never compares `value` with `threshold`;
/// `is_healthy` is supplied by whoever builds the metric.
pub threshold: f64,
27+
/// Whether the observation counts as healthy; drives failure counting and
/// whether the metric is published as a degradation.
pub is_healthy: bool,
28+
}
29+
30+
#[derive(Debug, Clone, Serialize, Deserialize)]
31+
#[serde(rename_all = "snake_case")]
32+
pub enum HealthMetricType {
33+
CpuUsage,
34+
MemoryUsage,
35+
Latency,
36+
ErrorRate,
37+
HeartbeatMissed,
38+
}
39+
40+
/// Records health metrics, counts failures per component and, when the
/// optional sinks are configured, forwards unhealthy metrics to them.
pub struct HealthMonitor {
41+
/// History of recorded metrics, oldest at the front.
metrics: Arc<Mutex<VecDeque<HealthMetric>>>,
42+
/// Optional Pandora client; when `None`, nothing is persisted.
pandora: Option<Arc<PandoraClient>>,
43+
/// Optional Zenoh bus adapter; when `None`, nothing is published.
zenoh: Option<Arc<ZenohBusAdapter>>,
44+
/// Unhealthy-metric count per component name; an entry is removed when a
/// healthy metric arrives for that component.
failure_counts: Arc<Mutex<std::collections::HashMap<String, usize>>>,
45+
}
46+
47+
impl HealthMonitor {
48+
pub fn new() -> Self {
49+
Self {
50+
metrics: Arc::new(Mutex::new(VecDeque::with_capacity(MAX_METRIC_HISTORY))),
51+
pandora: None,
52+
zenoh: None,
53+
failure_counts: Arc::new(Mutex::new(std::collections::HashMap::new())),
54+
}
55+
}
56+
57+
pub async fn with_pandora(mut self, pandora: Arc<PandoraClient>) -> Self {
58+
self.pandora = Some(pandora);
59+
self
60+
}
61+
62+
pub async fn with_zenoh(mut self, zenoh: Arc<ZenohBusAdapter>) -> Self {
63+
self.zenoh = Some(zenoh);
64+
self
65+
}
66+
67+
pub async fn record_metric(&self, metric: HealthMetric) -> Result<()> {
68+
{
69+
let mut metrics = self.metrics.lock().unwrap();
70+
if metrics.len() >= MAX_METRIC_HISTORY {
71+
metrics.pop_front();
72+
}
73+
metrics.push_back(metric.clone());
74+
}
75+
76+
if !metric.is_healthy {
77+
self.track_failure(&metric.component_name).await?;
78+
} else {
79+
self.reset_failure_count(&metric.component_name).await;
80+
}
81+
82+
if let Some(pandora) = &self.pandora {
83+
self.publish_to_pandora(pandora, &metric).await?;
84+
}
85+
86+
if let Some(zenoh) = &self.zenoh {
87+
self.publish_to_zenoh(zenoh, &metric).await?;
88+
}
89+
90+
Ok(())
91+
}
92+
93+
async fn track_failure(&self, component: &str) -> Result<()> {
94+
let count = {
95+
let mut counts = self.failure_counts.lock().unwrap();
96+
let entry = counts.entry(component.to_string()).or_insert(0);
97+
*entry += 1;
98+
*entry
99+
};
100+
101+
if count >= FAILURE_THRESHOLD {
102+
warn!(
103+
"Component {} exceeded failure threshold: {}/{}",
104+
component, count, FAILURE_THRESHOLD
105+
);
106+
self.escalate_failure(component).await?;
107+
}
108+
109+
Ok(())
110+
}
111+
112+
async fn reset_failure_count(&self, component: &str) {
113+
let mut counts = self.failure_counts.lock().unwrap();
114+
counts.remove(component);
115+
}
116+
117+
async fn escalate_failure(&self, component: &str) -> Result<()> {
118+
info!("Escalating failure for component: {}", component);
119+
120+
if let Some(pandora) = &self.pandora {
121+
let metadata = serde_json::json!({
122+
"component": component,
123+
"failure_count": FAILURE_THRESHOLD,
124+
"action": "escalated",
125+
});
126+
127+
pandora
128+
.persist_critical_state(MimiState::FailedComponent, Utc::now(), metadata)
129+
.await?;
130+
}
131+
132+
Ok(())
133+
}
134+
135+
async fn publish_to_pandora(
136+
&self,
137+
pandora: &Arc<PandoraClient>,
138+
metric: &HealthMetric,
139+
) -> Result<()> {
140+
if !metric.is_healthy {
141+
let metadata = serde_json::json!({
142+
"metric_type": format!("{:?}", metric.metric_type),
143+
"value": metric.value,
144+
"threshold": metric.threshold,
145+
"component": metric.component_name,
146+
});
147+
148+
pandora
149+
.persist_critical_state(MimiState::Degraded, metric.timestamp, metadata)
150+
.await?;
151+
152+
debug!(
153+
"Published unhealthy metric to Pandora: {:?}",
154+
metric.metric_type
155+
);
156+
}
157+
158+
Ok(())
159+
}
160+
161+
async fn publish_to_zenoh(
162+
&self,
163+
zenoh: &Arc<ZenohBusAdapter>,
164+
metric: &HealthMetric,
165+
) -> Result<()> {
166+
if !metric.is_healthy {
167+
zenoh
168+
.publish_state_change(MimiState::Idle, MimiState::Degraded, metric.timestamp)
169+
.await?;
170+
171+
debug!("Published health degradation to Zenoh");
172+
}
173+
174+
Ok(())
175+
}
176+
177+
pub fn get_recent_metrics(&self, count: usize) -> Vec<HealthMetric> {
178+
let metrics = self.metrics.lock().unwrap();
179+
metrics.iter().rev().take(count).cloned().collect()
180+
}
181+
182+
pub fn get_metrics_in_window(&self, window_secs: i64) -> Vec<HealthMetric> {
183+
let metrics = self.metrics.lock().unwrap();
184+
let cutoff = Utc::now() - chrono::Duration::seconds(window_secs);
185+
186+
metrics
187+
.iter()
188+
.filter(|m| m.timestamp > cutoff)
189+
.cloned()
190+
.collect()
191+
}
192+
193+
pub async fn check_component_health(&self, health_check: &ComponentHealthCheck) -> Result<()> {
194+
let is_healthy = health_check.is_healthy();
195+
196+
let metric = HealthMetric {
197+
timestamp: Utc::now(),
198+
component_name: "system".to_string(),
199+
metric_type: HealthMetricType::ErrorRate,
200+
value: if is_healthy { 0.0 } else { 100.0 },
201+
threshold: 10.0,
202+
is_healthy,
203+
};
204+
205+
self.record_metric(metric).await?;
206+
Ok(())
207+
}
208+
}
209+
210+
impl Default for HealthMonitor {
211+
fn default() -> Self {
212+
Self::new()
213+
}
214+
}
215+
216+
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a metric stamped with the current time; individual tests
    /// override fields through struct-update syntax where they need to.
    fn sample_metric(
        component: &str,
        metric_type: HealthMetricType,
        value: f64,
        threshold: f64,
        is_healthy: bool,
    ) -> HealthMetric {
        HealthMetric {
            timestamp: Utc::now(),
            component_name: component.to_string(),
            metric_type,
            value,
            threshold,
            is_healthy,
        }
    }

    /// A recorded metric shows up in the recent-metrics view.
    #[tokio::test]
    async fn test_health_metric_tracking() {
        let monitor = HealthMonitor::new();
        let healthy = sample_metric(
            "test_component",
            HealthMetricType::CpuUsage,
            45.0,
            80.0,
            true,
        );

        let outcome = monitor.record_metric(healthy).await;
        assert!(outcome.is_ok());

        let recent = monitor.get_recent_metrics(10);
        assert_eq!(recent.len(), 1);
        assert_eq!(recent[0].component_name, "test_component");
    }

    /// Repeated failures push the per-component counter to the threshold.
    #[tokio::test]
    async fn test_auto_escalation() {
        let monitor = HealthMonitor::new();

        for _ in 0..=FAILURE_THRESHOLD {
            let failing = sample_metric(
                "failing_component",
                HealthMetricType::ErrorRate,
                100.0,
                10.0,
                false,
            );

            let outcome = monitor.record_metric(failing).await;
            assert!(outcome.is_ok());
        }

        let counts = monitor.failure_counts.lock().unwrap();
        let seen = counts.get("failing_component").copied().unwrap_or(0);
        assert!(seen >= FAILURE_THRESHOLD);
    }

    /// Only metrics newer than the window cutoff are returned.
    #[tokio::test]
    async fn test_metrics_in_window() {
        let monitor = HealthMonitor::new();

        let old_metric = HealthMetric {
            timestamp: Utc::now() - chrono::Duration::hours(2),
            ..sample_metric("test", HealthMetricType::Latency, 50.0, 100.0, true)
        };
        let new_metric = sample_metric("test", HealthMetricType::Latency, 60.0, 100.0, true);

        monitor.record_metric(old_metric).await.unwrap();
        monitor.record_metric(new_metric).await.unwrap();

        let window_metrics = monitor.get_metrics_in_window(3600);
        assert_eq!(window_metrics.len(), 1);
    }

    /// A healthy metric clears the failure counter of its component.
    #[tokio::test]
    async fn test_failure_count_reset() {
        let monitor = HealthMonitor::new();

        let bad_metric = sample_metric("test", HealthMetricType::ErrorRate, 100.0, 10.0, false);
        monitor.record_metric(bad_metric).await.unwrap();

        {
            let counts = monitor.failure_counts.lock().unwrap();
            assert_eq!(counts.get("test"), Some(&1));
        }

        let good_metric = sample_metric("test", HealthMetricType::ErrorRate, 5.0, 10.0, true);
        monitor.record_metric(good_metric).await.unwrap();

        {
            let counts = monitor.failure_counts.lock().unwrap();
            assert_eq!(counts.get("test"), None);
        }
    }
}

crates/mimi-core/src/lib.rs

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,15 +5,21 @@
55
66
pub mod config;
77
pub mod error;
8+
pub mod health_monitor;
89
pub mod message;
10+
pub mod pandora_client;
911
pub mod routing;
1012
pub mod serialization;
1113
pub mod state_machine;
14+
pub mod zenoh_bus;
1215

1316
pub use error::{Error, Result};
17+
pub use health_monitor::{HealthMetric, HealthMetricType, HealthMonitor};
18+
pub use pandora_client::{FailurePattern, Neo4jConfig, PandoraClient, StateHistoryRecord};
1419
pub use routing::{MessageRouter, RoutingError, Topic, TopicPattern};
1520
pub use serialization::{MessageSerializer, SerializationError};
16-
pub use state_machine::{MimiState, StateManager};
21+
pub use state_machine::{ComponentHealthCheck, MimiState, StateManager};
22+
pub use zenoh_bus::{StateChangeMessage, ZenohBusAdapter, ZenohConfig};
1723

1824
/// Core version
1925
pub const VERSION: &str = env!("CARGO_PKG_VERSION");

crates/mimi-core/src/message.rs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
use chrono::{DateTime, Utc};
12
use serde::{Deserialize, Serialize};
23
use uuid::Uuid;
34

@@ -10,6 +11,15 @@ pub struct Message {
1011
pub payload: serde_json::Value,
1112
}
1213

14+
/// Task message for the Zenoh bus.
15+
#[derive(Debug, Clone, Serialize, Deserialize)]
16+
pub struct TaskMessage {
17+
/// Task identifier.
/// NOTE(review): the expected format (UUID vs. free-form) is not visible here — confirm.
pub id: String,
18+
/// Task payload carried as a string.
/// NOTE(review): presumably serialized data; confirm the encoding with producers.
pub payload: String,
19+
/// Task priority.
/// NOTE(review): whether a higher number means more urgent is not defined here — confirm.
pub priority: u8,
20+
/// Creation time of the message, in UTC.
pub created_at: DateTime<Utc>,
21+
}
22+
1323
impl Message {
1424
pub fn new(
1525
source: impl Into<String>,

0 commit comments

Comments
 (0)