diff --git a/Makefile b/Makefile index 7da521fe..f6ac364c 100644 --- a/Makefile +++ b/Makefile @@ -99,7 +99,7 @@ export-sample-test-data: .PHONY: docs docs: - cargo docs --document-private-items --exclude rollup-node-chain-orchestrator + cargo +$(NIGHTLY_TOOLCHAIN) docs --document-private-items --exclude rollup-node-chain-orchestrator .PHONY: pr pr: lint test docs diff --git a/crates/chain-orchestrator/src/event.rs b/crates/chain-orchestrator/src/event.rs index 027dc118..2c061f20 100644 --- a/crates/chain-orchestrator/src/event.rs +++ b/crates/chain-orchestrator/src/event.rs @@ -14,6 +14,8 @@ use scroll_network::NewBlockWithPeer; pub enum ChainOrchestratorEvent { /// A received block failed the consensus checks. BlockFailedConsensusChecks(B256, PeerId), + /// A finalized block was received from a peer. + L2FinalizedBlockReceived(B256, PeerId), /// A new block has been received from the network but we have insufficient data to process it /// due to being in optimistic mode. InsufficientDataForReceivedBlock(B256), diff --git a/crates/chain-orchestrator/src/lib.rs b/crates/chain-orchestrator/src/lib.rs index 64d5d657..8778d515 100644 --- a/crates/chain-orchestrator/src/lib.rs +++ b/crates/chain-orchestrator/src/lib.rs @@ -937,6 +937,17 @@ impl< ) -> Result, ChainOrchestratorError> { tracing::debug!(target: "scroll::chain_orchestrator", block_hash = ?block_with_peer.block.header.hash_slow(), block_number = ?block_with_peer.block.number, peer_id = ?block_with_peer.peer_id, "Received new block from peer"); + // Check we are not handling a finalized block. + if block_with_peer.block.header.number <= self.engine.fcs().finalized_block_info().number { + self.network + .handle() + .block_import_outcome(BlockImportOutcome::finalized_block(block_with_peer.peer_id)); + return Ok(Some(ChainOrchestratorEvent::L2FinalizedBlockReceived( + block_with_peer.block.header.hash_slow(), + block_with_peer.peer_id, + ))); + } + if let Err(err) = self.consensus.validate_new_block(&block_with_peer.block, &block_with_peer.signature) { diff --git a/crates/network/src/import.rs b/crates/network/src/import.rs index 281998b7..df723eb4 100644 --- a/crates/network/src/import.rs +++ b/crates/network/src/import.rs @@ -17,6 +17,11 @@ pub struct BlockImportOutcome { } impl BlockImportOutcome { + /// Creates a new `BlockImportOutcome` instance for a finalized block with the given peer ID. + pub fn finalized_block(peer: PeerId) -> Self { + Self { peer, result: Err(BlockImportError::L2FinalizedBlockReceived(peer)) } + } + /// Creates a new `BlockImportOutcome` instance for an invalid block with the given peer ID. pub fn invalid_block(peer: PeerId) -> Self { Self { peer, result: Err(BlockImportError::Validation(BlockValidationError::InvalidBlock)) } @@ -56,6 +61,8 @@ pub enum BlockImportError { Consensus(ConsensusError), /// An error occurred during block validation. Validation(BlockValidationError), + /// A finalized block was received from a peer. + L2FinalizedBlockReceived(PeerId), } /// A consensus related error that can occur during block import. diff --git a/crates/network/src/manager.rs b/crates/network/src/manager.rs index f1353ac5..52d986df 100644 --- a/crates/network/src/manager.rs +++ b/crates/network/src/manager.rs @@ -20,8 +20,8 @@ use reth_tokio_util::{EventSender, EventStream}; use rollup_node_primitives::{sig_encode_hash, BlockInfo}; use scroll_alloy_hardforks::ScrollHardforks; use scroll_wire::{ - NewBlock, ScrollWireConfig, ScrollWireEvent, ScrollWireManager, ScrollWireProtocolHandler, - LRU_CACHE_SIZE, + NewBlock, PeerBlockState, ScrollWireConfig, ScrollWireEvent, ScrollWireManager, + ScrollWireProtocolHandler, LRU_CACHE_SIZE, }; use std::{ pin::Pin, @@ -184,12 +184,26 @@ impl< // Compute the block hash. let hash = block.block.hash_slow(); - // Filter the peers that have not seen this block hash. + // Filter the peers that have not seen this block hash via either protocol. + // We iterate over all connected scroll-wire peers. let peers: Vec> = self .scroll_wire - .state() - .iter() - .filter_map(|(peer_id, blocks)| (!blocks.contains(&hash)).then_some(*peer_id)) + .connected_peers() + .filter_map(|peer_id| { + // Check if peer has seen this block via any protocol + let has_seen = self + .scroll_wire + .peer_block_state() + .get(peer_id) + .is_some_and(|state| state.has_seen(&hash)); + + // Only announce if peer hasn't seen this block + if !has_seen { + Some(*peer_id) + } else { + None + } + }) .collect(); // TODO: remove this once we deprecate l2geth. @@ -240,15 +254,35 @@ impl< ScrollWireEvent::NewBlock { peer_id, block, signature } => { let block_hash = block.hash_slow(); trace!(target: "scroll::network::manager", peer_id = ?peer_id, block = ?block_hash, signature = ?signature, "Received new block"); + + // Check if this peer has already received this block via scroll-wire, if so + // penalize it. + let state = self + .scroll_wire + .peer_block_state_mut() + .entry(peer_id) + .or_insert_with(|| PeerBlockState::new(LRU_CACHE_SIZE)); + if state.has_seen_via_scroll_wire(&block_hash) { + tracing::warn!(target: "scroll::network::manager", peer_id = ?peer_id, block = ?block_hash, "Peer sent duplicate block via scroll-wire, penalizing"); + self.inner_network_handle.reputation_change( + peer_id, + reth_network_api::ReputationChangeKind::BadBlock, + ); + return None; + } else { + // Update the state: peer has seen this block via scroll-wire + state.insert_scroll_wire(block_hash); + } + if self.blocks_seen.contains(&(block_hash, signature)) { None } else { - // Update the state of the peer cache i.e. peer has seen this block. + // Update the state: peer has seen this block via scroll-wire self.scroll_wire - .state_mut() + .peer_block_state_mut() .entry(peer_id) - .or_insert_with(|| LruCache::new(LRU_CACHE_SIZE)) - .insert(block_hash); + .or_insert_with(|| PeerBlockState::new(LRU_CACHE_SIZE)) + .insert_scroll_wire(block_hash); // Update the state of the block cache i.e. we have seen this block. self.blocks_seen.insert((block.hash_slow(), signature)); @@ -310,6 +344,11 @@ impl< self.inner_network_handle .reputation_change(peer, reth_network_api::ReputationChangeKind::BadBlock); } + Err(BlockImportError::L2FinalizedBlockReceived(peer)) => { + trace!(target: "scroll::network::manager", peer_id = ?peer, "Block import failed - finalized block received - penalizing peer"); + self.inner_network_handle + .reputation_change(peer, reth_network_api::ReputationChangeKind::BadBlock); + } } } @@ -339,17 +378,35 @@ impl< .and_then(|i| Signature::from_raw(&extra_data[i..]).ok()) { let block_hash = block.hash_slow(); + + // Check if this peer has already sent this block to us via eth-wire, if so penalize it. + let state = self + .scroll_wire + .peer_block_state_mut() + .entry(peer_id) + .or_insert_with(|| PeerBlockState::new(LRU_CACHE_SIZE)); + + if state.has_seen_via_eth_wire(&block_hash) { + tracing::warn!(target: "scroll::bridge::import", peer_id = ?peer_id, block = ?block_hash, "Peer sent duplicate block via eth-wire, penalizing"); + self.inner_network_handle + .reputation_change(peer_id, reth_network_api::ReputationChangeKind::BadBlock); + return None; + } else { + // Update the state: peer has seen this block via eth-wire + state.insert_eth_wire(block_hash); + } + if self.blocks_seen.contains(&(block_hash, signature)) { return None; } trace!(target: "scroll::bridge::import", peer_id = %peer_id, block_hash = %block_hash, signature = %signature.to_string(), extra_data = %extra_data.to_string(), "Received new block from eth-wire protocol"); - // Update the state of the peer cache i.e. peer has seen this block. + // Update the state: peer has seen this block via eth-wire self.scroll_wire - .state_mut() + .peer_block_state_mut() .entry(peer_id) - .or_insert_with(|| LruCache::new(LRU_CACHE_SIZE)) - .insert(block_hash); + .or_insert_with(|| PeerBlockState::new(LRU_CACHE_SIZE)) + .insert_eth_wire(block_hash); // Update the state of the block cache i.e. we have seen this block. self.blocks_seen.insert((block_hash, signature)); diff --git a/crates/scroll-wire/src/lib.rs b/crates/scroll-wire/src/lib.rs index 886b06ba..25f996f5 100644 --- a/crates/scroll-wire/src/lib.rs +++ b/crates/scroll-wire/src/lib.rs @@ -5,7 +5,7 @@ pub use config::ScrollWireConfig; mod connection; mod manager; -pub use manager::{ScrollWireManager, LRU_CACHE_SIZE}; +pub use manager::{PeerBlockState, ScrollWireManager, LRU_CACHE_SIZE}; mod protocol; pub use protocol::{NewBlock, ScrollWireEvent, ScrollWireProtocolHandler}; diff --git a/crates/scroll-wire/src/manager.rs b/crates/scroll-wire/src/manager.rs index 10a6b1dd..6fc3541d 100644 --- a/crates/scroll-wire/src/manager.rs +++ b/crates/scroll-wire/src/manager.rs @@ -16,6 +16,63 @@ use tracing::trace; /// The size of the LRU cache used to track blocks that have been seen by peers. pub const LRU_CACHE_SIZE: u32 = 100; +/// Tracks block announced and received state for a peer. +#[derive(Debug)] +pub struct PeerBlockState { + /// blocks announced to the peer + announced: LruCache, + /// blocks received via scroll-wire protocol, this is used to penalize peers that send + /// duplicate blocks via scroll-wire. + scroll_wire_received: LruCache, + /// blocks received via eth-wire protocol, this is used to penalize peers that send duplicate + /// blocks via eth-wire. + eth_wire_received: LruCache, +} + +impl PeerBlockState { + /// Creates a new `PeerBlockState` with the specified LRU cache capacity. + pub fn new(capacity: u32) -> Self { + Self { + announced: LruCache::new(capacity), + scroll_wire_received: LruCache::new(capacity), + eth_wire_received: LruCache::new(capacity), + } + } + + /// Check if peer knows about this block (either received or announced). + pub fn has_seen(&self, hash: &B256) -> bool { + self.announced.contains(hash) || + self.scroll_wire_received.contains(hash) || + self.eth_wire_received.contains(hash) + } + + /// Check if peer has received this block via scroll-wire specifically (for duplicate + /// detection). + pub fn has_seen_via_scroll_wire(&self, hash: &B256) -> bool { + self.scroll_wire_received.contains(hash) + } + + /// Check if peer has received this block via eth-wire specifically (for duplicate detection). + pub fn has_seen_via_eth_wire(&self, hash: &B256) -> bool { + self.eth_wire_received.contains(hash) + } + + /// Record that this peer has received a block via scroll-wire. + pub fn insert_scroll_wire(&mut self, hash: B256) { + self.scroll_wire_received.insert(hash); // Track for duplicate detection + } + + /// Record that this peer has received a block via eth-wire. + pub fn insert_eth_wire(&mut self, hash: B256) { + self.eth_wire_received.insert(hash); // Track for duplicate detection + } + + /// Record that we have announced a block to this peer. + pub fn insert_announced(&mut self, hash: B256) { + self.announced.insert(hash); // Only update unified announced, not protocol-specific + } +} + /// A manager for the `ScrollWire` protocol. #[derive(Debug)] pub struct ScrollWireManager { @@ -23,46 +80,54 @@ pub struct ScrollWireManager { events: UnboundedReceiverStream, /// A map of connections to peers. connections: HashMap>, - /// A map of the state of the scroll wire protocol. Currently the state for each peer - /// is just a cache of the last 100 blocks seen by each peer. - state: HashMap>, + /// Unified state tracking block state and blocks received from each peer via both protocols. + peer_block_state: HashMap, } impl ScrollWireManager { /// Creates a new [`ScrollWireManager`] instance. pub fn new(events: UnboundedReceiver) -> Self { trace!(target: "scroll::wire::manager", "Creating new ScrollWireManager instance"); - Self { events: events.into(), connections: HashMap::new(), state: HashMap::new() } + Self { + events: events.into(), + connections: HashMap::new(), + peer_block_state: HashMap::new(), + } } /// Announces a new block to the specified peer. pub fn announce_block(&mut self, peer_id: PeerId, block: &NewBlock, hash: B256) { if let Entry::Occupied(to_connection) = self.connections.entry(peer_id) { // We send the block to the peer. If we receive an error we remove the peer from the - // connections map and delete its state as the connection is no longer valid. + // connections map and peer_block_state as the connection is no longer valid. if to_connection.get().send(ScrollMessage::new_block(block.clone())).is_err() { trace!(target: "scroll::wire::manager", peer_id = %peer_id, "Failed to send block to peer - dropping peer."); - self.state.remove(&peer_id); + self.peer_block_state.remove(&peer_id); to_connection.remove(); } else { - // Upon successful sending of the block we update the state of the peer. trace!(target: "scroll::wire::manager", peer_id = %peer_id, "Announced block to peer"); - self.state + // Record that we announced this block to the peer + self.peer_block_state .entry(peer_id) - .or_insert_with(|| LruCache::new(LRU_CACHE_SIZE)) - .insert(hash); + .or_insert_with(|| PeerBlockState::new(LRU_CACHE_SIZE)) + .insert_announced(hash); } } } - /// Returns the state of the `ScrollWire` protocol. - pub const fn state(&self) -> &HashMap> { - &self.state + /// Returns an iterator over the connected peer IDs. + pub fn connected_peers(&self) -> impl Iterator { + self.connections.keys() + } + + /// Returns a reference to the peer block state map. + pub const fn peer_block_state(&self) -> &HashMap { + &self.peer_block_state } - /// Returns a mutable reference to the state of the `ScrollWire` protocol. - pub const fn state_mut(&mut self) -> &mut HashMap> { - &mut self.state + /// Returns a mutable reference to the peer block state map. + pub const fn peer_block_state_mut(&mut self) -> &mut HashMap { + &mut self.peer_block_state } } @@ -94,7 +159,6 @@ impl Future for ScrollWireManager { direction ); this.connections.insert(peer_id, to_connection); - this.state.insert(peer_id, LruCache::new(100)); } None => break, }