Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
115 changes: 115 additions & 0 deletions turbopack/crates/turbo-persistence/src/db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1338,6 +1338,9 @@ impl<S: ParallelScheduler, const FAMILIES: usize> TurboPersistence<S, FAMILIES>
/// might hold onto a block of the database and it should not be held long-term.
pub fn get<K: QueryKey>(&self, family: usize, key: &K) -> Result<Option<ArcSlice<u8>>> {
debug_assert!(family < FAMILIES, "Family index out of bounds");
let span =
tracing::trace_span!("database read", family, result_size = tracing::field::Empty)
.entered();
let hash = hash_key(key);
let inner = self.inner.read();
for meta in inner.meta_files.iter().rev() {
Expand Down Expand Up @@ -1368,17 +1371,20 @@ impl<S: ParallelScheduler, const FAMILIES: usize> TurboPersistence<S, FAMILIES>
LookupValue::Deleted => {
#[cfg(feature = "stats")]
self.stats.hits_deleted.fetch_add(1, Ordering::Relaxed);
span.record("result_size", "deleted");
return Ok(None);
}
LookupValue::Slice { value } => {
#[cfg(feature = "stats")]
self.stats.hits_small.fetch_add(1, Ordering::Relaxed);
span.record("result_size", value.len());
return Ok(Some(value));
}
LookupValue::Blob { sequence_number } => {
#[cfg(feature = "stats")]
self.stats.hits_blob.fetch_add(1, Ordering::Relaxed);
let blob = self.read_blob(sequence_number)?;
span.record("result_size", blob.len());
return Ok(Some(blob));
}
}
Expand All @@ -1392,9 +1398,118 @@ impl<S: ParallelScheduler, const FAMILIES: usize> TurboPersistence<S, FAMILIES>
}
#[cfg(feature = "stats")]
self.stats.miss_global.fetch_add(1, Ordering::Relaxed);
span.record("result_size", "not found");
Ok(None)
}

/// Looks up several keys of one key family in a single pass over the meta files.
///
/// The returned vector has exactly one entry per element of `keys`, in the same order. An
/// entry is `None` when the key was not found in any meta file or when it was found as
/// deleted.
///
/// # Errors
///
/// Fails when a meta file lookup or reading a blob fails.
///
/// NOTE(review): presumably the caveat documented on `get` applies here too (returned slices
/// may keep a block of the database alive) -- confirm.
pub fn batch_get<K: QueryKey>(
    &self,
    family: usize,
    keys: &[K],
) -> Result<Vec<Option<ArcSlice<u8>>>> {
    debug_assert!(family < FAMILIES, "Family index out of bounds");
    let span = tracing::trace_span!(
        "database batch read",
        family,
        keys = keys.len(),
        not_found = tracing::field::Empty,
        deleted = tracing::field::Empty,
        result_size = tracing::field::Empty
    )
    .entered();

    // One cell per key: (key hash, position in `keys`, lookup result). The cells are sorted by
    // hash for the meta file lookups; the position maps every result back to its key.
    let mut cells: Vec<(u64, usize, Option<LookupValue>)> = keys
        .iter()
        .enumerate()
        .map(|(index, key)| (hash_key(key), index, None))
        .collect();
    cells.sort_by_key(|(hash, _, _)| *hash);
    // Number of cells that have no result yet.
    let mut unresolved = keys.len();

    let inner = self.inner.read();
    for meta in inner.meta_files.iter().rev() {
        let _meta_stats = meta.batch_lookup(
            family as u32,
            keys,
            &mut cells,
            &mut unresolved,
            &self.amqf_cache,
            &self.key_block_cache,
            &self.value_block_cache,
        )?;

        #[cfg(feature = "stats")]
        {
            let crate::meta_file::MetaBatchLookupResult {
                family_miss,
                range_misses,
                quick_filter_misses,
                sst_misses,
                hits: _,
            } = _meta_stats;
            if family_miss {
                self.stats.miss_family.fetch_add(1, Ordering::Relaxed);
            }
            if range_misses > 0 {
                self.stats
                    .miss_range
                    .fetch_add(range_misses as u64, Ordering::Relaxed);
            }
            if quick_filter_misses > 0 {
                self.stats
                    .miss_amqf
                    .fetch_add(quick_filter_misses as u64, Ordering::Relaxed);
            }
            if sst_misses > 0 {
                self.stats
                    .miss_key
                    .fetch_add(sst_misses as u64, Ordering::Relaxed);
            }
        }

        // Every key has a result, the remaining meta files don't need to be consulted.
        if unresolved == 0 {
            break;
        }
    }

    let mut deleted = 0;
    let mut not_found = 0;
    let mut result_size = 0;
    let mut results = vec![None; keys.len()];
    for (hash, index, found) in cells {
        let Some(found) = found else {
            // No meta file contained this key.
            #[cfg(feature = "stats")]
            self.stats.miss_global.fetch_add(1, Ordering::Relaxed);
            not_found += 1;
            continue;
        };
        inner.accessed_key_hashes[family].insert(hash);
        results[index] = match found {
            LookupValue::Deleted => {
                #[cfg(feature = "stats")]
                self.stats.hits_deleted.fetch_add(1, Ordering::Relaxed);
                deleted += 1;
                None
            }
            LookupValue::Slice { value } => {
                #[cfg(feature = "stats")]
                self.stats.hits_small.fetch_add(1, Ordering::Relaxed);
                result_size += value.len();
                Some(value)
            }
            LookupValue::Blob { sequence_number } => {
                #[cfg(feature = "stats")]
                self.stats.hits_blob.fetch_add(1, Ordering::Relaxed);
                let blob = self.read_blob(sequence_number)?;
                result_size += blob.len();
                Some(blob)
            }
        };
    }

    span.record("not_found", not_found);
    span.record("deleted", deleted);
    span.record("result_size", result_size);
    Ok(results)
}

/// Returns database statistics.
#[cfg(feature = "stats")]
pub fn statistics(&self) -> Statistics {
Expand Down
118 changes: 118 additions & 0 deletions turbopack/crates/turbo-persistence/src/meta_file.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use std::{
cmp::Ordering,
fmt::Display,
fs::File,
hash::BuildHasherDefault,
Expand All @@ -20,6 +21,7 @@ use turbo_bincode::turbo_bincode_decode;

use crate::{
QueryKey,
lookup_entry::LookupValue,
static_sorted_file::{BlockCache, SstLookupResult, StaticSortedFile, StaticSortedFileMetaData},
};

Expand Down Expand Up @@ -211,6 +213,27 @@ pub enum MetaLookupResult {
SstLookup(SstLookupResult),
}

/// The result of a batch lookup operation on a single meta file.
///
/// It only carries statistics about the lookup; the looked up values themselves are written
/// into the cells passed to [`MetaFile::batch_lookup`]. All fields only exist when the `stats`
/// feature is enabled, otherwise this is an empty struct.
#[derive(Debug, Default)]
pub struct MetaBatchLookupResult {
    /// The batch was skipped as a whole because the meta file belongs to a different key
    /// family.
    #[cfg(feature = "stats")]
    pub family_miss: bool,
    /// Number of SST files that were skipped because none of the requested key hashes is in
    /// the hash range of the SST file. The key family was correct.
    #[cfg(feature = "stats")]
    pub range_misses: usize,
    /// Number of keys that were in the hash range of an SST file, but were rejected by its
    /// AMQF filter.
    #[cfg(feature = "stats")]
    pub quick_filter_misses: usize,
    /// Number of keys that passed the AMQF filter of an SST file, but were not found when
    /// looking them up in the SST file.
    #[cfg(feature = "stats")]
    pub sst_misses: usize,
    /// Number of keys that were found in an SST file.
    #[cfg(feature = "stats")]
    pub hits: usize,
}

/// The key family and hash range of an SST file.
#[derive(Clone, Copy)]
pub struct StaticSortedFileRange {
Expand Down Expand Up @@ -406,4 +429,99 @@ impl MetaFile {
}
Ok(miss_result)
}

/// Looks up a batch of keys in the SST files referenced by this meta file.
///
/// `cells` contains one `(key hash, index into keys, result)` tuple per key and must be sorted
/// by key hash. Cells that already carry a result are left untouched. For every cell that is
/// found here the value is stored in the cell and `empty_cells` is decremented; the lookup
/// returns early as soon as `empty_cells` reaches zero.
///
/// When `key_family` is not the family of this meta file, nothing is looked up.
///
/// # Errors
///
/// Fails when loading the AMQF or the SST file of an entry fails, or when the lookup in the
/// SST file fails.
pub fn batch_lookup<K: QueryKey>(
    &self,
    key_family: u32,
    keys: &[K],
    cells: &mut [(u64, usize, Option<LookupValue>)],
    empty_cells: &mut usize,
    amqf_cache: &AmqfCache,
    key_block_cache: &BlockCache,
    value_block_cache: &BlockCache,
) -> Result<MetaBatchLookupResult> {
    if key_family != self.family {
        #[cfg(feature = "stats")]
        return Ok(MetaBatchLookupResult {
            family_miss: true,
            ..Default::default()
        });
        #[cfg(not(feature = "stats"))]
        return Ok(MetaBatchLookupResult {});
    }
    debug_assert!(
        cells.is_sorted_by_key(|(hash, _, _)| *hash),
        "Cells must be sorted by key hash"
    );
    #[allow(unused_mut, reason = "It's used when stats are enabled")]
    let mut stats = MetaBatchLookupResult::default();
    for entry in self.entries.iter().rev() {
        // Neither comparator ever yields `Equal`, so both searches always end in `Err` with
        // the insertion point.
        // Index of the first cell with `hash >= entry.min_hash`.
        let first = cells
            .binary_search_by(|(hash, _, _)| hash.cmp(&entry.min_hash).then(Ordering::Greater))
            .err()
            .unwrap();
        // Index of the first cell with `hash > entry.max_hash`, i. e. one past the last cell
        // that is still in range.
        let past_last = cells
            .binary_search_by(|(hash, _, _)| hash.cmp(&entry.max_hash).then(Ordering::Less))
            .err()
            .unwrap();
        if first >= past_last {
            // None of the cells is in the hash range of this entry.
            #[cfg(feature = "stats")]
            {
                stats.range_misses += 1;
            }
            continue;
        }
        let amqf = entry.amqf(self, amqf_cache)?;
        for (hash, index, slot) in cells[first..past_last].iter_mut() {
            // This cell already has a result.
            if slot.is_some() {
                continue;
            }
            if !amqf.contains_fingerprint(*hash) {
                #[cfg(feature = "stats")]
                {
                    stats.quick_filter_misses += 1;
                }
                continue;
            }
            let sst_result = entry.sst(self)?.lookup(
                *hash,
                &keys[*index],
                key_block_cache,
                value_block_cache,
            )?;
            let SstLookupResult::Found(value) = sst_result else {
                // The AMQF reported the hash, but the key is not in the SST file.
                #[cfg(feature = "stats")]
                {
                    stats.sst_misses += 1;
                }
                continue;
            };
            *slot = Some(value);
            *empty_cells -= 1;
            #[cfg(feature = "stats")]
            {
                stats.hits += 1;
            }
            // All cells have a result, there is nothing left to look up.
            if *empty_cells == 0 {
                return Ok(stats);
            }
        }
    }
    Ok(stats)
}
}
Loading
Loading