diff --git a/crates/fluss/src/client/metadata.rs b/crates/fluss/src/client/metadata.rs index 1e3ee7fe..bedfce9a 100644 --- a/crates/fluss/src/client/metadata.rs +++ b/crates/fluss/src/client/metadata.rs @@ -19,6 +19,7 @@ use crate::PartitionId; use crate::cluster::{Cluster, ServerNode, ServerType}; use crate::error::{Error, FlussError, Result}; use crate::metadata::{PhysicalTablePath, TableBucket, TablePath}; +use crate::metrics::{CLIENT_METADATA_ERRORS_TOTAL, CLIENT_METADATA_REFRESHES_TOTAL}; use crate::proto::MetadataResponse; use crate::rpc::message::UpdateMetadataRequest; use crate::rpc::{RpcClient, ServerConnection}; @@ -149,33 +150,43 @@ impl Metadata { physical_table_paths: &HashSet<&Arc>, partition_ids: Vec, ) -> Result<()> { - let maybe_server = { - let guard = self.cluster.read(); - guard.get_one_available_server().cloned() - }; - - let server = match maybe_server { - Some(s) => s, - None => { - info!( - "No available tablet server to update metadata, attempting to re-initialize cluster using bootstrap server." - ); - self.reinit_cluster().await?; - return Ok(()); - } - }; + metrics::counter!(CLIENT_METADATA_REFRESHES_TOTAL).increment(1); + // Run the refresh in an inner block so every failure path (no-server + // reinit, connection, RPC, apply) increments the error counter once. + let result: Result<()> = async { + let maybe_server = { + let guard = self.cluster.read(); + guard.get_one_available_server().cloned() + }; + + let server = match maybe_server { + Some(s) => s, + None => { + info!( + "No available tablet server to update metadata, attempting to re-initialize cluster using bootstrap server." + ); + self.reinit_cluster().await?; + return Ok(()); + } + }; - let conn = self.connections.get_connection(&server).await?; + let conn = self.connections.get_connection(&server).await?; - let response = conn - .request(UpdateMetadataRequest::new( - table_paths, - physical_table_paths, - partition_ids, - )) - .await?; - self.update(response).await?; - Ok(()) + let response = conn + .request(UpdateMetadataRequest::new( + table_paths, + physical_table_paths, + partition_ids, + )) + .await?; + self.update(response).await?; + Ok(()) + } + .await; + if result.is_err() { + metrics::counter!(CLIENT_METADATA_ERRORS_TOTAL).increment(1); + } + result } pub async fn update_table_metadata(&self, table_path: &TablePath) -> Result<()> { @@ -338,6 +349,63 @@ mod tests { assert!(cluster.get_tablet_server(1).is_none()); } + #[test] + fn metadata_refresh_and_error_counters_increment() { + use crate::cluster::Cluster; + use metrics_util::debugging::{DebugValue, DebuggingRecorder}; + + let recorder = DebuggingRecorder::new(); + let snapshotter = recorder.snapshotter(); + + let table_path = TablePath::new("db".to_string(), "tbl".to_string()); + // An empty cluster has no available server, so the refresh falls back to + // `reinit_cluster`, which fails fast in `parse_bootstrap` against the + // empty test bootstrap — no network, fully deterministic. + let metadata = Metadata::new_for_test(Arc::new(Cluster::default())); + + let rt = tokio::runtime::Builder::new_current_thread() + .enable_all() + .build() + .unwrap(); + + // `block_on` keeps all `counter!` calls on this thread so the + // thread-local recorder installed by `with_local_recorder` sees them. + metrics::with_local_recorder(&recorder, || { + rt.block_on(async { + let result = metadata.update_table_metadata(&table_path).await; + assert!(result.is_err(), "refresh with empty bootstrap must fail"); + }); + }); + + let entries: Vec<_> = snapshotter.snapshot().into_vec(); + let counter = |name: &str| -> u64 { + entries + .iter() + .find_map(|(key, _, _, val)| { + if key.key().name() == name { + match val { + DebugValue::Counter(v) => Some(*v), + _ => None, + } + } else { + None + } + }) + .unwrap_or(0) + }; + + assert_eq!( + counter(CLIENT_METADATA_REFRESHES_TOTAL), + 1, + "one refresh attempt should be counted" + ); + assert_eq!( + counter(CLIENT_METADATA_ERRORS_TOTAL), + 1, + "the failed refresh should be counted as an error" + ); + } + #[test] fn parse_bootstrap_variants() { // valid IP diff --git a/crates/fluss/src/client/table/scanner.rs b/crates/fluss/src/client/table/scanner.rs index 8578daa4..d05e9fee 100644 --- a/crates/fluss/src/client/table/scanner.rs +++ b/crates/fluss/src/client/table/scanner.rs @@ -31,7 +31,7 @@ use crate::error::{ApiError, Error, FlussError, Result}; use crate::metadata::{ LogFormat, PhysicalTablePath, RowType, SchemaInfo, TableBucket, TableInfo, TablePath, }; -use crate::metrics::ScannerMetrics; +use crate::metrics::{SCANNER_ERROR_KIND_BUCKET, SCANNER_ERROR_KIND_RPC, ScannerMetrics}; use crate::proto::{ ErrorResponse, FetchLogRequest, FetchLogResponse, PbFetchLogReqForBucket, PbFetchLogReqForTable, }; @@ -1431,6 +1431,7 @@ impl LogFetcher { warn!( "Retrying after error fetching log from destination node {server_node:?}: {e:?}" ); + metrics.record_error(SCANNER_ERROR_KIND_RPC); Self::handle_fetch_failure(metadata, &leader, &fetch_request).await; return; } @@ -1511,6 +1512,7 @@ impl LogFetcher { if let Some(error_code) = fetch_log_for_bucket.error_code && error_code != FlussError::None.code() { + metrics.record_error(SCANNER_ERROR_KIND_BUCKET); let api_error: ApiError = ErrorResponse { error_code, error_message: fetch_log_for_bucket.error_message.clone(), diff --git a/crates/fluss/src/client/write/sender.rs b/crates/fluss/src/client/write/sender.rs index baf5a9b3..76d3dcf5 100644 --- a/crates/fluss/src/client/write/sender.rs +++ b/crates/fluss/src/client/write/sender.rs @@ -23,7 +23,10 @@ use crate::client::{ReadyWriteBatch, RecordAccumulator}; use crate::error::Error::UnexpectedError; use crate::error::{FlussError, Result}; use crate::metadata::{PhysicalTablePath, TableBucket, TablePath}; -use crate::metrics::WriterMetrics; +use crate::metrics::{ + WRITER_ERROR_KIND_LOCAL_BUILD, WRITER_ERROR_KIND_MAX_RETRIES_EXCEEDED, + WRITER_ERROR_KIND_NON_RETRIABLE, WRITER_ERROR_KIND_WRITER_ID_CHANGED, WriterMetrics, +}; use crate::proto::{ PbProduceLogRespForBucket, PbPutKvRespForBucket, PbTablePath, ProduceLogResponse, PutKvResponse, }; @@ -581,7 +584,9 @@ impl Sender { error: broadcast::Error, fluss_error: Option, adjust_sequences: bool, + error_kind: &'static str, ) { + self.metrics.record_error(error_kind); if self.idempotence_manager.is_enabled() && ready_write_batch.write_batch.batch_sequence() != NO_BATCH_SEQUENCE { @@ -642,6 +647,7 @@ impl Sender { }, None, true, + WRITER_ERROR_KIND_LOCAL_BUILD, ); } Ok(()) @@ -715,6 +721,7 @@ impl Sender { }, Some(FlussError::UnknownWriterIdException), false, + WRITER_ERROR_KIND_WRITER_ID_CHANGED, ); return Ok( Self::is_invalid_metadata_error(error).then_some(physical_table_path) @@ -731,6 +738,16 @@ impl Sender { // reset all writer state internally (matching Java). // For other errors, only adjust sequences if the batch didn't exhaust its retries. let can_adjust = ready_write_batch.write_batch.attempts() < self.retries; + // `max_retries_exceeded` means retryable-in-principle AND exhausted retry + // budget. Retryable-in-principle includes idempotence-specific cases + // (`can_retry_for_error`), not only `is_retriable_error`. + let retriable_in_principle = self.is_retriable_in_principle(&ready_write_batch, error); + let exhausted_retries = ready_write_batch.write_batch.attempts() >= self.retries; + let error_kind = if exhausted_retries && retriable_in_principle { + WRITER_ERROR_KIND_MAX_RETRIES_EXCEEDED + } else { + WRITER_ERROR_KIND_NON_RETRIABLE + }; self.fail_batch( ready_write_batch, broadcast::Error::WriteFailed { @@ -739,6 +756,7 @@ impl Sender { }, Some(error), can_adjust, + error_kind, ); Ok(Self::is_invalid_metadata_error(error).then_some(physical_table_path)) } @@ -785,20 +803,26 @@ impl Sender { { return false; } + self.is_retriable_in_principle(ready_write_batch, error) + } + + fn is_retriable_in_principle( + &self, + ready_write_batch: &ReadyWriteBatch, + error: FlussError, + ) -> bool { if Self::is_retriable_error(error) { return true; } - // Idempotent-specific retry logic let seq = ready_write_batch.write_batch.batch_sequence(); - if self.idempotence_manager.is_enabled() && seq != NO_BATCH_SEQUENCE { - return self.idempotence_manager.can_retry_for_error( + self.idempotence_manager.is_enabled() + && seq != NO_BATCH_SEQUENCE + && self.idempotence_manager.can_retry_for_error( &ready_write_batch.table_bucket, seq, ready_write_batch.write_batch.batch_id(), error, - ); - } - false + ) } async fn update_metadata_if_needed( @@ -1216,6 +1240,69 @@ mod tests { assert_eq!(retry_total, Some(1)); } + #[test] + fn exhausted_idempotence_retriable_error_is_classified_as_max_retries_exceeded() { + use metrics_util::debugging::{DebugValue, DebuggingRecorder}; + + let recorder = DebuggingRecorder::new(); + let snapshotter = recorder.snapshotter(); + + let result: Result<()> = metrics::with_local_recorder(&recorder, || { + let table_path = Arc::new(TablePath::new("db".to_string(), "tbl".to_string())); + let cluster = build_cluster_arc(table_path.as_ref(), 1, 1); + let metadata = Arc::new(Metadata::new_for_test(cluster.clone())); + let idempotence = enabled_idempotence(); + idempotence.set_writer_id(42); + let accumulator = Arc::new(RecordAccumulator::new( + Config::default(), + Arc::clone(&idempotence), + )); + // retries=0 forces the terminal path while this error remains + // retryable-in-principle through idempotence_manager.can_retry_for_error. + let sender = Sender::new( + metadata, + accumulator.clone(), + 1024 * 1024, + 1000, + -1, + 0, + idempotence, + Arc::new(crate::metrics::WriterMetrics::new()), + ); + + let (mut batch, _handle) = + build_ready_batch(accumulator.as_ref(), cluster.clone(), table_path.clone())?; + // OOS is idempotence-retriable when the sequence is not the next expected. + batch.write_batch.set_writer_state(42, 1); + sender.handle_write_batch_error( + batch, + FlussError::OutOfOrderSequenceException, + "out of order".to_string(), + )?; + Ok(()) + }); + result.expect("error-kind classification"); + + let entries = snapshotter.snapshot().into_vec(); + let max_retries = entries.iter().find_map(|(key, _, _, val)| { + if key.key().name() != crate::metrics::WRITER_ERRORS_TOTAL { + return None; + } + let is_max_retries = key.key().labels().any(|l| { + l.key() == crate::metrics::LABEL_ERROR_KIND + && l.value() == crate::metrics::WRITER_ERROR_KIND_MAX_RETRIES_EXCEEDED + }); + if !is_max_retries { + return None; + } + match val { + DebugValue::Counter(v) => Some(*v), + _ => None, + } + }); + assert_eq!(max_retries, Some(1)); + } + #[test] fn record_request_batch_metrics_emits_per_batch_send_stats() { use metrics_util::debugging::{DebugValue, DebuggingRecorder}; diff --git a/crates/fluss/src/metrics.rs b/crates/fluss/src/metrics.rs index e707f0e6..36958ded 100644 --- a/crates/fluss/src/metrics.rs +++ b/crates/fluss/src/metrics.rs @@ -35,6 +35,11 @@ pub const LABEL_API_KEY: &str = "api_key"; pub const LABEL_DATABASE: &str = "database"; pub const LABEL_TABLE: &str = "table"; +/// Classifies an error counter sample. See the per-metric docs below for the +/// value set each counter uses (writer terminal failures vs scanner fetch +/// errors). +pub const LABEL_ERROR_KIND: &str = "error_kind"; + // --------------------------------------------------------------------------- // Connection / RPC metrics // @@ -54,6 +59,30 @@ pub const CLIENT_BYTES_RECEIVED_TOTAL: &str = "fluss.client.bytes_received.total pub const CLIENT_REQUEST_LATENCY_MS: &str = "fluss.client.request_latency_ms"; pub const CLIENT_REQUESTS_IN_FLIGHT: &str = "fluss.client.requests_in_flight"; +// --------------------------------------------------------------------------- +// Client-level error / refresh tracking metrics +// +// Java tracks request failures on the +// server (`TableMetricGroup.failed*RequestsPerSecond`, `RequestsMetrics`), +// which a client cannot observe. Those added here give +// operators a client-side error rate for alerting (e.g. +// `rate(fluss_client_connections_poisoned_total)`). +// +// All three are unlabeled global-per-process counters, matching the unlabeled +// convention already used by the connection metrics above. +// --------------------------------------------------------------------------- + +/// Counter: total client connections that transitioned to the poisoned state +/// (one increment per connection death, not per failed in-flight request). +pub const CLIENT_CONNECTIONS_POISONED_TOTAL: &str = "fluss.client.connections_poisoned.total"; + +/// Counter: total metadata refresh attempts (one per `update_tables_metadata` +/// call). `refreshes_total - errors_total` is the success count. +pub const CLIENT_METADATA_REFRESHES_TOTAL: &str = "fluss.client.metadata_refreshes.total"; + +/// Counter: total metadata refresh failures. +pub const CLIENT_METADATA_ERRORS_TOTAL: &str = "fluss.client.metadata_errors.total"; + // --------------------------------------------------------------------------- // Scanner poll-timing metrics // @@ -129,6 +158,22 @@ pub const SCANNER_REMOTE_FETCH_BYTES_TOTAL: &str = "fluss.client.scanner.remote_ pub const SCANNER_REMOTE_FETCH_ERRORS_TOTAL: &str = "fluss.client.scanner.remote_fetch_errors.total"; +/// Counter: total FetchLog errors, labeled by `error_kind`: +/// * `rpc` — the FetchLog RPC itself returned an error (transport / leader +/// lookup failure), counted once per failed request. +/// * `bucket` — the FetchLog response succeeded but carried a per-bucket +/// error code, counted once per erroring bucket. +/// +/// Java's `LogFetcher` only logs fetch errors and invalidates metadata. +/// Distinct from `SCANNER_REMOTE_FETCH_ERRORS_TOTAL`, +/// which counts remote-storage download failures. +pub const SCANNER_ERRORS_TOTAL: &str = "fluss.client.scanner.errors.total"; + +/// `error_kind` value for a failed FetchLog RPC (transport / leader lookup). +pub(crate) const SCANNER_ERROR_KIND_RPC: &str = "rpc"; +/// `error_kind` value for a per-bucket error code in a successful response. +pub(crate) const SCANNER_ERROR_KIND_BUCKET: &str = "bucket"; + // --------------------------------------------------------------------------- // Per-table scanner metric handles // --------------------------------------------------------------------------- @@ -151,6 +196,13 @@ pub const SCANNER_REMOTE_FETCH_ERRORS_TOTAL: &str = /// the `metrics::with_local_recorder(...)` closure. With no recorder /// installed, all `record_*` calls are zero-overhead no-ops. pub(crate) struct ScannerMetrics { + // Owned label values for metrics whose label set varies per call (the + // `errors_total` counter, keyed additionally by `error_kind`). The cached + // handles below already bake `(database, table)` in, but a varying + // `error_kind` would need one field per kind; instead `record_error` + // resolves the handle at its (cold) callsite using these. + database: String, + table: String, time_between_poll_ms: metrics::Gauge, poll_idle_ratio: metrics::Gauge, last_poll_seconds_ago: metrics::Gauge, @@ -169,6 +221,8 @@ impl ScannerMetrics { let database = table_path.database(); let table = table_path.table(); Self { + database: database.to_string(), + table: table.to_string(), time_between_poll_ms: scanner_gauge(SCANNER_TIME_BETWEEN_POLL_MS, database, table), poll_idle_ratio: scanner_gauge(SCANNER_POLL_IDLE_RATIO, database, table), last_poll_seconds_ago: scanner_gauge(SCANNER_LAST_POLL_SECONDS_AGO, database, table), @@ -228,6 +282,20 @@ impl ScannerMetrics { pub(crate) fn record_remote_fetch_error(&self) { self.remote_fetch_errors_total.increment(1); } + + /// Record one FetchLog error. `error_kind` is one of + /// [`SCANNER_ERROR_KIND_RPC`] / [`SCANNER_ERROR_KIND_BUCKET`]. Resolved at + /// the callsite (cold path) rather than cached, to avoid one struct field + /// per kind. + pub(crate) fn record_error(&self, error_kind: &'static str) { + metrics::counter!( + SCANNER_ERRORS_TOTAL, + LABEL_DATABASE => self.database.clone(), + LABEL_TABLE => self.table.clone(), + LABEL_ERROR_KIND => error_kind, + ) + .increment(1); + } } // Per-table scanner handle factories. These centralize the @@ -295,6 +363,27 @@ pub const WRITER_BYTES_SEND_TOTAL: &str = "fluss.client.writer.bytes_send.total" /// Counter: total records re-enqueued for retry. pub const WRITER_RECORDS_RETRY_TOTAL: &str = "fluss.client.writer.records_retry.total"; +/// Counter: total batches that terminally failed, labeled by `error_kind`: +/// * `non_retriable` — server returned a non-retriable error. +/// * `max_retries_exceeded` — a retriable error that exhausted its retries. +/// * `writer_id_changed` — idempotent retry abandoned after a writer-id reset. +/// * `local_build` — batch failed to build locally (never sent). +/// +/// Counts terminal failures only; retries are tracked by +/// `WRITER_RECORDS_RETRY_TOTAL`, so there is no double counting. +/// Carries only `error_kind` (no table label), matching the +/// unlabeled writer-metric convention. +pub const WRITER_ERRORS_TOTAL: &str = "fluss.client.writer.errors.total"; + +/// `error_kind` value for a non-retriable server error. +pub(crate) const WRITER_ERROR_KIND_NON_RETRIABLE: &str = "non_retriable"; +/// `error_kind` value for a retriable error that exhausted its retries. +pub(crate) const WRITER_ERROR_KIND_MAX_RETRIES_EXCEEDED: &str = "max_retries_exceeded"; +/// `error_kind` value for an idempotent retry abandoned after a writer-id reset. +pub(crate) const WRITER_ERROR_KIND_WRITER_ID_CHANGED: &str = "writer_id_changed"; +/// `error_kind` value for a batch that failed to build locally (never sent). +pub(crate) const WRITER_ERROR_KIND_LOCAL_BUILD: &str = "local_build"; + /// Histogram: records per sent batch. pub const WRITER_RECORDS_PER_BATCH: &str = "fluss.client.writer.records_per_batch"; @@ -389,6 +478,13 @@ impl WriterMetrics { self.buffer_available_bytes.set(available_bytes as f64); self.buffer_waiting_threads.set(waiting_threads as f64); } + + /// Record one terminal batch failure. `error_kind` is one of the + /// `WRITER_ERROR_KIND_*` values. Resolved at the callsite (cold path) + /// rather than cached, to avoid one struct field per kind. + pub(crate) fn record_error(&self, error_kind: &'static str) { + metrics::counter!(WRITER_ERRORS_TOTAL, LABEL_ERROR_KIND => error_kind).increment(1); + } } /// Returns a label value for reportable API keys, matching Java's @@ -841,6 +937,121 @@ mod tests { ); } + #[test] + fn error_retry_metric_names_follow_convention() { + for name in [ + CLIENT_CONNECTIONS_POISONED_TOTAL, + CLIENT_METADATA_REFRESHES_TOTAL, + CLIENT_METADATA_ERRORS_TOTAL, + WRITER_ERRORS_TOTAL, + SCANNER_ERRORS_TOTAL, + ] { + assert!(!name.is_empty()); + assert!( + name.starts_with("fluss.client."), + "{name} must use the fluss.client. prefix" + ); + } + } + + #[test] + fn writer_error_metrics_classify_by_kind() { + use std::collections::HashMap; + + let recorder = DebuggingRecorder::new(); + let snapshotter = recorder.snapshotter(); + + metrics::with_local_recorder(&recorder, || { + let m = WriterMetrics::new(); + m.record_error(WRITER_ERROR_KIND_NON_RETRIABLE); + m.record_error(WRITER_ERROR_KIND_NON_RETRIABLE); + m.record_error(WRITER_ERROR_KIND_MAX_RETRIES_EXCEEDED); + m.record_error(WRITER_ERROR_KIND_WRITER_ID_CHANGED); + m.record_error(WRITER_ERROR_KIND_LOCAL_BUILD); + }); + + let snapshot = snapshotter.snapshot(); + let entries: Vec<_> = snapshot.into_vec(); + + let mut by_kind: HashMap = HashMap::new(); + for (key, _, _, val) in &entries { + if key.key().name() != WRITER_ERRORS_TOTAL { + continue; + } + // Writer errors carry only the error_kind label (no table label). + assert_eq!( + key.key().labels().count(), + 1, + "writer error metric must carry exactly the error_kind label" + ); + let kind = key + .key() + .labels() + .find(|l| l.key() == LABEL_ERROR_KIND) + .map(|l| l.value().to_string()) + .expect("writer error metric must carry error_kind"); + if let metrics_util::debugging::DebugValue::Counter(v) = val { + by_kind.insert(kind, *v); + } + } + + assert_eq!(by_kind.get("non_retriable"), Some(&2)); + assert_eq!(by_kind.get("max_retries_exceeded"), Some(&1)); + assert_eq!(by_kind.get("writer_id_changed"), Some(&1)); + assert_eq!(by_kind.get("local_build"), Some(&1)); + } + + #[test] + fn scanner_error_metrics_separate_rpc_and_bucket() { + use std::collections::HashMap; + + let recorder = DebuggingRecorder::new(); + let snapshotter = recorder.snapshotter(); + + metrics::with_local_recorder(&recorder, || { + let table_path = TablePath::new("db", "tbl"); + let m = ScannerMetrics::new(&table_path); + m.record_error(SCANNER_ERROR_KIND_RPC); + m.record_error(SCANNER_ERROR_KIND_BUCKET); + m.record_error(SCANNER_ERROR_KIND_BUCKET); + }); + + let snapshot = snapshotter.snapshot(); + let entries: Vec<_> = snapshot.into_vec(); + + let mut by_kind: HashMap = HashMap::new(); + for (key, _, _, val) in &entries { + if key.key().name() != SCANNER_ERRORS_TOTAL { + continue; + } + // Scanner errors stay table-labeled, plus error_kind. + let has_db = key + .key() + .labels() + .any(|l| l.key() == LABEL_DATABASE && l.value() == "db"); + let has_table = key + .key() + .labels() + .any(|l| l.key() == LABEL_TABLE && l.value() == "tbl"); + assert!( + has_db && has_table, + "scanner error metric must carry database + table labels" + ); + let kind = key + .key() + .labels() + .find(|l| l.key() == LABEL_ERROR_KIND) + .map(|l| l.value().to_string()) + .expect("scanner error metric must carry error_kind"); + if let metrics_util::debugging::DebugValue::Counter(v) = val { + by_kind.insert(kind, *v); + } + } + + assert_eq!(by_kind.get("rpc"), Some(&1)); + assert_eq!(by_kind.get("bucket"), Some(&2)); + } + /// Writer metrics carry no labels. #[test] fn writer_metrics_are_unlabeled() { diff --git a/crates/fluss/src/rpc/server_connection.rs b/crates/fluss/src/rpc/server_connection.rs index 762c7ea9..9529f6b6 100644 --- a/crates/fluss/src/rpc/server_connection.rs +++ b/crates/fluss/src/rpc/server_connection.rs @@ -18,9 +18,9 @@ use crate::cluster::{ServerNode, ServerType}; use crate::error::Error; use crate::metrics::{ - CLIENT_BYTES_RECEIVED_TOTAL, CLIENT_BYTES_SENT_TOTAL, CLIENT_REQUEST_LATENCY_MS, - CLIENT_REQUESTS_IN_FLIGHT, CLIENT_REQUESTS_TOTAL, CLIENT_RESPONSES_TOTAL, LABEL_API_KEY, - api_key_label, + CLIENT_BYTES_RECEIVED_TOTAL, CLIENT_BYTES_SENT_TOTAL, CLIENT_CONNECTIONS_POISONED_TOTAL, + CLIENT_REQUEST_LATENCY_MS, CLIENT_REQUESTS_IN_FLIGHT, CLIENT_REQUESTS_TOTAL, + CLIENT_RESPONSES_TOTAL, LABEL_API_KEY, api_key_label, }; use crate::proto::PbApiVersion; use crate::rpc::api_key::ApiKey; @@ -414,6 +414,11 @@ impl ConnectionState { Self::RequestMap(map) => { let err = Arc::new(err); + // Counted on the live -> poisoned transition only, so the value + // reflects connection deaths rather than how many callers + // observed the poison (the `Self::Poison` arm below is re-entry). + metrics::counter!(CLIENT_CONNECTIONS_POISONED_TOTAL).increment(1); + // inform all active requests for (_request_id, active_request) in map.drain() { // it's OK if the other side is gone @@ -985,6 +990,30 @@ mod tests { // -- Tests ----------------------------------------------------------- + #[tokio::test] + async fn poison_counts_once_per_connection_death() { + let _test_guard = test_lock().lock().await; + let snapshotter = test_snapshotter(); + + let before: Vec<_> = snapshotter.snapshot().into_vec(); + let poisoned_before = counter_sum(&before, CLIENT_CONNECTIONS_POISONED_TOTAL); + + let mut state = ConnectionState::RequestMap(HashMap::new()); + // First call transitions live -> poisoned (counted); the second is + // re-entry on the already-poisoned state and must NOT be counted. + state.poison(RpcError::ConnectionError("boom".to_string())); + state.poison(RpcError::ConnectionError("boom again".to_string())); + + let after: Vec<_> = snapshotter.snapshot().into_vec(); + let poisoned_after = counter_sum(&after, CLIENT_CONNECTIONS_POISONED_TOTAL); + + assert_eq!( + poisoned_after - poisoned_before, + 1, + "poison() must increment the counter exactly once per connection death" + ); + } + #[tokio::test] async fn request_records_metrics_for_reportable_api_key() { let _test_guard = test_lock().lock().await; diff --git a/crates/fluss/tests/integration/metrics.rs b/crates/fluss/tests/integration/metrics.rs new file mode 100644 index 00000000..5c8404bc --- /dev/null +++ b/crates/fluss/tests/integration/metrics.rs @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#[cfg(test)] +mod metrics_test { + use crate::integration::utils::get_shared_cluster; + use fluss::metadata::TablePath; + use fluss::metrics::{CLIENT_METADATA_ERRORS_TOTAL, CLIENT_METADATA_REFRESHES_TOTAL}; + use metrics_util::debugging::{DebugValue, DebuggingRecorder}; + + fn counter_value( + entries: &[( + metrics_util::CompositeKey, + Option, + Option, + DebugValue, + )], + name: &str, + ) -> u64 { + entries + .iter() + .find_map(|(key, _, _, value)| { + if key.key().name() != name { + return None; + } + match value { + DebugValue::Counter(v) => Some(*v), + _ => None, + } + }) + .unwrap_or(0) + } + + #[test] + fn metadata_refresh_error_metrics_increment_for_failed_update() { + let recorder = DebuggingRecorder::new(); + let snapshotter = recorder.snapshotter(); + + let before = snapshotter.snapshot().into_vec(); + let refresh_before = counter_value(&before, CLIENT_METADATA_REFRESHES_TOTAL); + let errors_before = counter_value(&before, CLIENT_METADATA_ERRORS_TOTAL); + + let rt = tokio::runtime::Builder::new_current_thread() + .enable_all() + .build() + .expect("runtime"); + + metrics::with_local_recorder(&recorder, || { + rt.block_on(async { + let cluster = get_shared_cluster(); + let connection = cluster.get_fluss_connection().await; + let missing_table = TablePath::new("fluss", "metrics_missing_table"); + let result = connection + .get_metadata() + .update_table_metadata(&missing_table) + .await; + assert!(result.is_err(), "missing-table metadata update should fail"); + }) + }); + + let after = snapshotter.snapshot().into_vec(); + let refresh_after = counter_value(&after, CLIENT_METADATA_REFRESHES_TOTAL); + let errors_after = counter_value(&after, CLIENT_METADATA_ERRORS_TOTAL); + + assert_eq!(refresh_after - refresh_before, 1); + assert_eq!(errors_after - errors_before, 1); + } +} diff --git a/crates/fluss/tests/test_fluss.rs b/crates/fluss/tests/test_fluss.rs index 2d2bd152..7767d5ad 100644 --- a/crates/fluss/tests/test_fluss.rs +++ b/crates/fluss/tests/test_fluss.rs @@ -25,6 +25,7 @@ mod integration { mod fluss_cluster; mod kv_table; mod log_table; + mod metrics; mod record_batch_log_reader; mod sasl_auth;