Skip to content

Commit 5da6f4b

Browse files
committed
store: Refactor the herd cache for recent blocks lookups
1 parent 4834b4a commit 5da6f4b

File tree

1 file changed

+56
-68
lines changed

1 file changed

+56
-68
lines changed

store/postgres/src/chain_store.rs

Lines changed: 56 additions & 68 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@ use diesel_async::{scoped_futures::ScopedFutureExt, RunQueryDsl};
77

88
use graph::components::store::ChainHeadStore;
99
use graph::data::store::ethereum::call;
10-
use graph::derive::CheapClone;
1110
use graph::env::ENV_VARS;
1211
use graph::parking_lot::RwLock;
1312
use graph::prelude::MetricsRegistry;
@@ -17,6 +16,7 @@ use graph::stable_hash::crypto_stable_hash;
1716
use graph::util::herd_cache::HerdCache;
1817

1918
use std::collections::BTreeMap;
19+
use std::future::Future;
2020
use std::{
2121
collections::HashMap,
2222
convert::{TryFrom, TryInto},
@@ -1968,13 +1968,6 @@ impl ChainStoreMetrics {
19681968
}
19691969
}
19701970

1971-
#[derive(Clone, CheapClone)]
1972-
enum BlocksLookupResult {
1973-
ByHash(Arc<Result<Vec<JsonBlock>, StoreError>>),
1974-
ByNumber(Arc<Result<BTreeMap<BlockNumber, Vec<JsonBlock>>, StoreError>>),
1975-
Ancestor(Arc<Result<Option<(json::Value, BlockPtr)>, StoreError>>),
1976-
}
1977-
19781971
pub struct ChainStore {
19791972
logger: Logger,
19801973
pool: ConnectionPool,
@@ -1989,7 +1982,11 @@ pub struct ChainStore {
19891982
// with the database and to correctly implement invalidation. So, a
19901983
// conservative approach is acceptable.
19911984
recent_blocks_cache: RecentBlocksCache,
1992-
lookup_herd: HerdCache<BlocksLookupResult>,
1985+
// Typed herd caches to avoid thundering herd on concurrent lookups
1986+
blocks_by_hash_cache: HerdCache<Arc<Result<Vec<JsonBlock>, StoreError>>>,
1987+
blocks_by_number_cache:
1988+
HerdCache<Arc<Result<BTreeMap<BlockNumber, Vec<JsonBlock>>, StoreError>>>,
1989+
ancestor_cache: HerdCache<Arc<Result<Option<(json::Value, BlockPtr)>, StoreError>>>,
19931990
}
19941991

19951992
impl ChainStore {
@@ -2005,7 +2002,9 @@ impl ChainStore {
20052002
) -> Self {
20062003
let recent_blocks_cache =
20072004
RecentBlocksCache::new(recent_blocks_cache_capacity, chain.clone(), metrics);
2008-
let lookup_herd = HerdCache::new(format!("chain_{}_herd_cache", chain));
2005+
let blocks_by_hash_cache = HerdCache::new(format!("chain_{}_blocks_by_hash", chain));
2006+
let blocks_by_number_cache = HerdCache::new(format!("chain_{}_blocks_by_number", chain));
2007+
let ancestor_cache = HerdCache::new(format!("chain_{}_ancestor", chain));
20092008
ChainStore {
20102009
logger,
20112010
pool,
@@ -2014,14 +2013,36 @@ impl ChainStore {
20142013
status,
20152014
chain_head_update_sender,
20162015
recent_blocks_cache,
2017-
lookup_herd,
2016+
blocks_by_hash_cache,
2017+
blocks_by_number_cache,
2018+
ancestor_cache,
20182019
}
20192020
}
20202021

20212022
pub fn is_ingestible(&self) -> bool {
20222023
matches!(self.status, ChainStatus::Ingestible)
20232024
}
20242025

2026+
/// Execute a cached query, avoiding thundering herd for identical requests.
2027+
/// Returns `(result, was_cached)`.
2028+
async fn cached_lookup<K, T, F>(
2029+
&self,
2030+
cache: &HerdCache<Arc<Result<T, StoreError>>>,
2031+
key: &K,
2032+
lookup: F,
2033+
) -> (Result<T, StoreError>, bool)
2034+
where
2035+
K: graph::stable_hash::StableHash,
2036+
T: Clone + Send + 'static,
2037+
F: Future<Output = Result<T, StoreError>> + Send + 'static,
2038+
{
2039+
let hash = crypto_stable_hash(key);
2040+
let lookup_fut = async move { Arc::new(lookup.await) };
2041+
let (arc_result, cached) = cache.cached_query(hash, lookup_fut, &self.logger).await;
2042+
let result = Arc::try_unwrap(arc_result).unwrap_or_else(|arc| (*arc).clone());
2043+
(result, cached)
2044+
}
2045+
20252046
pub(crate) async fn create(&self, ident: &ChainIdentifier) -> Result<(), Error> {
20262047
use public::ethereum_networks::dsl::*;
20272048

@@ -2489,19 +2510,13 @@ impl ChainStoreTrait for ChainStore {
24892510
.cloned()
24902511
.collect::<Vec<_>>();
24912512

2492-
let hash = crypto_stable_hash(&missing_numbers);
24932513
let this = self.clone();
2494-
let lookup_fut = async move {
2495-
let res = this.blocks_from_store_by_numbers(missing_numbers).await;
2496-
BlocksLookupResult::ByNumber(Arc::new(res))
2497-
};
2498-
let lookup_herd = self.lookup_herd.cheap_clone();
2499-
let logger = self.logger.cheap_clone();
2500-
let res = match lookup_herd.cached_query(hash, lookup_fut, &logger).await {
2501-
(BlocksLookupResult::ByNumber(res), _) => res,
2502-
_ => unreachable!(),
2503-
};
2504-
let res = Arc::try_unwrap(res).unwrap_or_else(|arc| (*arc).clone());
2514+
let missing_clone = missing_numbers.clone();
2515+
let (res, _) = self
2516+
.cached_lookup(&self.blocks_by_number_cache, &missing_numbers, async move {
2517+
this.blocks_from_store_by_numbers(missing_clone).await
2518+
})
2519+
.await;
25052520

25062521
match res {
25072522
Ok(blocks) => {
@@ -2575,31 +2590,14 @@ impl ChainStoreTrait for ChainStore {
25752590
// the database for one block hash, `h3`, is not much faster
25762591
// than looking up `[h1, h3]` though it would require less
25772592
// IO bandwidth
2578-
let hash = crypto_stable_hash(&hashes);
25792593
let this = self.clone();
2580-
let lookup_fut = async move {
2581-
let res = this.blocks_from_store(hashes).await;
2582-
BlocksLookupResult::ByHash(Arc::new(res))
2583-
};
2584-
let lookup_herd = self.lookup_herd.cheap_clone();
2585-
let logger = self.logger.cheap_clone();
2586-
// This match can only return ByHash because lookup_fut explicitly constructs
2587-
// BlocksLookupResult::ByHash. The cache preserves the exact future result,
2588-
// so ByNumber variant is structurally impossible here.
2589-
let res = match lookup_herd.cached_query(hash, lookup_fut, &logger).await {
2590-
(BlocksLookupResult::ByHash(res), _) => res,
2591-
(BlocksLookupResult::ByNumber(_) | BlocksLookupResult::Ancestor(_), _) => {
2592-
Arc::new(Err(StoreError::Unknown(anyhow::anyhow!(
2593-
"Unexpected BlocksLookupResult variant returned from cached block lookup by hash"
2594-
))))
2595-
}
2596-
};
2594+
let hashes_clone = hashes.clone();
2595+
let (res, _) = self
2596+
.cached_lookup(&self.blocks_by_hash_cache, &hashes, async move {
2597+
this.blocks_from_store(hashes_clone).await
2598+
})
2599+
.await;
25972600

2598-
// Try to avoid cloning a non-concurrent lookup; it's not
2599-
// entirely clear whether that will actually avoid a clone
2600-
// since it depends on a lot of the details of how the
2601-
// `HerdCache` is implemented
2602-
let res = Arc::try_unwrap(res).unwrap_or_else(|arc| (*arc).clone());
26032601
let stored = match res {
26042602
Ok(blocks) => {
26052603
for block in &blocks {
@@ -2640,14 +2638,16 @@ impl ChainStoreTrait for ChainStore {
26402638
// request the same ancestor block simultaneously. The cache check
26412639
// is inside the future so that only one caller checks and populates
26422640
// the cache.
2643-
let hash = crypto_stable_hash(&(&block_ptr, offset, &root));
2641+
let key = (&block_ptr, offset, &root);
26442642
let this = self.cheap_clone();
2645-
let lookup_fut = async move {
2646-
let res: Result<Option<(json::Value, BlockPtr)>, StoreError> = async {
2643+
let block_ptr_clone = block_ptr.clone();
2644+
let root_clone = root.clone();
2645+
let (res, cached) = self
2646+
.cached_lookup(&self.ancestor_cache, &key, async move {
26472647
// Check the local cache first.
26482648
let block_cache = this
26492649
.recent_blocks_cache
2650-
.get_ancestor(&block_ptr, offset)
2650+
.get_ancestor(&block_ptr_clone, offset)
26512651
.and_then(|x| Some(x.0).zip(x.1));
26522652
if let Some((ptr, data)) = block_cache {
26532653
return Ok(Some((data, ptr)));
@@ -2657,7 +2657,7 @@ impl ChainStoreTrait for ChainStore {
26572657
let mut conn = this.pool.get_permitted().await?;
26582658
let result = this
26592659
.storage
2660-
.ancestor_block(&mut conn, block_ptr, offset, root)
2660+
.ancestor_block(&mut conn, block_ptr_clone, offset, root_clone)
26612661
.await
26622662
.map_err(StoreError::from)?;
26632663

@@ -2678,26 +2678,14 @@ impl ChainStoreTrait for ChainStore {
26782678
}
26792679

26802680
Ok(result)
2681-
}
2681+
})
26822682
.await;
2683-
BlocksLookupResult::Ancestor(Arc::new(res))
2684-
};
2685-
let lookup_herd = self.lookup_herd.cheap_clone();
2686-
let logger = self.logger.cheap_clone();
2687-
let (res, cached) = match lookup_herd.cached_query(hash, lookup_fut, &logger).await {
2688-
(BlocksLookupResult::Ancestor(res), cached) => (res, cached),
2689-
_ => {
2690-
return Err(anyhow!(
2691-
"Unexpected BlocksLookupResult variant returned from cached ancestor block lookup"
2692-
))
2693-
}
2694-
};
2695-
let result = Arc::try_unwrap(res).unwrap_or_else(|arc| (*arc).clone())?;
2683+
let result = res?;
26962684

26972685
if cached {
2698-
// If we had a hit in the herd cache, we never ran lookup_fut
2699-
// but we want to pretend that we actually looked the value up
2700-
// from the recent blocks cache
2686+
// If we had a hit in the herd cache, we never ran the future
2687+
// that we pass to cached_lookup but we want to pretend that we
2688+
// actually looked the value up from the recent blocks cache
27012689
self.recent_blocks_cache.register_hit();
27022690
}
27032691

0 commit comments

Comments
 (0)