Skip to content

Commit ee48083

Browse files
authored
Add fallback for ancestor_block to handle cache truncation (#6262)
* ethereum: Add RPC fallback for ancestor_block to handle cache truncation * ethereum: Extract walk_back_ancestor logic and add tests * ethereum: Fetch full blocks with receipts in ancestor_block RPC fallback When ancestor_block falls back to RPC due to cache deserialization failures, fetch full blocks with receipts instead of light blocks. Empty receipts caused missing block handler call triggers and log triggers.
1 parent 500df82 commit ee48083

File tree

1 file changed

+253
-43
lines changed

1 file changed

+253
-43
lines changed

chain/ethereum/src/chain.rs

Lines changed: 253 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -355,6 +355,38 @@ impl std::fmt::Debug for Chain {
355355
}
356356
}
357357

358+
/// Walk back from a block pointer by following parent pointers.
359+
/// This is the core logic used as a fallback when the cache doesn't have ancestor block.
360+
///
361+
async fn walk_back_ancestor<F, Fut, E>(
362+
start_ptr: BlockPtr,
363+
offset: BlockNumber,
364+
root: Option<BlockHash>,
365+
mut parent_getter: F,
366+
) -> Result<Option<BlockPtr>, E>
367+
where
368+
F: FnMut(BlockPtr) -> Fut,
369+
Fut: std::future::Future<Output = Result<Option<BlockPtr>, E>>,
370+
{
371+
let mut current_ptr = start_ptr;
372+
373+
for _ in 0..offset {
374+
match parent_getter(current_ptr.clone()).await? {
375+
Some(parent) => {
376+
if let Some(root_hash) = &root {
377+
if parent.hash == *root_hash {
378+
break;
379+
}
380+
}
381+
current_ptr = parent;
382+
}
383+
None => return Ok(None),
384+
}
385+
}
386+
387+
Ok(Some(current_ptr))
388+
}
389+
358390
impl Chain {
359391
/// Creates a new Ethereum [`Chain`].
360392
pub fn new(
@@ -1030,6 +1062,20 @@ impl TriggersAdapterTrait<Chain> for TriggersAdapter {
10301062
}
10311063
}
10321064

1065+
// Find an ancestor block at the specified offset from the given block pointer.
1066+
// Primarily used for reorg detection to verify if the indexed position remains
1067+
// on the main chain.
1068+
//
1069+
// Parameters:
1070+
// - ptr: Starting block pointer from which to walk backwards (typically the chain head)
1071+
// - offset: Number of blocks to traverse backwards (0 returns ptr, 1 returns parent, etc.)
1072+
// - root: Optional block hash that serves as a boundary for traversal. This is ESSENTIAL
1073+
// for chains with skipped blocks (e.g., Filecoin EVM) where block numbers are not
1074+
// consecutive. When provided, traversal stops upon reaching the child of root,
1075+
// ensuring correct ancestor relationships even with gaps in block numbers.
1076+
//
1077+
// The function attempts to use the database cache first for performance,
1078+
// with RPC fallback implemented to handle cases where the cache is unavailable.
10331079
async fn ancestor_block(
10341080
&self,
10351081
ptr: BlockPtr,
@@ -1040,56 +1086,83 @@ impl TriggersAdapterTrait<Chain> for TriggersAdapter {
10401086
let cached = self
10411087
.chain_store
10421088
.cheap_clone()
1043-
.ancestor_block(ptr, offset, root)
1089+
.ancestor_block(ptr.clone(), offset, root.clone())
10441090
.await?;
10451091

1046-
let Some((json_value, block_ptr)) = cached else {
1047-
return Ok(None);
1048-
};
1049-
1050-
match json::from_value::<EthereumBlock>(json_value.clone()) {
1051-
Ok(block) => Ok(Some(BlockFinality::NonFinal(EthereumBlockWithCalls {
1052-
ethereum_block: block,
1053-
calls: None,
1054-
}))),
1055-
Err(e) => {
1056-
warn!(
1092+
// First check if we have the ancestor in cache and can deserialize it
1093+
let block_ptr = match cached {
1094+
Some((json, ptr)) => {
1095+
// Try to deserialize the cached block
1096+
match json::from_value::<EthereumBlock>(json.clone()) {
1097+
Ok(block) => {
1098+
// Successfully cached and deserialized
1099+
return Ok(Some(BlockFinality::NonFinal(EthereumBlockWithCalls {
1100+
ethereum_block: block,
1101+
calls: None,
1102+
})));
1103+
}
1104+
Err(e) => {
1105+
// Cache hit but deserialization failed
1106+
warn!(
1107+
self.logger,
1108+
"Failed to deserialize cached ancestor block {} (offset {} from {}): {}. \
1109+
This may indicate stale cache data from a previous version. \
1110+
Falling back to Firehose/RPC.",
1111+
ptr.hash_hex(),
1112+
offset,
1113+
ptr_for_log.hash_hex(),
1114+
e
1115+
);
1116+
ptr
1117+
}
1118+
}
1119+
}
1120+
None => {
1121+
// Cache miss - fall back to walking the chain via parent_ptr() calls.
1122+
// This provides resilience when the block cache is empty (e.g., after truncation).
1123+
debug!(
10571124
self.logger,
1058-
"Failed to deserialize cached ancestor block {} (offset {} from {}): {}. \
1059-
This may indicate stale cache data from a previous version. \
1060-
Falling back to Firehose/RPC.",
1061-
block_ptr.hash_hex(),
1062-
offset,
1125+
"ancestor_block cache miss for {} at offset {}, walking back via parent_ptr",
10631126
ptr_for_log.hash_hex(),
1064-
e
1127+
offset
10651128
);
10661129

1067-
match self.chain_client.as_ref() {
1068-
ChainClient::Firehose(endpoints) => {
1069-
let block = self
1070-
.fetch_block_with_firehose(endpoints, &block_ptr)
1071-
.await?;
1072-
let ethereum_block: EthereumBlockWithCalls = (&block).try_into()?;
1073-
Ok(Some(BlockFinality::NonFinal(ethereum_block)))
1074-
}
1075-
ChainClient::Rpc(adapters) => {
1076-
match self
1077-
.fetch_light_block_with_rpc(adapters, &block_ptr)
1078-
.await?
1079-
{
1080-
Some(light_block) => {
1081-
let ethereum_block = EthereumBlock {
1082-
block: light_block,
1083-
transaction_receipts: vec![],
1084-
};
1085-
Ok(Some(BlockFinality::NonFinal(EthereumBlockWithCalls {
1086-
ethereum_block,
1087-
calls: None,
1088-
})))
1089-
}
1090-
None => Ok(None),
1091-
}
1130+
match walk_back_ancestor(
1131+
ptr.clone(),
1132+
offset,
1133+
root.clone(),
1134+
|block_ptr| async move { self.parent_ptr(&block_ptr).await },
1135+
)
1136+
.await?
1137+
{
1138+
Some(ptr) => ptr,
1139+
None => return Ok(None),
1140+
}
1141+
}
1142+
};
1143+
1144+
// Fetch the actual block data for the identified block pointer.
1145+
// This path is taken for both cache misses and deserialization failures.
1146+
match self.chain_client.as_ref() {
1147+
ChainClient::Firehose(endpoints) => {
1148+
let block = self
1149+
.fetch_block_with_firehose(endpoints, &block_ptr)
1150+
.await?;
1151+
let ethereum_block: EthereumBlockWithCalls = (&block).try_into()?;
1152+
Ok(Some(BlockFinality::NonFinal(ethereum_block)))
1153+
}
1154+
ChainClient::Rpc(adapters) => {
1155+
match self
1156+
.fetch_full_block_with_rpc(adapters, &block_ptr)
1157+
.await?
1158+
{
1159+
Some(ethereum_block) => {
1160+
Ok(Some(BlockFinality::NonFinal(EthereumBlockWithCalls {
1161+
ethereum_block,
1162+
calls: None,
1163+
})))
10921164
}
1165+
None => Ok(None),
10931166
}
10941167
}
10951168
}
@@ -1185,6 +1258,29 @@ impl TriggersAdapter {
11851258

11861259
Ok(blocks.into_iter().next())
11871260
}
1261+
1262+
async fn fetch_full_block_with_rpc(
1263+
&self,
1264+
adapters: &EthereumNetworkAdapters,
1265+
block_ptr: &BlockPtr,
1266+
) -> Result<Option<EthereumBlock>, Error> {
1267+
let adapter = adapters.cheapest_with(&self.capabilities).await?;
1268+
1269+
let block = adapter
1270+
.block_by_hash(&self.logger, block_ptr.hash.as_b256())
1271+
.await?;
1272+
1273+
match block {
1274+
Some(block) => {
1275+
let ethereum_block = adapter
1276+
.load_full_block(&self.logger, block)
1277+
.await
1278+
.map_err(|e| anyhow!("Failed to load full block: {}", e))?;
1279+
Ok(Some(ethereum_block))
1280+
}
1281+
None => Ok(None),
1282+
}
1283+
}
11881284
}
11891285

11901286
pub struct FirehoseMapper {
@@ -1461,4 +1557,118 @@ mod tests {
14611557
assert!(missing.contains(&2));
14621558
assert!(missing.contains(&3));
14631559
}
1560+
1561+
#[tokio::test]
1562+
async fn test_walk_back_ancestor() {
1563+
use std::collections::HashMap;
1564+
1565+
let block_100_hash = BlockHash("block100".as_bytes().to_vec().into_boxed_slice());
1566+
let block_101_hash = BlockHash("block101".as_bytes().to_vec().into_boxed_slice());
1567+
let block_102_hash = BlockHash("block102".as_bytes().to_vec().into_boxed_slice());
1568+
let block_103_hash = BlockHash("block103".as_bytes().to_vec().into_boxed_slice());
1569+
let block_104_hash = BlockHash("block104".as_bytes().to_vec().into_boxed_slice());
1570+
let block_105_hash = BlockHash("block105".as_bytes().to_vec().into_boxed_slice());
1571+
1572+
let block_105 = BlockPtr::new(block_105_hash.clone(), 105);
1573+
let block_104 = BlockPtr::new(block_104_hash.clone(), 104);
1574+
let block_103 = BlockPtr::new(block_103_hash.clone(), 103);
1575+
let block_102 = BlockPtr::new(block_102_hash.clone(), 102);
1576+
let block_101 = BlockPtr::new(block_101_hash.clone(), 101);
1577+
let block_100 = BlockPtr::new(block_100_hash.clone(), 100);
1578+
1579+
let mut parent_map = HashMap::new();
1580+
parent_map.insert(block_105_hash.clone(), block_104.clone());
1581+
parent_map.insert(block_104_hash.clone(), block_103.clone());
1582+
parent_map.insert(block_103_hash.clone(), block_102.clone());
1583+
parent_map.insert(block_102_hash.clone(), block_101.clone());
1584+
parent_map.insert(block_101_hash.clone(), block_100.clone());
1585+
1586+
let result = super::walk_back_ancestor(block_105.clone(), 2, None, |block_ptr| {
1587+
let parent = parent_map.get(&block_ptr.hash).cloned();
1588+
async move { Ok::<_, std::convert::Infallible>(parent) }
1589+
})
1590+
.await
1591+
.unwrap();
1592+
assert_eq!(result, Some(block_103.clone()));
1593+
1594+
let result = super::walk_back_ancestor(
1595+
block_105.clone(),
1596+
10,
1597+
Some(block_102_hash.clone()),
1598+
|block_ptr| {
1599+
let parent = parent_map.get(&block_ptr.hash).cloned();
1600+
async move { Ok::<_, std::convert::Infallible>(parent) }
1601+
},
1602+
)
1603+
.await
1604+
.unwrap();
1605+
assert_eq!(
1606+
result,
1607+
Some(block_103.clone()),
1608+
"Should stop at child of root"
1609+
);
1610+
}
1611+
1612+
#[tokio::test]
1613+
async fn test_walk_back_ancestor_skipped_blocks_with_root() {
1614+
use std::collections::HashMap;
1615+
1616+
let block_100_hash = BlockHash("block100".as_bytes().to_vec().into_boxed_slice());
1617+
let block_101_hash = BlockHash("block101".as_bytes().to_vec().into_boxed_slice());
1618+
let block_102_hash = BlockHash("block102".as_bytes().to_vec().into_boxed_slice());
1619+
let block_110_hash = BlockHash("block110".as_bytes().to_vec().into_boxed_slice());
1620+
let block_111_hash = BlockHash("block111".as_bytes().to_vec().into_boxed_slice());
1621+
let block_112_hash = BlockHash("block112".as_bytes().to_vec().into_boxed_slice());
1622+
let block_120_hash = BlockHash("block120".as_bytes().to_vec().into_boxed_slice());
1623+
1624+
let block_120 = BlockPtr::new(block_120_hash.clone(), 120);
1625+
let block_112 = BlockPtr::new(block_112_hash.clone(), 112);
1626+
let block_111 = BlockPtr::new(block_111_hash.clone(), 111);
1627+
let block_110 = BlockPtr::new(block_110_hash.clone(), 110);
1628+
let block_102 = BlockPtr::new(block_102_hash.clone(), 102);
1629+
let block_101 = BlockPtr::new(block_101_hash.clone(), 101);
1630+
let block_100 = BlockPtr::new(block_100_hash.clone(), 100);
1631+
1632+
let mut parent_map = HashMap::new();
1633+
parent_map.insert(block_120_hash.clone(), block_112.clone());
1634+
parent_map.insert(block_112_hash.clone(), block_111.clone());
1635+
parent_map.insert(block_111_hash.clone(), block_110.clone());
1636+
parent_map.insert(block_110_hash.clone(), block_102.clone());
1637+
parent_map.insert(block_102_hash.clone(), block_101.clone());
1638+
parent_map.insert(block_101_hash.clone(), block_100.clone());
1639+
1640+
let result = super::walk_back_ancestor(
1641+
block_120.clone(),
1642+
10,
1643+
Some(block_110_hash.clone()),
1644+
|block_ptr| {
1645+
let parent = parent_map.get(&block_ptr.hash).cloned();
1646+
async move { Ok::<_, std::convert::Infallible>(parent) }
1647+
},
1648+
)
1649+
.await
1650+
.unwrap();
1651+
assert_eq!(
1652+
result,
1653+
Some(block_111.clone()),
1654+
"root=110: should stop at 111 (child of root)"
1655+
);
1656+
1657+
let result = super::walk_back_ancestor(
1658+
block_120.clone(),
1659+
10,
1660+
Some(block_101_hash.clone()),
1661+
|block_ptr| {
1662+
let parent = parent_map.get(&block_ptr.hash).cloned();
1663+
async move { Ok::<_, std::convert::Infallible>(parent) }
1664+
},
1665+
)
1666+
.await
1667+
.unwrap();
1668+
assert_eq!(
1669+
result,
1670+
Some(block_102.clone()),
1671+
"root=101: should stop at 102 (child of root, across skip)"
1672+
);
1673+
}
14641674
}

0 commit comments

Comments
 (0)