This repository was archived by the owner on Jan 27, 2026. It is now read-only.

Commit c83c910

add additional prometheus operator metrics, add taskname to node metrics (#428)
1 parent 61a0a27 commit c83c910

6 files changed: +254 −35 lines


crates/orchestrator/src/api/routes/heartbeat.rs

Lines changed: 7 additions & 0 deletions
```diff
@@ -28,6 +28,11 @@ async fn heartbeat(
         }
     };
 
+    // Track heartbeat request in metrics
+    app_state
+        .metrics
+        .increment_heartbeat_requests(&heartbeat.address);
+
     let node_opt = app_state
         .store_context
         .node_store
@@ -257,13 +262,15 @@ mod tests {
         // Verify Prometheus registry is initially empty (no sync service has run)
         let prometheus_metrics_before = app_state.metrics.export_metrics().unwrap();
         assert!(!prometheus_metrics_before.contains("performance/batch_avg_seq_length"));
+        assert!(prometheus_metrics_before.contains(&format!("orchestrator_heartbeat_requests_total{{node_address=\"0x0000000000000000000000000000000000000000\",pool_id=\"{}\"}} 1", app_state.metrics.pool_id)));
 
         // Create and run sync service manually to test the sync functionality
         let sync_service = MetricsSyncService::new(
             app_state.store_context.clone(),
             app_state.metrics.clone(),
             ServerMode::Full, // Test app uses Full mode
             10,
+            None, // No node groups plugin in test
         );
 
         // Manually trigger a sync operation
```
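The handler change is a single counter bump per request, and the test asserts the resulting exposition line directly. For readers unfamiliar with the `prometheus` crate, here is a minimal standalone sketch of the same increment-and-read-back pattern — not the handler itself; the pool id `"0"` is illustrative, while the zero address is taken from the test above:

```rust
use prometheus::{CounterVec, Opts};

fn main() {
    // Same metric name and label scheme as the counter added in this commit.
    let heartbeats = CounterVec::new(
        Opts::new(
            "orchestrator_heartbeat_requests_total",
            "Total number of heartbeat requests per node",
        ),
        &["node_address", "pool_id"],
    )
    .unwrap();

    let node = "0x0000000000000000000000000000000000000000";

    // One increment per incoming heartbeat request.
    heartbeats.with_label_values(&[node, "0"]).inc();

    // Counters can be read back directly, which is convenient in tests.
    assert_eq!(heartbeats.with_label_values(&[node, "0"]).get(), 1.0);
}
```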

crates/orchestrator/src/api/routes/storage.rs

Lines changed: 10 additions & 0 deletions
```diff
@@ -274,6 +274,12 @@ async fn request_upload(
         file_name
     );
 
+    app_state.metrics.increment_file_upload_requests(
+        &request_upload.task_id,
+        &task.name,
+        address,
+    );
+
     #[cfg(test)]
     return HttpResponse::Ok().json(serde_json::json!({
         "success": true,
@@ -380,6 +386,10 @@ mod tests {
             json["file_name"],
             serde_json::Value::String("model_123/user_uploads/test.parquet".to_string())
        );
+
+        let metrics = app_state.metrics.export_metrics().unwrap();
+        assert!(metrics.contains("orchestrator_file_upload_requests_total"));
+        assert!(metrics.contains(&format!("orchestrator_file_upload_requests_total{{node_address=\"test_address\",pool_id=\"{}\",task_id=\"{}\",task_name=\"test-task\"}} 1", app_state.metrics.pool_id, task.id)));
     }
 
     #[actix_web::test]
```
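One detail worth noting in the test's expected string: the handler passes labels in the order task_id, task_name, node_address, pool_id, but the assertion matches them as node_address, pool_id, task_id, task_name. That works because the text encoder renders label pairs sorted by label name, as both test strings in this commit show. A minimal sketch of that behavior, with illustrative label values:

```rust
use prometheus::{CounterVec, Opts, Registry, TextEncoder};

fn main() {
    let uploads = CounterVec::new(
        Opts::new(
            "orchestrator_file_upload_requests_total",
            "Total number of file upload requests",
        ),
        // Declared (and later passed) in this order...
        &["task_id", "task_name", "node_address", "pool_id"],
    )
    .unwrap();

    let registry = Registry::new();
    registry.register(Box::new(uploads.clone())).unwrap();

    uploads
        .with_label_values(&["task-1", "test-task", "test_address", "0"])
        .inc();

    let body = TextEncoder::new()
        .encode_to_string(&registry.gather())
        .unwrap();

    // ...but rendered alphabetically by label name.
    assert!(body.contains("orchestrator_file_upload_requests_total{node_address=\"test_address\",pool_id=\"0\",task_id=\"task-1\",task_name=\"test-task\"} 1"));
}
```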

crates/orchestrator/src/main.rs

Lines changed: 2 additions & 3 deletions
```diff
@@ -272,12 +272,14 @@ async fn main() -> Result<()> {
     // Start metrics sync service to centralize metrics from Redis to Prometheus
     let metrics_sync_store_context = store_context.clone();
     let metrics_sync_context = metrics_context.clone();
+    let metrics_sync_node_groups = node_groups_plugin.clone();
     tasks.spawn(async move {
         let sync_service = MetricsSyncService::new(
             metrics_sync_store_context,
             metrics_sync_context,
             server_mode,
             10,
+            metrics_sync_node_groups,
         );
         sync_service.run().await
     });
@@ -328,8 +330,6 @@ async fn main() -> Result<()> {
 
     let status_update_store_context = store_context.clone();
     let status_update_heartbeats = heartbeats.clone();
-    let status_update_metrics = metrics_context.clone();
-
     tasks.spawn({
         let contracts = contracts.clone();
         async move {
@@ -342,7 +342,6 @@ async fn main() -> Result<()> {
                 args.disable_ejection,
                 status_update_heartbeats.clone(),
                 status_update_plugins,
-                status_update_metrics,
             );
             status_updater.run().await
         }
```
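The two call sites together reveal the shape of the updated `MetricsSyncService::new`: a fifth parameter carrying an optional node-groups plugin (the heartbeat test passes `None`). Below is a hypothetical reconstruction of the signature with placeholder types standing in for the real ones, which this diff does not show; the meaning of the literal `10` is also not visible here, so `sync_interval_secs` is a guessed name:

```rust
use std::sync::Arc;

// Placeholder types; the real definitions live elsewhere in the crate.
struct StoreContext;
struct MetricsContext;
struct NodeGroupsPlugin;
enum ServerMode {
    Full,
}

struct MetricsSyncService {
    store_context: Arc<StoreContext>,
    metrics: Arc<MetricsContext>,
    server_mode: ServerMode,
    sync_interval_secs: u64, // guessed interpretation of the `10` argument
    // New in this commit: optional node-groups plugin consulted during sync.
    node_groups: Option<Arc<NodeGroupsPlugin>>,
}

impl MetricsSyncService {
    fn new(
        store_context: Arc<StoreContext>,
        metrics: Arc<MetricsContext>,
        server_mode: ServerMode,
        sync_interval_secs: u64,
        node_groups: Option<Arc<NodeGroupsPlugin>>,
    ) -> Self {
        Self {
            store_context,
            metrics,
            server_mode,
            sync_interval_secs,
            node_groups,
        }
    }
}

fn main() {
    // Mirrors the test call: no node-groups plugin.
    let _svc = MetricsSyncService::new(
        Arc::new(StoreContext),
        Arc::new(MetricsContext),
        ServerMode::Full,
        10,
        None,
    );
}
```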
Lines changed: 122 additions & 12 deletions
```diff
@@ -1,52 +1,154 @@
-use prometheus::{GaugeVec, Opts, Registry, TextEncoder};
+use prometheus::{CounterVec, GaugeVec, Opts, Registry, TextEncoder};
 pub mod sync_service;
 pub mod webhook_sender;
 
 pub struct MetricsContext {
     pub compute_task_gauges: GaugeVec,
     pub pool_id: String,
     pub registry: Registry,
+    pub file_upload_requests_total: CounterVec,
+    pub nodes_total: GaugeVec,
+    pub tasks_total: GaugeVec,
+    pub groups_total: GaugeVec,
+    pub heartbeat_requests_total: CounterVec,
+    pub nodes_per_task: GaugeVec,
 }
 
 impl MetricsContext {
     pub fn new(pool_id: String) -> Self {
         // For current state/rate metrics
         let compute_task_gauges = GaugeVec::new(
             Opts::new("compute_gauges", "Compute task gauge metrics"),
-            &["node_address", "task_id", "label", "pool_id"],
+            &["node_address", "task_id", "task_name", "label", "pool_id"],
         )
         .unwrap();
+
+        // New metrics for orchestrator statistics
+        let file_upload_requests_total = CounterVec::new(
+            Opts::new(
+                "orchestrator_file_upload_requests_total",
+                "Total number of file upload requests",
+            ),
+            &["task_id", "task_name", "node_address", "pool_id"],
+        )
+        .unwrap();
+
+        let nodes_total = GaugeVec::new(
+            Opts::new(
+                "orchestrator_nodes_total",
+                "Total number of nodes by status",
+            ),
+            &["status", "pool_id"],
+        )
+        .unwrap();
+
+        let tasks_total = GaugeVec::new(
+            Opts::new("orchestrator_tasks_total", "Total number of tasks"),
+            &["pool_id"],
+        )
+        .unwrap();
+
+        let groups_total = GaugeVec::new(
+            Opts::new(
+                "orchestrator_groups_total",
+                "Total number of node groups by configuration",
+            ),
+            &["configuration_name", "pool_id"],
+        )
+        .unwrap();
+
+        let heartbeat_requests_total = CounterVec::new(
+            Opts::new(
+                "orchestrator_heartbeat_requests_total",
+                "Total number of heartbeat requests per node",
+            ),
+            &["node_address", "pool_id"],
+        )
+        .unwrap();
+
+        let nodes_per_task = GaugeVec::new(
+            Opts::new(
+                "orchestrator_nodes_per_task",
+                "Number of nodes actively working on each task",
+            ),
+            &["task_id", "task_name", "pool_id"],
+        )
+        .unwrap();
+
         let registry = Registry::new();
         let _ = registry.register(Box::new(compute_task_gauges.clone()));
+        let _ = registry.register(Box::new(file_upload_requests_total.clone()));
+        let _ = registry.register(Box::new(nodes_total.clone()));
+        let _ = registry.register(Box::new(tasks_total.clone()));
+        let _ = registry.register(Box::new(groups_total.clone()));
+        let _ = registry.register(Box::new(heartbeat_requests_total.clone()));
+        let _ = registry.register(Box::new(nodes_per_task.clone()));
 
         Self {
             compute_task_gauges,
             pool_id,
             registry,
+            file_upload_requests_total,
+            nodes_total,
+            tasks_total,
+            groups_total,
+            heartbeat_requests_total,
+            nodes_per_task,
         }
     }
 
     pub fn record_compute_task_gauge(
         &self,
         node_address: &str,
         task_id: &str,
+        task_name: &str,
         label: &str,
         value: f64,
     ) {
         self.compute_task_gauges
-            .with_label_values(&[node_address, task_id, label, &self.pool_id])
+            .with_label_values(&[node_address, task_id, task_name, label, &self.pool_id])
             .set(value);
     }
 
-    pub fn remove_compute_task_gauge(&self, node_address: &str, task_id: &str, label: &str) {
-        if let Err(e) = self.compute_task_gauges.remove_label_values(&[
-            node_address,
-            task_id,
-            label,
-            &self.pool_id,
-        ]) {
-            println!("Error removing compute task gauge: {}", e);
-        }
+    pub fn increment_file_upload_requests(
+        &self,
+        task_id: &str,
+        task_name: &str,
+        node_address: &str,
+    ) {
+        self.file_upload_requests_total
+            .with_label_values(&[task_id, task_name, node_address, &self.pool_id])
+            .inc();
+    }
+
+    pub fn increment_heartbeat_requests(&self, node_address: &str) {
+        self.heartbeat_requests_total
+            .with_label_values(&[node_address, &self.pool_id])
+            .inc();
+    }
+
+    pub fn set_nodes_count(&self, status: &str, count: f64) {
+        self.nodes_total
+            .with_label_values(&[status, &self.pool_id])
+            .set(count);
+    }
+
+    pub fn set_tasks_count(&self, count: f64) {
+        self.tasks_total
+            .with_label_values(&[&self.pool_id])
+            .set(count);
+    }
+
+    pub fn set_groups_count(&self, configuration_name: &str, count: f64) {
+        self.groups_total
+            .with_label_values(&[configuration_name, &self.pool_id])
+            .set(count);
+    }
+
+    pub fn set_nodes_per_task(&self, task_id: &str, task_name: &str, count: f64) {
+        self.nodes_per_task
+            .with_label_values(&[task_id, task_name, &self.pool_id])
+            .set(count);
     }
 
     pub fn export_metrics(&self) -> Result<String, prometheus::Error> {
@@ -61,4 +163,12 @@ impl MetricsContext {
         // This removes all existing metrics so we can rebuild from Redis
         self.compute_task_gauges.reset();
     }
+
+    /// Clear all orchestrator statistics metrics
+    pub fn clear_orchestrator_statistics(&self) {
+        self.nodes_total.reset();
+        self.tasks_total.reset();
+        self.groups_total.reset();
+        self.nodes_per_task.reset();
+    }
 }
```
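The new `clear_orchestrator_statistics` leans on `MetricVec::reset`, which drops every child (every label combination) from a vector, so the next sync pass can rebuild the gauges from Redis without stale label sets lingering. A minimal sketch of that reset behavior, assuming only the `prometheus` crate (the status names and pool id are illustrative):

```rust
use prometheus::{GaugeVec, Opts, Registry, TextEncoder};

fn main() {
    let nodes_total = GaugeVec::new(
        Opts::new("orchestrator_nodes_total", "Total number of nodes by status"),
        &["status", "pool_id"],
    )
    .unwrap();

    let registry = Registry::new();
    registry.register(Box::new(nodes_total.clone())).unwrap();

    // A sync pass populates one gauge per (status, pool) combination...
    nodes_total.with_label_values(&["healthy", "0"]).set(3.0);
    nodes_total.with_label_values(&["dead", "0"]).set(1.0);

    // ...and clearing before the next pass removes all children, so a
    // status that no longer occurs stops exporting its old value.
    nodes_total.reset();

    let body = TextEncoder::new()
        .encode_to_string(&registry.gather())
        .unwrap();

    // No samples remain for this family after reset().
    assert!(!body.contains("orchestrator_nodes_total{"));
}
```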
