Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
118 changes: 93 additions & 25 deletions crates/fluss/src/client/metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use crate::PartitionId;
use crate::cluster::{Cluster, ServerNode, ServerType};
use crate::error::{Error, FlussError, Result};
use crate::metadata::{PhysicalTablePath, TableBucket, TablePath};
use crate::metrics::{CLIENT_METADATA_ERRORS_TOTAL, CLIENT_METADATA_REFRESHES_TOTAL};
use crate::proto::MetadataResponse;
use crate::rpc::message::UpdateMetadataRequest;
use crate::rpc::{RpcClient, ServerConnection};
Expand Down Expand Up @@ -149,33 +150,43 @@ impl Metadata {
physical_table_paths: &HashSet<&Arc<PhysicalTablePath>>,
partition_ids: Vec<i64>,
) -> Result<()> {
let maybe_server = {
let guard = self.cluster.read();
guard.get_one_available_server().cloned()
};

let server = match maybe_server {
Some(s) => s,
None => {
info!(
"No available tablet server to update metadata, attempting to re-initialize cluster using bootstrap server."
);
self.reinit_cluster().await?;
return Ok(());
}
};
metrics::counter!(CLIENT_METADATA_REFRESHES_TOTAL).increment(1);
// Run the refresh in an inner block so every failure path (no-server
// reinit, connection, RPC, apply) increments the error counter once.
let result: Result<()> = async {
let maybe_server = {
let guard = self.cluster.read();
guard.get_one_available_server().cloned()
};

let server = match maybe_server {
Some(s) => s,
None => {
info!(
"No available tablet server to update metadata, attempting to re-initialize cluster using bootstrap server."
);
self.reinit_cluster().await?;
return Ok(());
}
};

let conn = self.connections.get_connection(&server).await?;
let conn = self.connections.get_connection(&server).await?;

let response = conn
.request(UpdateMetadataRequest::new(
table_paths,
physical_table_paths,
partition_ids,
))
.await?;
self.update(response).await?;
Ok(())
let response = conn
.request(UpdateMetadataRequest::new(
table_paths,
physical_table_paths,
partition_ids,
))
.await?;
self.update(response).await?;
Ok(())
}
.await;
if result.is_err() {
metrics::counter!(CLIENT_METADATA_ERRORS_TOTAL).increment(1);
}
result
}

pub async fn update_table_metadata(&self, table_path: &TablePath) -> Result<()> {
Expand Down Expand Up @@ -338,6 +349,63 @@ mod tests {
assert!(cluster.get_tablet_server(1).is_none());
}

#[test]
fn metadata_refresh_and_error_counters_increment() {
    use crate::cluster::Cluster;
    use metrics_util::debugging::{DebugValue, DebuggingRecorder};

    // Metrics are captured by a debugging recorder that is installed only for
    // the duration of the closure passed to `with_local_recorder` below.
    let recorder = DebuggingRecorder::new();
    let snapshotter = recorder.snapshotter();

    let table_path = TablePath::new(String::from("db"), String::from("tbl"));
    // The default cluster exposes no available server, which sends the refresh
    // down the `reinit_cluster` fallback. That fallback fails fast in
    // `parse_bootstrap` on the empty test bootstrap, so the test needs no
    // network and is deterministic.
    let metadata = Metadata::new_for_test(Arc::new(Cluster::default()));

    let runtime = tokio::runtime::Builder::new_current_thread()
        .enable_all()
        .build()
        .unwrap();

    // Driving the future with `block_on` keeps every `counter!` call on the
    // current thread, where the thread-local recorder can observe it.
    metrics::with_local_recorder(&recorder, || {
        let outcome = runtime.block_on(metadata.update_table_metadata(&table_path));
        assert!(outcome.is_err(), "refresh with empty bootstrap must fail");
    });

    let recorded: Vec<_> = snapshotter.snapshot().into_vec();
    // Value of the first counter registered under `metric`, or 0 if none exists.
    let counter_value = |metric: &str| -> u64 {
        recorded
            .iter()
            .filter(|(key, _, _, _)| key.key().name() == metric)
            .find_map(|(_, _, _, value)| match value {
                DebugValue::Counter(count) => Some(*count),
                _ => None,
            })
            .unwrap_or(0)
    };

    assert_eq!(
        counter_value(CLIENT_METADATA_REFRESHES_TOTAL),
        1,
        "one refresh attempt should be counted"
    );
    assert_eq!(
        counter_value(CLIENT_METADATA_ERRORS_TOTAL),
        1,
        "the failed refresh should be counted as an error"
    );
}

#[test]
fn parse_bootstrap_variants() {
// valid IP
Expand Down
4 changes: 3 additions & 1 deletion crates/fluss/src/client/table/scanner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ use crate::error::{ApiError, Error, FlussError, Result};
use crate::metadata::{
LogFormat, PhysicalTablePath, RowType, SchemaInfo, TableBucket, TableInfo, TablePath,
};
use crate::metrics::ScannerMetrics;
use crate::metrics::{SCANNER_ERROR_KIND_BUCKET, SCANNER_ERROR_KIND_RPC, ScannerMetrics};
use crate::proto::{
ErrorResponse, FetchLogRequest, FetchLogResponse, PbFetchLogReqForBucket, PbFetchLogReqForTable,
};
Expand Down Expand Up @@ -1431,6 +1431,7 @@ impl LogFetcher {
warn!(
"Retrying after error fetching log from destination node {server_node:?}: {e:?}"
);
metrics.record_error(SCANNER_ERROR_KIND_RPC);
Self::handle_fetch_failure(metadata, &leader, &fetch_request).await;
return;
}
Expand Down Expand Up @@ -1511,6 +1512,7 @@ impl LogFetcher {
if let Some(error_code) = fetch_log_for_bucket.error_code
&& error_code != FlussError::None.code()
{
metrics.record_error(SCANNER_ERROR_KIND_BUCKET);
let api_error: ApiError = ErrorResponse {
error_code,
error_message: fetch_log_for_bucket.error_message.clone(),
Expand Down
101 changes: 94 additions & 7 deletions crates/fluss/src/client/write/sender.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,10 @@ use crate::client::{ReadyWriteBatch, RecordAccumulator};
use crate::error::Error::UnexpectedError;
use crate::error::{FlussError, Result};
use crate::metadata::{PhysicalTablePath, TableBucket, TablePath};
use crate::metrics::WriterMetrics;
use crate::metrics::{
WRITER_ERROR_KIND_LOCAL_BUILD, WRITER_ERROR_KIND_MAX_RETRIES_EXCEEDED,
WRITER_ERROR_KIND_NON_RETRIABLE, WRITER_ERROR_KIND_WRITER_ID_CHANGED, WriterMetrics,
};
use crate::proto::{
PbProduceLogRespForBucket, PbPutKvRespForBucket, PbTablePath, ProduceLogResponse, PutKvResponse,
};
Expand Down Expand Up @@ -581,7 +584,9 @@ impl Sender {
error: broadcast::Error,
fluss_error: Option<FlussError>,
adjust_sequences: bool,
error_kind: &'static str,
) {
self.metrics.record_error(error_kind);
if self.idempotence_manager.is_enabled()
&& ready_write_batch.write_batch.batch_sequence() != NO_BATCH_SEQUENCE
{
Expand Down Expand Up @@ -642,6 +647,7 @@ impl Sender {
},
None,
true,
WRITER_ERROR_KIND_LOCAL_BUILD,
);
}
Ok(())
Expand Down Expand Up @@ -715,6 +721,7 @@ impl Sender {
},
Some(FlussError::UnknownWriterIdException),
false,
WRITER_ERROR_KIND_WRITER_ID_CHANGED,
);
return Ok(
Self::is_invalid_metadata_error(error).then_some(physical_table_path)
Expand All @@ -731,6 +738,16 @@ impl Sender {
// reset all writer state internally (matching Java).
// For other errors, only adjust sequences if the batch didn't exhaust its retries.
let can_adjust = ready_write_batch.write_batch.attempts() < self.retries;
// `max_retries_exceeded` means retryable-in-principle AND exhausted retry
// budget. Retryable-in-principle includes idempotence-specific cases
// (`can_retry_for_error`), not only `is_retriable_error`.
let retriable_in_principle = self.is_retriable_in_principle(&ready_write_batch, error);
let exhausted_retries = ready_write_batch.write_batch.attempts() >= self.retries;
let error_kind = if exhausted_retries && retriable_in_principle {
WRITER_ERROR_KIND_MAX_RETRIES_EXCEEDED
} else {
WRITER_ERROR_KIND_NON_RETRIABLE
};
self.fail_batch(
ready_write_batch,
broadcast::Error::WriteFailed {
Expand All @@ -739,6 +756,7 @@ impl Sender {
},
Some(error),
can_adjust,
error_kind,
);
Ok(Self::is_invalid_metadata_error(error).then_some(physical_table_path))
}
Expand Down Expand Up @@ -785,20 +803,26 @@ impl Sender {
{
return false;
}
self.is_retriable_in_principle(ready_write_batch, error)
}

fn is_retriable_in_principle(
&self,
ready_write_batch: &ReadyWriteBatch,
error: FlussError,
) -> bool {
if Self::is_retriable_error(error) {
return true;
}
// Idempotent-specific retry logic
let seq = ready_write_batch.write_batch.batch_sequence();
if self.idempotence_manager.is_enabled() && seq != NO_BATCH_SEQUENCE {
return self.idempotence_manager.can_retry_for_error(
self.idempotence_manager.is_enabled()
&& seq != NO_BATCH_SEQUENCE
&& self.idempotence_manager.can_retry_for_error(
&ready_write_batch.table_bucket,
seq,
ready_write_batch.write_batch.batch_id(),
error,
);
}
false
)
}

async fn update_metadata_if_needed(
Expand Down Expand Up @@ -1216,6 +1240,69 @@ mod tests {
assert_eq!(retry_total, Some(1));
}

#[test]
fn exhausted_idempotence_retriable_error_is_classified_as_max_retries_exceeded() {
    use metrics_util::debugging::{DebugValue, DebuggingRecorder};

    let recorder = DebuggingRecorder::new();
    let snapshotter = recorder.snapshotter();

    // Everything that may emit metrics runs inside the closure so the local
    // recorder observes it; setup failures are surfaced through the `Result`.
    let outcome: Result<()> = metrics::with_local_recorder(&recorder, || {
        let table_path = Arc::new(TablePath::new(String::from("db"), String::from("tbl")));
        let cluster = build_cluster_arc(table_path.as_ref(), 1, 1);
        let metadata = Arc::new(Metadata::new_for_test(cluster.clone()));

        let idempotence = enabled_idempotence();
        idempotence.set_writer_id(42);

        let accumulator = Arc::new(RecordAccumulator::new(
            Config::default(),
            Arc::clone(&idempotence),
        ));
        let writer_metrics = Arc::new(crate::metrics::WriterMetrics::new());
        // The `0` is the retry budget: with no retries allowed the sender goes
        // straight to the terminal path, while the error itself is still
        // retryable-in-principle via idempotence_manager.can_retry_for_error.
        let sender = Sender::new(
            metadata,
            accumulator.clone(),
            1024 * 1024,
            1000,
            -1,
            0,
            idempotence,
            writer_metrics,
        );

        let (mut ready_batch, _handle) =
            build_ready_batch(accumulator.as_ref(), cluster.clone(), table_path.clone())?;
        // An out-of-order-sequence error is idempotence-retriable when the
        // batch sequence is not the next expected one.
        ready_batch.write_batch.set_writer_state(42, 1);
        sender.handle_write_batch_error(
            ready_batch,
            FlussError::OutOfOrderSequenceException,
            "out of order".to_string(),
        )?;
        Ok(())
    });
    outcome.expect("error-kind classification");

    let recorded = snapshotter.snapshot().into_vec();
    // Find the writer error counter labelled with the max-retries-exceeded kind.
    let max_retries_count = recorded
        .iter()
        .filter(|(key, _, _, _)| key.key().name() == crate::metrics::WRITER_ERRORS_TOTAL)
        .filter(|(key, _, _, _)| {
            key.key().labels().any(|label| {
                label.key() == crate::metrics::LABEL_ERROR_KIND
                    && label.value() == crate::metrics::WRITER_ERROR_KIND_MAX_RETRIES_EXCEEDED
            })
        })
        .find_map(|(_, _, _, value)| match value {
            DebugValue::Counter(count) => Some(*count),
            _ => None,
        });
    assert_eq!(max_retries_count, Some(1));
}

#[test]
fn record_request_batch_metrics_emits_per_batch_send_stats() {
use metrics_util::debugging::{DebugValue, DebuggingRecorder};
Expand Down
Loading