Skip to content

Commit 5befb03

Browse files
atrakh (Convex, Inc.)
authored and committed
implement add_gauge_max (#44126)
GitOrigin-RevId: dcfe2d551810843aaddb2030f6edbba574f292bd
1 parent ff0b115 commit 5befb03

File tree

1 file changed

+103
-5
lines changed

1 file changed

+103
-5
lines changed

crates/udf_metrics/src/lib.rs

Lines changed: 103 additions & 5 deletions
Original file line number · Diff line number · Diff line change
@@ -213,13 +213,22 @@ impl MetricStore {
213213
self.add(MetricType::Gauge, metric_name, ts, value)
214214
}
215215

216-
fn add(
216+
/// Add a sample to a gauge metric, but only update if the new value is
217+
/// greater than the existing value in the bucket (tracking maximum).
218+
pub fn add_gauge_max(
217219
&mut self,
218-
metric_type: MetricType,
219220
metric_name: &str,
220221
ts: SystemTime,
221222
value: f32,
222223
) -> Result<(), UdfMetricsError> {
224+
self.add_gauge_with_op(metric_name, ts, value, |existing| existing.max(value))
225+
}
226+
227+
/// Validate timestamp and compute the bucket index.
228+
fn validate_and_get_bucket_index(
229+
&self,
230+
ts: SystemTime,
231+
) -> Result<BucketIndex, UdfMetricsError> {
223232
let Ok(since_base) = ts.duration_since(self.base_ts) else {
224233
return Err(UdfMetricsError::SamplePrecedesBaseTimestamp {
225234
ts,
@@ -235,8 +244,17 @@ impl MetricStore {
235244
cutoff: self.bucket_start(*max_bucket_index),
236245
});
237246
}
247+
Ok(bucket_index)
248+
}
238249

239-
let metric_key = match self.metrics_by_name.entry(metric_name.to_string()) {
250+
/// Get or create a metric key, validating that the metric type matches
251+
/// if the metric already exists.
252+
fn get_or_create_metric(
253+
&mut self,
254+
metric_name: &str,
255+
metric_type: MetricType,
256+
) -> Result<MetricKey, UdfMetricsError> {
257+
match self.metrics_by_name.entry(metric_name.to_string()) {
240258
hashmap::Entry::Occupied(entry) => {
241259
let metric = self
242260
.metrics
@@ -248,7 +266,7 @@ impl MetricStore {
248266
expected_type: metric.metric_type,
249267
});
250268
}
251-
*entry.get()
269+
Ok(*entry.get())
252270
},
253271
hashmap::Entry::Vacant(entry) => {
254272
let metric = Metric {
@@ -257,10 +275,57 @@ impl MetricStore {
257275
};
258276
let metric_key = self.metrics.alloc(metric);
259277
entry.insert(metric_key);
260-
metric_key
278+
Ok(metric_key)
279+
},
280+
}
281+
}
282+
283+
fn add_gauge_with_op(
284+
&mut self,
285+
metric_name: &str,
286+
ts: SystemTime,
287+
value: f32,
288+
op: impl FnOnce(f32) -> f32,
289+
) -> Result<(), UdfMetricsError> {
290+
let bucket_index = self.validate_and_get_bucket_index(ts)?;
291+
let metric_key = self.get_or_create_metric(metric_name, MetricType::Gauge)?;
292+
293+
let inserted = match self.bucket_by_metric.entry((metric_key, bucket_index)) {
294+
ordmap::Entry::Occupied(bucket_key) => {
295+
let bucket = self
296+
.gauge_buckets
297+
.get_mut(*bucket_key.get())
298+
.context("Invalid bucket key")?;
299+
bucket.value = op(bucket.value);
300+
false
301+
},
302+
ordmap::Entry::Vacant(entry) => {
303+
let new_bucket = GaugeBucket::new(bucket_index, value);
304+
let new_bucket_key = self.gauge_buckets.alloc(new_bucket);
305+
entry.insert(new_bucket_key);
306+
self.bucket_by_start
307+
.insert((bucket_index, metric_key), new_bucket_key);
308+
true
261309
},
262310
};
263311

312+
if inserted {
313+
self.prune_buckets()?;
314+
}
315+
316+
Ok(())
317+
}
318+
319+
fn add(
320+
&mut self,
321+
metric_type: MetricType,
322+
metric_name: &str,
323+
ts: SystemTime,
324+
value: f32,
325+
) -> Result<(), UdfMetricsError> {
326+
let bucket_index = self.validate_and_get_bucket_index(ts)?;
327+
let metric_key = self.get_or_create_metric(metric_name, metric_type)?;
328+
264329
let inserted = match self.bucket_by_metric.entry((metric_key, bucket_index)) {
265330
// Try to log into the desired bucket if it exists.
266331
ordmap::Entry::Occupied(bucket_key) => {
@@ -997,6 +1062,39 @@ mod tests {
9971062
Ok(())
9981063
}
9991064

1065+
#[test]
fn test_add_and_query_gauge_max() -> anyhow::Result<()> {
    let mut store = new_store(2);

    let first_bucket = store.base_ts;
    let second_bucket = store.base_ts + Duration::from_secs(60); // next bucket

    // Samples landing in the same bucket should keep only the maximum.
    store.add_gauge_max("peak_connections", first_bucket, 5.0)?;
    store.add_gauge_max("peak_connections", first_bucket, 10.0)?; // higher -> updates
    store.add_gauge_max("peak_connections", first_bucket, 3.0)?; // lower -> ignored

    // First bucket should report the max value (10.0).
    let result = store.query_gauge(
        "peak_connections",
        first_bucket..(first_bucket + Duration::from_secs(1)),
    )?;
    assert_eq!(result.len(), 1);
    assert_eq!(result[0].value, 10.0);

    // A later bucket tracks its own maximum independently.
    store.add_gauge_max("peak_connections", second_bucket, 7.0)?;
    store.add_gauge_max("peak_connections", second_bucket, 12.0)?; // higher -> updates
    store.add_gauge_max("peak_connections", second_bucket, 8.0)?; // lower -> ignored

    // Query a window spanning both buckets.
    let result = store.query_gauge(
        "peak_connections",
        first_bucket..(second_bucket + Duration::from_secs(120)),
    )?;
    assert_eq!(result.len(), 2);
    assert_eq!(result[0].value, 10.0); // max from first bucket
    assert_eq!(result[1].value, 12.0); // max from second bucket

    store.consistency_check()?;

    Ok(())
}
1097+
10001098
#[test]
10011099
fn test_add_and_query_histogram() -> anyhow::Result<()> {
10021100
let mut store = new_store(2);

0 commit comments

Comments
 (0)