Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions sdk/couchbase-core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ env_logger = "0.11.9"
envconfig = "0.11.1"
log = "0.4.29"
serial_test = "3.4"
tracing-subscriber = "0.3.22"

couchbase-connstr = { path = "../couchbase-connstr", version = "1.0.0-beta.1" }

Expand Down
11 changes: 0 additions & 11 deletions sdk/couchbase-core/src/options/orphan_reporter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,20 +16,15 @@
*
*/

use std::sync::Arc;
use std::time::Duration;

#[derive(Clone)]
#[non_exhaustive]
pub struct OrphanReporterConfig {
pub reporter_interval: Duration,
pub sample_size: usize,
pub log_sink: Option<Arc<OrphanSinkFn>>,
}

// Type to capture orphan reporter output, primarily used for testing currently
pub type OrphanSinkFn = dyn Fn(&str) + Send + Sync + 'static;

impl OrphanReporterConfig {
pub fn reporter_interval(mut self, reporter_interval: Duration) -> Self {
self.reporter_interval = reporter_interval;
Expand All @@ -40,19 +35,13 @@ impl OrphanReporterConfig {
self.sample_size = sample_size;
self
}

pub fn log_sink(mut self, log_sink: Arc<OrphanSinkFn>) -> Self {
self.log_sink = Some(log_sink);
self
}
}

impl Default for OrphanReporterConfig {
fn default() -> Self {
Self {
reporter_interval: Duration::from_secs(10),
sample_size: 10,
log_sink: None,
}
}
}
19 changes: 3 additions & 16 deletions sdk/couchbase-core/src/orphan_reporter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@

use crate::memdx::extframe::decode_res_ext_frames;
use crate::memdx::packet::ResponsePacket;
use crate::options::orphan_reporter::{OrphanReporterConfig, OrphanSinkFn};
use crate::options::orphan_reporter::OrphanReporterConfig;
use serde_json::json;
use std::cmp::Reverse;
use std::collections::BinaryHeap;
Expand All @@ -30,7 +30,7 @@ use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::{Arc, RwLock};
use std::time::Duration;
use tokio::time::{interval_at, Instant, MissedTickBehavior};
use tracing::{debug, trace, warn};
use tracing::{trace, warn};

#[derive(Debug, Clone)]
pub struct OrphanContext {
Expand Down Expand Up @@ -119,15 +119,13 @@ pub struct OrphanReporter {
heap: Arc<RwLock<BinaryHeap<Reverse<OrphanLogItem>>>>,
sample_size: usize,
reporter_interval: Duration,
log_sink: Option<Arc<OrphanSinkFn>>,
}

impl OrphanReporter {
pub fn new(config: OrphanReporterConfig) -> Self {
let heap = Arc::new(RwLock::new(BinaryHeap::with_capacity(config.sample_size)));
let total_count = Arc::new(AtomicU64::new(0));

let log_sink = config.log_sink.clone();
let heap_clone = heap.clone();
let total_count_clone = total_count.clone();

Expand All @@ -150,12 +148,7 @@ impl OrphanReporter {
}
let mut write_guard = heap_clone.write().unwrap();
let obj = Self::create_log_object("kv".to_string(), mem::take(&mut write_guard), count);
let msg = format!("Orphaned responses observed: {}", obj);
if let Some(ref sink) = log_sink {
sink(&msg);
} else {
debug!("{}", msg);
}
warn!("Orphaned responses observed: {}", obj);
}
}
}
Expand All @@ -165,7 +158,6 @@ impl OrphanReporter {
heap,
sample_size: config.sample_size,
reporter_interval: config.reporter_interval,
log_sink: config.log_sink,
}
}

Expand Down Expand Up @@ -250,9 +242,4 @@ impl OrphanReporter {
services.insert(service, entry);
OrphanLogService(services)
}

pub fn with_sink_fn(mut self, sink: Arc<OrphanSinkFn>) -> Self {
self.log_sink = Some(sink);
self
}
}
167 changes: 91 additions & 76 deletions sdk/couchbase-core/tests/orphan_reporter_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,11 @@ use couchbase_core::options::orphan_reporter::OrphanReporterConfig;
use couchbase_core::options::waituntilready::WaitUntilReadyOptions;
use couchbase_core::orphan_reporter::{OrphanContext, OrphanReporter};
use couchbase_core::retryfailfast::FailFastRetryStrategy;
use std::io::Write;
use std::net::{IpAddr, Ipv4Addr, SocketAddr};
use std::sync::{Arc, Mutex};
use std::time::Duration;
use tracing_subscriber::layer::SubscriberExt;

mod common;

Expand All @@ -46,17 +48,47 @@ fn make_server_duration_frame(micros: u16) -> Vec<u8> {
buf
}

#[tokio::test]
#[derive(Clone, Default)]
struct CaptureWriter(Arc<Mutex<Vec<u8>>>);

impl Write for CaptureWriter {
fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
self.0.lock().unwrap().extend_from_slice(buf);
Ok(buf.len())
}
fn flush(&mut self) -> std::io::Result<()> {
Ok(())
}
}

impl<'a> tracing_subscriber::fmt::MakeWriter<'a> for CaptureWriter {
type Writer = Self;
fn make_writer(&'a self) -> Self::Writer {
self.clone()
}
}

fn captured_lines(writer: &CaptureWriter) -> Vec<String> {
let bytes = writer.0.lock().unwrap().clone();
let s = String::from_utf8(bytes.clone())
.unwrap_or_else(|_| panic!("captured output is not valid UTF-8: {:?}", bytes));
s.lines().map(|l| l.to_string()).collect()
}

#[tokio::test(flavor = "current_thread")]
async fn orphan_reporter_emits_entries() {
let buf: Arc<Mutex<Vec<String>>> = Arc::new(Mutex::new(Vec::new()));
let sink_buf = buf.clone();
let writer = CaptureWriter::default();
let subscriber = tracing_subscriber::fmt()
.with_writer(writer.clone())
.with_ansi(false)
.with_max_level(tracing::Level::DEBUG)
.finish();

let _guard = tracing::subscriber::set_default(subscriber);

let cfg = OrphanReporterConfig::default()
.reporter_interval(Duration::from_millis(500))
.sample_size(5)
.log_sink(Arc::new(move |line: &str| {
let mut v = sink_buf.lock().unwrap();
v.push(line.to_string());
}));
.sample_size(5);

let reporter = OrphanReporter::new(cfg);
let handle = reporter.get_handle();
Expand Down Expand Up @@ -84,27 +116,23 @@ async fn orphan_reporter_emits_entries() {
// Allow time for background task to flush
tokio::time::sleep(Duration::from_secs(2)).await;

let out = buf.lock().unwrap();
let log_out = out.last().unwrap();
let lines = captured_lines(&writer);
let log_out = lines
.iter()
.find(|l| l.contains("Orphaned responses observed:"))
.expect("expected a log line containing 'Orphaned responses observed:'");

let prefix = "Orphaned responses observed: ";
let json_str_opt = log_out.split_once(prefix).map(|x| x.1);
assert!(
json_str_opt.is_some(),
"expected JSON payload after prefix in log line: {}",
log_out
);
let json_str = json_str_opt.unwrap();
let json_str = log_out
.split_once(prefix)
.map(|x| x.1)
.expect("expected JSON payload after prefix in log line");

// Parse JSON
let v_res: Result<serde_json::Value, _> = serde_json::from_str(json_str);
assert!(v_res.is_ok(), "valid JSON expected, got: {}", json_str);
let v = v_res.unwrap();
let v: serde_json::Value = serde_json::from_str(json_str).expect("valid JSON expected");

// Expect service map with key "kv"
let obj = v.as_object();
assert!(obj.is_some(), "service map object expected: {}", json_str);
let obj = obj.unwrap();
let obj = v.as_object().expect("service map object expected");
let (svc_key, entry_val) = obj
.get_key_value("kv")
.or_else(|| obj.iter().next())
Expand Down Expand Up @@ -189,22 +217,25 @@ async fn orphan_reporter_emits_entries() {
#[test]
fn test_orphan_reporter_logs() {
setup_test(async |config| {
let buf: Arc<Mutex<Vec<String>>> = Arc::new(Mutex::new(Vec::new()));

let sink_buf = buf.clone();
let mut orphan_reporter = OrphanReporter::new(
OrphanReporterConfig::default()
.reporter_interval(Duration::from_secs(1))
.sample_size(2)
.log_sink(Arc::new(move |line: &str| {
let mut v = sink_buf.lock().unwrap();
v.push(line.to_string());
})),
);
let writer = CaptureWriter::default();
let capture_layer = tracing_subscriber::fmt::layer()
.with_writer(writer.clone())
.with_ansi(false);

let agent_opts = create_default_options(config)
.await
.orphan_reporter_handler(orphan_reporter.get_handle());
.orphan_reporter_handler(
OrphanReporter::new(
OrphanReporterConfig::default()
.reporter_interval(Duration::from_secs(1))
.sample_size(2),
)
.get_handle(),
);

// Install the capture layer for the duration of this test
let subscriber = tracing_subscriber::registry().with(capture_layer);
let _guard = tracing::subscriber::set_default(subscriber);

let agent = Agent::new(agent_opts).await.unwrap();
agent
Expand Down Expand Up @@ -245,61 +276,51 @@ fn test_orphan_reporter_logs() {
// Allow reporter to flush
tokio::time::sleep(Duration::from_secs(2)).await;

let out = buf.lock().unwrap();
let log_out = out.last().unwrap();
let lines = captured_lines(&writer);
let log_out = lines
.iter()
.find(|l| l.contains("Orphaned responses observed:"))
.expect("expected a log line containing 'Orphaned responses observed:'");

let prefix = "Orphaned responses observed: ";
let json_str_opt = log_out.split_once(prefix).map(|x| x.1);
assert!(
json_str_opt.is_some(),
"expected JSON payload after prefix in log line: {}",
log_out
);
let json_str = json_str_opt.unwrap();
let json_str = log_out
.split_once(prefix)
.map(|x| x.1)
.expect("expected JSON payload after prefix in log line");

// Try top-level service map first: {"kv": {"total_count":..,"top_requests":[..]}}
let v_res: Result<serde_json::Value, _> = serde_json::from_str(json_str);
assert!(v_res.is_ok(), "valid JSON expected, got: {}", json_str);
let v = v_res.unwrap();
let v: serde_json::Value = serde_json::from_str(json_str).expect("valid JSON expected");

let is_entry_level = v.get("top_requests").is_some();
let (entry, service_key) = if is_entry_level {
(v, None::<&String>)
} else {
let obj = v.as_object();
assert!(obj.is_some(), "service map object expected: {}", json_str);
let obj = obj.unwrap();
let obj = v.as_object().expect("service map object expected");
let kv_or_first = obj.get_key_value("kv").or_else(|| obj.iter().next());
assert!(
kv_or_first.is_some(),
"service map non-empty expected: {}",
json_str
);
assert!(kv_or_first.is_some(), "service map non-empty expected");
let (k, e) = kv_or_first.unwrap();
(e.clone(), Some(k))
};

// Validate entry shape
let total_count_opt = entry.get("total_count").and_then(|x| x.as_u64());
assert!(total_count_opt.is_some(), "total_count is missing");
let total_count = total_count_opt.unwrap();
let total_count = entry
.get("total_count")
.and_then(|x| x.as_u64())
.expect("total_count missing");
assert!(total_count >= 1, "total_count should be >= 1");

let top_opt = entry.get("top_requests").and_then(|x| x.as_array());
assert!(top_opt.is_some(), "top_requests is missing");
let top = top_opt.unwrap();
let top = entry
.get("top_requests")
.and_then(|x| x.as_array())
.expect("top_requests missing");
assert!(!top.is_empty(), "top_requests is empty");

// Validate sorting: total_server_duration_us must be non-increasing
let mut prev: Option<u64> = None;
for (idx, it) in top.iter().enumerate() {
let v_opt = it.get("total_server_duration_us").and_then(|x| x.as_u64());
assert!(
v_opt.is_some(),
"top[{}].total_server_duration_us missing/u64",
idx
);
let v = v_opt.unwrap();
let v = it
.get("total_server_duration_us")
.and_then(|x| x.as_u64())
.unwrap_or_else(|| panic!("top[{}].total_server_duration_us missing", idx));
if let Some(p) = prev {
assert!(
p >= v,
Expand Down Expand Up @@ -333,11 +354,5 @@ fn test_orphan_reporter_logs() {
if let Some(svc) = service_key {
assert_eq!(svc, "kv", "expected kv service key");
}

assert!(
log_out.contains(&"Orphaned responses observed:".to_string()),
"expected orphan reporter output, got:\n{}",
log_out
);
});
}
Loading