Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
CLAUDE.md
fuzz-*.log
default.sled
timing_test*
Expand Down
70 changes: 70 additions & 0 deletions src/block_checker.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
use std::collections::BTreeMap;
use std::panic::Location;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::{LazyLock, Mutex};

static COUNTER: AtomicU64 = AtomicU64::new(0);
static CHECK_INS: LazyLock<BlockChecker> = LazyLock::new(|| {
std::thread::spawn(move || {
let mut last_top_10 = Default::default();
loop {
std::thread::sleep(std::time::Duration::from_secs(5));
last_top_10 = CHECK_INS.report(last_top_10);
}
});

BlockChecker::default()
});

// Active check-ins keyed by id. Because ids are handed out monotonically
// and BTreeMap iterates in ascending key order, iteration visits the
// oldest (longest-active) check-ins first.
type LocationMap = BTreeMap<u64, &'static Location<'static>>;

// Diagnostic tracker for potentially-blocking code sections, keyed by
// the caller's source location. Compiled in only for internal testing
// (see the `for-internal-testing-only` feature gate at the use sites).
#[derive(Default)]
pub(crate) struct BlockChecker {
    // Currently-active check-ins; entries are inserted by check_in and
    // removed by check_out (driven by BlockGuard's Drop).
    state: Mutex<LocationMap>,
}

impl BlockChecker {
    /// Prints the oldest still-active check-ins — lowest ids, i.e. the
    /// sections that have been blocking longest. An entry is printed only
    /// if it was also present in `last_top_10`, meaning it has remained
    /// active across at least one full reporting interval. Returns the
    /// current top 10 so the caller can pass it back next time.
    fn report(&self, last_top_10: LocationMap) -> LocationMap {
        // Snapshot the candidates and release the lock *before* doing any
        // console I/O, so concurrent check_in/check_out calls are not
        // stalled for the duration of printing.
        let top_10: LocationMap = {
            let state = self.state.lock().unwrap();
            state.iter().take(10).map(|(k, v)| (*k, *v)).collect()
        };

        println!("top 10 longest blocking sections:");

        for (id, location) in &top_10 {
            if last_top_10.contains_key(id) {
                println!("id: {}, location: {:?}", id, location);
            }
        }

        top_10
    }

    /// Registers `location` as an active blocking section and returns a
    /// guard that de-registers it when dropped.
    fn check_in(&self, location: &'static Location) -> BlockGuard {
        // Relaxed is sufficient: the id only needs to be unique; it does
        // not order any other memory operations.
        let next_id = COUNTER.fetch_add(1, Ordering::Relaxed);
        let mut state = self.state.lock().unwrap();
        state.insert(next_id, location);
        BlockGuard { id: next_id }
    }

    /// Removes a previously registered check-in; called from
    /// `BlockGuard`'s `Drop` impl.
    fn check_out(&self, id: u64) {
        self.state.lock().unwrap().remove(&id);
    }
}

// Handle for one active check-in; its Drop impl (below) removes the
// corresponding entry from the global tracker.
pub(crate) struct BlockGuard {
    // Unique id assigned by BlockChecker::check_in.
    id: u64,
}

impl Drop for BlockGuard {
    // De-register the tracked section as soon as the guard leaves scope.
    fn drop(&mut self) {
        CHECK_INS.check_out(self.id)
    }
}

/// Records the caller's source location as an active blocking section
/// in the global tracker.
///
/// The returned guard must be bound to a local (e.g. `let _b = ...`);
/// a discarded guard is dropped immediately, which checks the section
/// out again and tracks nothing. `#[must_use]` makes that mistake a
/// compiler warning.
#[must_use = "dropping the guard immediately stops tracking the section"]
#[track_caller]
pub(crate) fn track_blocks() -> BlockGuard {
    let caller = Location::caller();
    CHECK_INS.check_in(caller)
}
5 changes: 5 additions & 0 deletions src/db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,9 @@ impl<const LEAF_FANOUT: usize> Db<LEAF_FANOUT> {
let mut ever_seen = std::collections::HashSet::new();
let before = std::time::Instant::now();

#[cfg(feature = "for-internal-testing-only")]
let _b0 = crate::block_checker::track_blocks();

for (_cid, tree) in self.trees.lock().iter() {
let mut hi_none_count = 0;
let mut last_hi = None;
Expand All @@ -200,6 +203,8 @@ impl<const LEAF_FANOUT: usize> Db<LEAF_FANOUT> {
hi_none_count += 1;
}
}
// each tree should have exactly one leaf with no max hi key
assert_eq!(hi_none_count, 1);
}

log::debug!(
Expand Down
2 changes: 2 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,8 @@
//! ).unwrap();
//! # let _ = std::fs::remove_dir_all("my_db");
//! ```
#[cfg(feature = "for-internal-testing-only")]
mod block_checker;
mod config;
mod db;
mod flush_epoch;
Expand Down
39 changes: 26 additions & 13 deletions src/object_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -402,6 +402,7 @@
) -> io::Result<()> {
let mut ca = self.cache_advisor.borrow_mut();
let to_evict = ca.accessed_reuse_buffer(*object_id, size);
let mut not_found = 0;
for (node_to_evict, _rough_size) in to_evict {
let object_id =
if let Some(object_id) = ObjectId::new(*node_to_evict) {
Expand All @@ -412,18 +413,11 @@

let node = if let Some(n) = self.object_id_index.get(&object_id) {
if *n.object_id != *node_to_evict {
log::debug!(
"during cache eviction, node to evict did not match current occupant for {:?}",
node_to_evict
);
continue;
}
n
} else {
log::debug!(
"during cache eviction, unable to find node to evict for {:?}",
node_to_evict
);
not_found += 1;
continue;
};

Expand Down Expand Up @@ -456,6 +450,13 @@
}
}

if not_found > 0 {
log::trace!(
"during cache eviction, did not find {} nodes that we were trying to evict",
not_found
);
}

Ok(())
}

Expand Down Expand Up @@ -701,16 +702,21 @@
self.event_verifier.print_debug_history_for_object(
dirty_object_id,
);

unreachable!(
"a leaf was expected to be cooperatively serialized but it was not available"
"a leaf was expected to be cooperatively serialized but it was not available. \
violation of flush responsibility for second read \
of expected cooperative serialization. leaf in question's \
dirty_flush_epoch is {:?}, our expected key was {:?}. node.deleted: {:?}",
leaf_ref.dirty_flush_epoch,
(dirty_epoch, dirty_object_id),
leaf_ref.deleted,
);
}
};

write_batch.push(Update::Store {
object_id: dirty_object_id,
collection_id: collection_id,

Check warning on line 719 in src/object_cache.rs

View workflow job for this annotation

GitHub Actions / clippy

redundant field names in struct initialization

warning: redundant field names in struct initialization --> src/object_cache.rs:719:25 | 719 | collection_id: collection_id, | ^^^^^^^^^^^^^^^^^^^^^^^^^^^^ help: replace it with: `collection_id` | = help: for further information visit https://rust-lang.github.io/rust-clippy/master/index.html#redundant_field_names = note: `#[warn(clippy::redundant_field_names)]` on by default
low_key,
data,
});
Expand All @@ -736,16 +742,16 @@
let before_compute_defrag = Instant::now();

if cfg!(not(feature = "monotonic-behavior")) {
let mut object_not_found = 0;

for fragmented_object_id in objects_to_defrag {
let object_opt =
self.object_id_index.get(&fragmented_object_id);

let object = if let Some(object) = object_opt {
object
} else {
log::debug!(
"defragmenting object not found in object_id_index: {fragmented_object_id:?}"
);
object_not_found += 1;
continue;
};

Expand Down Expand Up @@ -781,6 +787,13 @@
data,
});
}

if object_not_found > 0 {
log::debug!(
"{} objects not found while defragmenting",
object_not_found
);
}
}

let compute_defrag_latency = before_compute_defrag.elapsed();
Expand Down
Loading
Loading