Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 30 additions & 8 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 3 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ Key concepts:
| **Data Types** | Int, BigInt, String, Float, Double, Boolean, Bytes, Decimal, Date, Time, Timestamp, TimestampLTZ, Char, Binary |
| **Config** | Batch sizing, buffering, retries, compression, timeouts, prefetch, concurrency |
| **Storage** | Memory, Filesystem, S3, OSS (via [OpenDAL](https://opendal.apache.org/)) |
| **Observability** | Connection, writer, and scanner [metrics](https://clients.fluss.apache.org/docs/user-guide/rust/metrics/) via the [`metrics`](https://docs.rs/metrics) facade (Prometheus, StatsD, etc.) |
| **WASM** | Compiles for `wasm32` target |

### Language Bindings
Expand All @@ -83,7 +84,7 @@ fluss-rust/
│ │ ├── src/row/ # GenericRow, InternalRow, Arrow integration
│ │ ├── src/rpc/ # gRPC transport layer
│ │ └── src/config.rs # Client configuration
│ ├── examples/ # 5 runnable examples (log, KV, partitioned, prefix lookup)
│ ├── examples/ # runnable examples (log, KV, partitioned, prefix lookup, metrics)
│ └── fluss-test-cluster/ # Test harness for integration tests
├── bindings/
│ ├── python/ # Python binding (PyO3)
Expand Down Expand Up @@ -236,6 +237,7 @@ async fn main() -> Result<()> {
| `example-partitioned-upsert-lookup` | KV table with partitions |
| `example-prefix-lookup` | Prefix lookup on bucket keys |
| `example-partitioned-prefix-lookup` | Prefix lookup on partitioned tables |
| `example-prometheus-metrics` | Expose client metrics on a Prometheus endpoint |

Build and run any example:

Expand Down
6 changes: 6 additions & 0 deletions crates/examples/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ version = { workspace = true }
fluss = { workspace = true, features = ["storage-all"] }
tokio = { workspace = true }
clap = { workspace = true }
metrics = { workspace = true }
metrics-exporter-prometheus = { version = "0.17", default-features = false, features = ["http-listener"] }

[target.'cfg(not(target_env = "msvc"))'.dependencies]
tikv-jemallocator = "0.6"
Expand All @@ -50,3 +52,7 @@ path = "src/example_prefix_lookup.rs"
[[example]]
name = "example-partitioned-prefix-lookup"
path = "src/example_partitioned_prefix_lookup.rs"

[[example]]
name = "example-prometheus-metrics"
path = "src/example_prometheus_metrics.rs"
136 changes: 136 additions & 0 deletions crates/examples/src/example_prometheus_metrics.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

//! Exposes Fluss client metrics on a Prometheus scrape endpoint.
//!
//! Run a local cluster, then:
//! ```shell
//! cargo run -p fluss-examples --example example-prometheus-metrics
//! curl http://localhost:9000/metrics
//! ```
//! The endpoint exposes `fluss_client_writer_*`, `fluss_client_scanner_*`, and
//! `fluss_client_requests_*` series produced by the workload below.
#[cfg(not(target_env = "msvc"))]
#[global_allocator]
static GLOBAL: tikv_jemallocator::Jemalloc = tikv_jemallocator::Jemalloc;

use clap::Parser;
use fluss::client::FlussConnection;
use fluss::config::Config;
use fluss::error::Result;
use fluss::metadata::{DataTypes, Schema, TableDescriptor, TablePath};
use fluss::row::{DataGetters, GenericRow};
use metrics_exporter_prometheus::PrometheusBuilder;
use std::time::Duration;

/// Runs the Prometheus metrics demo.
///
/// Installs a global Prometheus recorder with an HTTP scrape listener, creates
/// a two-column table (`id`, `message`), then appends and polls rows in a loop
/// that has no `break`. The function therefore only returns when a `?`
/// propagates an error; a failed self-check assertion panics instead.
#[tokio::main]
pub async fn main() -> Result<()> {
// Install the global Prometheus recorder BEFORE creating any connection,
// writer, or scanner: the client caches metric handles on first use and
// binds them to whichever recorder is installed at that moment.
//
// `build()` (rather than `install()`) hands back a `PrometheusHandle` so the
// example can read its own metrics back and self-verify; the returned
// exporter future runs the HTTP scrape endpoint.
//
// The listener binds to 0.0.0.0:9000, i.e. every local interface, not only
// the loopback address printed below.
let (recorder, exporter) = PrometheusBuilder::new()
.with_http_listener(([0, 0, 0, 0], 9000))
.build()
.expect("failed to build Prometheus recorder");
let metrics_handle = recorder.handle();
metrics::set_global_recorder(recorder).expect("failed to install global recorder");
// The exporter future is spawned and its join handle dropped, so it runs
// detached for the remainder of the process.
tokio::spawn(exporter);
println!("Metrics exposed on http://localhost:9000/metrics");

// `bootstrap_servers` is overwritten right after parsing, so whatever value
// `Config::parse()` produced for it is discarded and the example always
// targets 127.0.0.1:9123.
let mut config = Config::parse();
config.bootstrap_servers = "127.0.0.1:9123".to_string();

let conn = FlussConnection::new(config).await?;
let admin = conn.get_admin()?;

let table_path = TablePath::new("fluss", "rust_prometheus_metrics");
let table_descriptor = TableDescriptor::builder()
.schema(
Schema::builder()
.column("id", DataTypes::int())
.column("message", DataTypes::string())
.build()?,
)
.build()?;
// NOTE(review): the trailing `true` is presumably an "ignore if exists"
// flag so that re-running the example does not fail — confirm against the
// `create_table` signature.
admin
.create_table(&table_path, &table_descriptor, true)
.await?;

let table = conn.get_table(&table_path).await?;
let append_writer = table.new_append()?.create_writer()?;
let log_scanner = table.new_scan().create_log_scanner()?;
// NOTE(review): presumably subscribes to bucket 0 starting at offset 0 —
// confirm the argument order against `subscribe`.
log_scanner.subscribe(0, 0).await?;

// Continuously write and read so the metrics keep updating and can be
// observed over multiple Prometheus scrapes.
let rows_per_iter = 100;
let mut id = 0i32;
// Guards the one-time self-check further down.
let mut verified = false;
loop {
for _ in 0..rows_per_iter {
let mut row = GenericRow::new(2);
row.set_field(0, id);
row.set_field(1, "metrics demo");
append_writer.append(&row)?;
id += 1;
}
append_writer.flush().await?;

let scan_records = log_scanner.poll(Duration::from_secs(1)).await?;
let mut count = 0;
for record in scan_records {
let row = record.row();
// Values are read and discarded; only the number of polled records
// is reported.
let _ = (row.get_int(0)?, row.get_string(1)?, record.offset());
count += 1;
}
println!(
"appended {rows_per_iter} rows, polled {count} rows; scrape /metrics to see counters"
);

// After the first flush every appended row has been acknowledged, so the
// writer counter must have advanced by at least the rows we sent (retries
// can push it higher). This proves the recorder is wired up correctly.
if !verified {
let rendered = metrics_handle.render();
let sent = counter_value(&rendered, "fluss_client_writer_records_send_total");
assert!(
sent.is_some_and(|v| v >= rows_per_iter as f64),
"expected fluss_client_writer_records_send_total >= {rows_per_iter}, got {sent:?}\n{rendered}"
);
println!("self-check OK: records_send_total = {}", sent.unwrap());
verified = true;

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need to break out of the loop once verification is done? As is, the example would run indefinitely (is that your intention?)

}

// Pause between iterations; the loop has no exit condition, so the example
// keeps producing metrics until it is interrupted or an error propagates.
tokio::time::sleep(Duration::from_secs(1)).await;
}
}

/// Parse the value of an unlabeled counter/gauge sample from rendered
/// Prometheus exposition text.
///
/// A sample line is shaped `metric_name <value>`, optionally followed by a
/// timestamp (`metric_name <value> <timestamp>`). Only lines that begin with
/// `name` followed by a single space are considered, so labeled series
/// (`name{...} <value>`), metrics that merely share a prefix (`name_total`),
/// and `# HELP` / `# TYPE` comment lines are ignored.
///
/// Returns the value of the first matching line whose value token parses as
/// an `f64`, or `None` when no such line exists.
fn counter_value(rendered: &str, name: &str) -> Option<f64> {
    let prefix = format!("{name} ");
    rendered.lines().find_map(|line| {
        // The value is the FIRST token after the metric name; taking the last
        // token would return the optional trailing timestamp instead, and would
        // fail outright on lines with trailing whitespace.
        line.strip_prefix(prefix.as_str())?
            .split_whitespace()
            .next()?
            .parse::<f64>()
            .ok()
    })
}
Loading
Loading