Skip to content

Commit 82a5e07

Browse files
goffrie authored and Convex, Inc. committed
Use MultiTableIterator in list_snapshot (#44127)
This avoids reinitializing TableIterator for each table in the snapshot.

GitOrigin-RevId: dc82e8261936ec513974fa41ca6f58a64dcb3c16
1 parent 03034b0 commit 82a5e07

File tree

3 files changed: +192 additions, -80 deletions (lines changed)

crates/database/src/database.rs

Lines changed: 101 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@ use common::{
5757
LIST_SNAPSHOT_MAX_AGE_SECS,
5858
SNAPSHOT_LIST_TIME_LIMIT,
5959
},
60+
pause::Fault,
6061
persistence::{
6162
new_idle_repeatable_ts,
6263
ConflictStrategy,
@@ -109,6 +110,7 @@ use errors::{
109110
};
110111
use events::usage::UsageEventLogger;
111112
use futures::{
113+
future::Either,
112114
pin_mut,
113115
stream::BoxStream,
114116
FutureExt,
@@ -222,6 +224,7 @@ use crate::{
222224
BootstrapComponentsModel,
223225
ComponentRegistry,
224226
ComponentsTable,
227+
MultiTableIterator,
225228
SchemasTable,
226229
TableIterator,
227230
Transaction,
@@ -294,7 +297,7 @@ pub struct Database<RT: Runtime> {
294297
Mutex<
295298
Option<(
296299
ListSnapshotTableIteratorCacheEntry,
297-
BoxStream<'static, anyhow::Result<LatestDocument>>,
300+
BoxStream<'static, anyhow::Result<Either<LatestDocument, MultiTableIterator<RT>>>>,
298301
)>,
299302
>,
300303
>,
@@ -303,7 +306,7 @@ pub struct Database<RT: Runtime> {
303306
#[derive(PartialEq, Eq)]
304307
struct ListSnapshotTableIteratorCacheEntry {
305308
snapshot: Timestamp,
306-
tablet_id: TabletId,
309+
tablet_ids: Vec<TabletId>,
307310
by_id: IndexId,
308311
cursor: Option<ResolvedDocumentId>,
309312
}
@@ -2042,7 +2045,7 @@ impl<RT: Runtime> Database<RT> {
20422045
self.snapshot_by_id_indexes(snapshot),
20432046
self.snapshot_component_paths(snapshot)
20442047
)?;
2045-
let tablet_ids: BTreeSet<_> = table_mapping
2048+
let mut tablet_ids: Vec<_> = table_mapping
20462049
.iter()
20472050
.map(|(tablet_id, ..)| tablet_id)
20482051
.filter_map(|tablet_id| {
@@ -2066,9 +2069,9 @@ impl<RT: Runtime> Database<RT> {
20662069
}
20672070
})
20682071
.try_collect()?;
2069-
let mut tablet_ids = tablet_ids.into_iter();
2070-
let tablet_id = match tablet_ids.next() {
2071-
Some(first_table) => first_table,
2072+
tablet_ids.sort_unstable();
2073+
let tablet_id = match tablet_ids.first() {
2074+
Some(first_table) => *first_table,
20722075
None => {
20732076
return Ok(SnapshotPage {
20742077
documents: vec![],
@@ -2082,35 +2085,67 @@ impl<RT: Runtime> Database<RT> {
20822085
.get(&tablet_id)
20832086
.ok_or_else(|| anyhow::anyhow!("by_id index for {tablet_id:?} missing"))?;
20842087
let mut document_stream = {
2085-
let mut cached = self.list_snapshot_table_iterator_cache.lock();
20862088
let expected_cache_key = ListSnapshotTableIteratorCacheEntry {
20872089
snapshot: *snapshot,
2088-
tablet_id,
2090+
tablet_ids: tablet_ids.clone(),
20892091
by_id,
20902092
cursor,
20912093
};
2092-
if let Some((cache_key, _ds)) = &*cached
2093-
&& *cache_key == expected_cache_key
2094+
if let Some((_key, ds)) = self
2095+
.list_snapshot_table_iterator_cache
2096+
.lock()
2097+
.take_if(|(cache_key, _)| *cache_key == expected_cache_key)
20942098
{
2095-
let (_, ds) = cached.take().unwrap();
20962099
ds
20972100
} else {
2098-
let table_iterator = self.table_iterator(snapshot, 100);
2101+
if let Fault::Error(e) = self
2102+
.runtime
2103+
.pause_client()
2104+
.wait("list_snapshot_new_iterator")
2105+
.await
2106+
{
2107+
return Err(e);
2108+
}
2109+
let table_iterator = self.table_iterator(snapshot, 100).multi(tablet_ids.clone());
20992110
table_iterator
2100-
.stream_documents_in_table(tablet_id, by_id, cursor)
2111+
.into_stream_documents_in_table(tablet_id, by_id, cursor)
21012112
.boxed()
21022113
}
21032114
};
21042115

2105-
// new_cursor is set once, when we know the final internal_id.
2106-
let mut new_cursor = None;
2116+
enum PageResult<RT: Runtime> {
2117+
/// Continue iterating the same table. `document_stream` can return
2118+
/// more values.
2119+
ContinueTable {
2120+
new_cursor: ResolvedDocumentId,
2121+
document_stream: BoxStream<
2122+
'static,
2123+
anyhow::Result<Either<LatestDocument, MultiTableIterator<RT>>>,
2124+
>,
2125+
},
2126+
/// `tablet_id` is done, and `document_stream` has returned the
2127+
/// MultiTableIterator. There may or may not be more
2128+
/// tables.
2129+
TableDone(MultiTableIterator<RT>),
2130+
}
21072131
// documents accumulated in (ts, id) order to return.
21082132
let mut documents = vec![];
21092133
let mut rows_read = 0;
21102134
let start_time = Instant::now();
2111-
while let Some(LatestDocument { ts, value: doc, .. }) = document_stream.try_next().await? {
2135+
let page_result = loop {
2136+
let LatestDocument { ts, value: doc, .. } =
2137+
match document_stream.try_next().await?.context(
2138+
"into_stream_documents_in_table stream ended without returning the iterator",
2139+
)? {
2140+
Either::Left(rev) => rev,
2141+
Either::Right(iterator) => {
2142+
drop(document_stream);
2143+
break PageResult::TableDone(iterator);
2144+
},
2145+
};
21122146
rows_read += 1;
21132147
let id = doc.id();
2148+
anyhow::ensure!(id.tablet_id == tablet_id);
21142149
let table_name = table_mapping.tablet_name(id.tablet_id)?;
21152150
let namespace = table_mapping.tablet_namespace(id.tablet_id)?;
21162151
let component_id = ComponentId::from(namespace);
@@ -2132,34 +2167,65 @@ impl<RT: Runtime> Database<RT> {
21322167
|| documents.len() >= rows_returned_limit
21332168
|| start_time.elapsed() > *SNAPSHOT_LIST_TIME_LIMIT
21342169
{
2135-
new_cursor = Some(id);
2136-
break;
2170+
break PageResult::ContinueTable {
2171+
new_cursor: id,
2172+
document_stream,
2173+
};
21372174
}
2138-
}
2139-
let new_cursor = match new_cursor {
2140-
Some(new_cursor) => Some(new_cursor),
2141-
None => match tablet_ids.next() {
2142-
Some(next_tablet_id) => {
2175+
};
2176+
let (new_cursor, next_cache_kv) = match page_result {
2177+
PageResult::ContinueTable {
2178+
new_cursor,
2179+
document_stream,
2180+
} => (
2181+
Some(new_cursor),
2182+
Some((
2183+
ListSnapshotTableIteratorCacheEntry {
2184+
snapshot: *snapshot,
2185+
tablet_ids,
2186+
by_id,
2187+
cursor: Some(new_cursor),
2188+
},
2189+
document_stream,
2190+
)),
2191+
),
2192+
PageResult::TableDone(table_iterator) => match tablet_ids.get(1) {
2193+
Some(&next_tablet_id) => {
21432194
// TODO(lee) just use DeveloperDocumentId::min() once we no longer
21442195
// need to be rollback-safe.
21452196
let next_table_number = table_mapping.tablet_number(next_tablet_id)?;
2146-
Some(ResolvedDocumentId::new(
2197+
let next_by_id = *by_id_indexes.get(&next_tablet_id).ok_or_else(|| {
2198+
anyhow::anyhow!("by_id index for {next_tablet_id:?} missing")
2199+
})?;
2200+
let next_cursor = ResolvedDocumentId::new(
21472201
next_tablet_id,
21482202
DeveloperDocumentId::new(next_table_number, InternalId::MIN),
2149-
))
2203+
);
2204+
let next_document_stream = table_iterator
2205+
.into_stream_documents_in_table(
2206+
next_tablet_id,
2207+
next_by_id,
2208+
Some(next_cursor),
2209+
)
2210+
.boxed();
2211+
(
2212+
Some(next_cursor),
2213+
Some((
2214+
ListSnapshotTableIteratorCacheEntry {
2215+
snapshot: *snapshot,
2216+
tablet_ids: tablet_ids[1..].to_vec(),
2217+
by_id: next_by_id,
2218+
cursor: Some(next_cursor),
2219+
},
2220+
next_document_stream,
2221+
)),
2222+
)
21502223
},
2151-
None => None,
2224+
None => (None, None),
21522225
},
21532226
};
2154-
if let Some(new_cursor) = new_cursor {
2155-
let new_cache_key = ListSnapshotTableIteratorCacheEntry {
2156-
snapshot: *snapshot,
2157-
tablet_id,
2158-
by_id,
2159-
cursor: Some(new_cursor),
2160-
};
2161-
*self.list_snapshot_table_iterator_cache.lock() =
2162-
Some((new_cache_key, document_stream));
2227+
if let Some(kv) = next_cache_kv {
2228+
*self.list_snapshot_table_iterator_cache.lock() = Some(kv);
21632229
}
21642230
let has_more = new_cursor.is_some();
21652231
metrics::log_list_snapshot_page_documents(documents.len());

crates/database/src/table_iteration.rs

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@ use common::{
4747
};
4848
use errors::ErrorMetadataAnyhowExt;
4949
use futures::{
50+
future::Either,
5051
pin_mut,
5152
stream,
5253
Stream,
@@ -268,6 +269,26 @@ impl<RT: Runtime> MultiTableIterator<RT> {
268269
}
269270
}
270271

272+
/// Wrapper for `stream_documents_in_table` that takes `self` by value, and
273+
/// yields it back when the stream is finished. This is helpful because the
274+
/// resulting stream is `'static`.
275+
#[try_stream(ok = Either<LatestDocument, Self>, error = anyhow::Error)]
276+
pub async fn into_stream_documents_in_table(
277+
mut self,
278+
tablet_id: TabletId,
279+
by_id: IndexId,
280+
cursor: Option<ResolvedDocumentId>,
281+
) {
282+
{
283+
let stream = self.stream_documents_in_table(tablet_id, by_id, cursor);
284+
pin_mut!(stream);
285+
while let Some(rev) = stream.try_next().await? {
286+
yield Either::Left(rev);
287+
}
288+
}
289+
yield Either::Right(self);
290+
}
291+
271292
/// Algorithm overview:
272293
/// Walk a table ordered by an index at snapshot_ts which may be outside
273294
/// retention, so you can't walk the index directly.

0 commit comments

Comments
 (0)