Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 29 additions & 0 deletions src/abstract.rs
Original file line number Diff line number Diff line change
Expand Up @@ -514,6 +514,35 @@ pub trait AbstractTree {
self.get(key, seqno).map(|x| x.is_some())
}

/// Returns `true` if the tree contains any key with the specified prefix.
///
/// This is more efficient than using `.prefix().next().is_some()` because it avoids
/// setting up a full merge iterator and returns as soon as any matching key is found.
///
/// # Examples
///
/// ```
/// # let folder = tempfile::tempdir()?;
/// # use lsm_tree::{AbstractTree, Config, Tree};
/// #
/// let tree = Config::new(folder, Default::default()).open()?;
/// assert!(!tree.contains_prefix("prefix", 0)?);
///
/// tree.insert("prefix:key1", "value1", 0);
/// tree.insert("prefix:key2", "value2", 1);
/// tree.insert("other", "value3", 2);
///
/// assert!(tree.contains_prefix("prefix", 3)?);
/// assert!(!tree.contains_prefix("nonexistent", 3)?);
/// #
/// # Ok::<(), lsm_tree::Error>(())
/// ```
///
/// # Errors
///
/// Will return `Err` if an IO error occurs.
fn contains_prefix<K: AsRef<[u8]>>(&self, prefix: K, seqno: SeqNo) -> crate::Result<bool>;

/// Inserts a key-value pair into the tree.
///
/// If the key already exists, the item will be overwritten.
Expand Down
177 changes: 177 additions & 0 deletions src/tree/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -603,6 +603,81 @@ impl AbstractTree for Tree {
let value = InternalValue::new_weak_tombstone(key, seqno);
self.append_entry(value)
}

/// Checks if any key with the given prefix exists.
///
/// This is more efficient than using `.prefix().next().is_some()` because it avoids
/// setting up a full merge iterator and returns as soon as any matching key is found.
///
/// # Arguments
///
/// * `prefix` - The prefix to search for
/// * `seqno` - Sequence number for MVCC snapshot isolation
///
/// # Returns
///
/// * `Ok(true)` - If at least one key with the prefix exists
/// * `Ok(false)` - If no keys with the prefix exist
/// * `Err(_)` - If an I/O error occurred
///
/// # Examples
///
/// ```
/// # use lsm_tree::{Config, Tree};
/// # let folder = tempfile::tempdir()?;
/// let tree = Config::new(folder).open()?;
///
/// tree.insert("prefix:key1", "value1", 0);
/// tree.insert("prefix:key2", "value2", 1);
/// tree.insert("other", "value3", 2);
///
/// assert!(tree.contains_prefix("prefix", 3)?);
/// assert!(!tree.contains_prefix("nonexistent", 3)?);
/// # Ok::<(), lsm_tree::Error>(())
/// ```
fn contains_prefix<K: AsRef<[u8]>>(&self, prefix: K, seqno: SeqNo) -> crate::Result<bool> {
use crate::range::prefix_to_range;
use std::ops::Bound;

let prefix_bytes = prefix.as_ref();
let (lower_bound, upper_bound) = prefix_to_range(prefix_bytes);

let super_version = self
.version_history
.read()
.expect("lock is poisoned")
.get_version_for_snapshot(seqno);

// Helper to check if a key starts with the prefix and respects seqno
let key_matches = |key: &[u8]| key.starts_with(prefix_bytes);

// 1. Check active memtable
if Self::contains_prefix_in_memtable(
&super_version.active_memtable,
&lower_bound,
&upper_bound,
key_matches,
seqno,
) {
return Ok(true);
}

// 2. Check sealed memtables (newest first)
for mt in super_version.sealed_memtables.iter().rev() {
if Self::contains_prefix_in_memtable(mt, &lower_bound, &upper_bound, key_matches, seqno)
{
return Ok(true);
}
}

// 3. Check SST tables
if Self::contains_prefix_in_tables(&super_version.version, &lower_bound, &upper_bound, seqno)?
{
return Ok(true);
}

Ok(false)
}
}

impl Tree {
Expand Down Expand Up @@ -705,6 +780,108 @@ impl Tree {
None
}

fn contains_prefix_in_memtable<F>(
memtable: &Arc<Memtable>,
lower_bound: &Bound<UserKey>,
upper_bound: &Bound<UserKey>,
key_matches: F,
seqno: SeqNo,
) -> bool
where
F: Fn(&[u8]) -> bool,
{
use crate::key::InternalKey;
use crate::ValueType;
use std::ops::Bound::{Excluded, Included, Unbounded};

// Convert bounds to InternalKey for memtable range query
let internal_lower = match lower_bound {
Included(key) => Bound::Included(InternalKey::new(
key.as_ref(),
SeqNo::MAX,
ValueType::Tombstone,
)),
Excluded(key) => Bound::Excluded(InternalKey::new(
key.as_ref(),
0,
ValueType::Tombstone,
)),
Unbounded => Bound::Unbounded,
};

let internal_upper = match upper_bound {
Included(key) => Bound::Included(InternalKey::new(
key.as_ref(),
0,
ValueType::Value,
)),
Excluded(key) => Bound::Excluded(InternalKey::new(
key.as_ref(),
SeqNo::MAX,
ValueType::Value,
)),
Unbounded => Bound::Unbounded,
};

// Iterate through memtable in the prefix range
memtable
.range((internal_lower, internal_upper))
.any(|entry| {
let key_bytes = entry.key().user_key.as_ref();
// Check if key matches prefix and respects seqno
key_matches(key_bytes) && entry.key().seqno < seqno
})
}

fn contains_prefix_in_tables(
version: &Version,
lower_bound: &Bound<UserKey>,
upper_bound: &Bound<UserKey>,
seqno: SeqNo,
) -> crate::Result<bool> {
use std::ops::Bound::{Excluded, Included, Unbounded};

// Convert bounds for table range checks
let range = (lower_bound, upper_bound);

for level in version.iter_levels() {
for run in level.iter() {
for table in run.iter() {
// Check if table's key range overlaps with prefix range
if !table.check_key_range_overlap(&range) {
continue;
}

// Use table's range iterator to check for any key in the prefix range
let table_lower = match lower_bound {
Included(key) => Bound::Included(key.as_ref()),
Excluded(key) => Bound::Excluded(key.as_ref()),
Unbounded => Bound::Unbounded,
};

let table_upper = match upper_bound {
Included(key) => Bound::Included(key.as_ref()),
Excluded(key) => Bound::Excluded(key.as_ref()),
Unbounded => Bound::Unbounded,
};

let mut iter = table.range((table_lower, table_upper))?;

// Check if any key in range matches the seqno requirement
while let Some(entry) = iter.next() {
let entry = entry?;

if entry.key().seqno < seqno && !entry.is_tombstone() {
return Ok(true);
}
}
}
}
}

Ok(false)
}

pub(crate) fn get_version_for_snapshot(&self, seqno: SeqNo) -> SuperVersion {
self.version_history
.read()
Expand Down
Loading