Skip to content

Commit dab4f84

Browse files
authored
introduce remote block source add on (#484)
* introduce remote block source add on * add docker compose tests * lint
1 parent c3ec1a2 commit dab4f84

File tree

19 files changed

+701
-37
lines changed

19 files changed

+701
-37
lines changed

crates/chain-orchestrator/src/handle/command.rs

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,9 @@ use crate::{ChainOrchestratorEvent, ChainOrchestratorStatus};
33
use reth_network_api::FullNetwork;
44
use reth_scroll_node::ScrollNetworkPrimitives;
55
use reth_tokio_util::EventStream;
6-
use rollup_node_primitives::{BlockInfo, L1MessageEnvelope};
6+
use rollup_node_primitives::{BlockInfo, ChainImport, L1MessageEnvelope};
77
use scroll_db::L1MessageKey;
8-
use scroll_network::ScrollNetworkHandle;
8+
use scroll_network::{NewBlockWithPeer, ScrollNetworkHandle};
99
use tokio::sync::oneshot;
1010

1111
/// The commands that can be sent to the rollup manager.
@@ -29,6 +29,13 @@ pub enum ChainOrchestratorCommand<N: FullNetwork<Primitives = ScrollNetworkPrimi
2929
DatabaseQuery(DatabaseQuery),
3030
/// Revert the rollup node state to the specified L1 block number.
3131
RevertToL1Block((u64, oneshot::Sender<bool>)),
32+
/// Import a block from a remote source.
33+
ImportBlock {
34+
/// The block to import with peer info
35+
block_with_peer: NewBlockWithPeer,
36+
/// Response channel
37+
response: oneshot::Sender<Result<ChainImport, String>>,
38+
},
3239
/// Enable gossiping of blocks to peers.
3340
#[cfg(feature = "test-utils")]
3441
SetGossip((bool, oneshot::Sender<()>)),

crates/chain-orchestrator/src/handle/mod.rs

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,9 +5,9 @@ use super::ChainOrchestratorEvent;
55
use reth_network_api::FullNetwork;
66
use reth_scroll_node::ScrollNetworkPrimitives;
77
use reth_tokio_util::EventStream;
8-
use rollup_node_primitives::{BlockInfo, L1MessageEnvelope};
8+
use rollup_node_primitives::{BlockInfo, ChainImport, L1MessageEnvelope};
99
use scroll_db::L1MessageKey;
10-
use scroll_network::ScrollNetworkHandle;
10+
use scroll_network::{NewBlockWithPeer, ScrollNetworkHandle};
1111
use tokio::sync::{mpsc, oneshot};
1212
use tracing::error;
1313

@@ -132,6 +132,16 @@ impl<N: FullNetwork<Primitives = ScrollNetworkPrimitives>> ChainOrchestratorHand
132132
rx.await
133133
}
134134

135+
/// Import a block from a remote source.
136+
pub async fn import_block(
137+
&self,
138+
block_with_peer: NewBlockWithPeer,
139+
) -> Result<Result<ChainImport, String>, oneshot::error::RecvError> {
140+
let (tx, rx) = oneshot::channel();
141+
self.send_command(ChainOrchestratorCommand::ImportBlock { block_with_peer, response: tx });
142+
rx.await
143+
}
144+
135145
/// Sends a command to the rollup manager to enable or disable gossiping of blocks to peers.
136146
#[cfg(feature = "test-utils")]
137147
pub async fn set_gossip(&self, enabled: bool) -> Result<(), oneshot::error::RecvError> {

crates/chain-orchestrator/src/lib.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -425,6 +425,13 @@ impl<
425425
self.notify(ChainOrchestratorEvent::UnwoundToL1Block(block_number));
426426
let _ = tx.send(true);
427427
}
428+
ChainOrchestratorCommand::ImportBlock { block_with_peer, response } => {
429+
let result = self
430+
.import_chain(vec![block_with_peer.block.clone()], block_with_peer)
431+
.await
432+
.map_err(|e| e.to_string());
433+
let _ = response.send(result);
434+
}
428435
#[cfg(feature = "test-utils")]
429436
ChainOrchestratorCommand::SetGossip((enabled, tx)) => {
430437
self.network.handle().set_gossip(enabled).await;
@@ -1218,6 +1225,7 @@ impl<
12181225
chain,
12191226
peer_id: block_with_peer.peer_id,
12201227
signature: block_with_peer.signature,
1228+
result,
12211229
})
12221230
}
12231231

crates/node/Cargo.toml

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -19,9 +19,11 @@ async-trait.workspace = true
1919

2020
# alloy
2121
alloy-chains.workspace = true
22+
alloy-eips.workspace = true
2223
alloy-primitives.workspace = true
2324
alloy-provider.workspace = true
2425
alloy-rpc-client.workspace = true
26+
alloy-rpc-types-engine.workspace = true
2527
alloy-signer-local.workspace = true
2628
alloy-signer-aws = "1.0.30"
2729
alloy-signer = "1.0.30"
@@ -59,6 +61,7 @@ reth-rpc-api.workspace = true
5961
reth-rpc-eth-api.workspace = true
6062
reth-rpc-eth-types.workspace = true
6163
reth-tasks.workspace = true
64+
reth-tokio-util.workspace = true
6265
reth-transaction-pool.workspace = true
6366
reth-trie-db.workspace = true
6467

@@ -76,16 +79,13 @@ aws-config = "1.8.0"
7679
aws-sdk-kms = "1.76.0"
7780

7881
# test-utils
79-
alloy-eips = { workspace = true, optional = true }
8082
alloy-rpc-types-eth = { workspace = true, optional = true }
81-
alloy-rpc-types-engine = { workspace = true, optional = true }
8283
reth-e2e-test-utils = { workspace = true, optional = true }
8384
reth-engine-local = { workspace = true, optional = true }
8485
reth-provider = { workspace = true, optional = true }
8586
reth-rpc-layer = { workspace = true, optional = true }
8687
reth-rpc-server-types = { workspace = true, optional = true }
8788
reth-storage-api = { workspace = true, optional = true }
88-
reth-tokio-util = { workspace = true, optional = true }
8989
scroll-alloy-rpc-types-engine = { workspace = true, optional = true }
9090
scroll-alloy-rpc-types.workspace = true
9191

@@ -154,14 +154,11 @@ test-utils = [
154154
"reth-e2e-test-utils",
155155
"reth-rpc-server-types",
156156
"reth-rpc-layer",
157-
"reth-tokio-util",
158157
"scroll-alloy-rpc-types-engine",
159-
"alloy-rpc-types-engine",
160158
"reth-primitives-traits/test-utils",
161159
"reth-network-p2p/test-utils",
162160
"rollup-node-chain-orchestrator/test-utils",
163161
"scroll-network/test-utils",
164-
"alloy-eips",
165162
"reth-storage-api",
166163
"alloy-rpc-types-eth",
167164
]

crates/node/src/add_ons/mod.rs

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,9 @@ use std::sync::Arc;
3232
mod handle;
3333
pub use handle::ScrollAddOnsHandle;
3434

35+
mod remote_block_source;
36+
pub use remote_block_source::RemoteBlockSourceAddOn;
37+
3538
mod rpc;
3639
pub use rpc::{
3740
RollupNodeAdminApiClient, RollupNodeAdminApiServer, RollupNodeApiClient, RollupNodeApiServer,
@@ -133,6 +136,8 @@ where
133136

134137
let (tx, rx) = tokio::sync::oneshot::channel();
135138
let rpc_config = rollup_node_manager_addon.config().rpc_args.clone();
139+
let remote_block_source_config =
140+
rollup_node_manager_addon.config().remote_block_source_args.clone();
136141

137142
// Register rollupNode API and rollupNodeAdmin API if enabled
138143
let rollup_node_rpc_ext = Arc::new(RollupNodeRpcExt::<N::Network>::new(rx));
@@ -161,6 +166,22 @@ where
161166
.map_err(|_| eyre::eyre!("failed to send rollup manager handle"))?;
162167
}
163168

169+
// Launch remote block source if enabled
170+
if remote_block_source_config.enabled {
171+
let remote_source = RemoteBlockSourceAddOn::new(
172+
remote_block_source_config,
173+
rollup_manager_handle.clone(),
174+
)
175+
.await?;
176+
ctx.node
177+
.task_executor()
178+
.spawn_critical_with_shutdown_signal("remote_block_source", |shutdown| async move {
179+
if let Err(e) = remote_source.run_until_shutdown(shutdown).await {
180+
tracing::error!(target: "scroll::remote_source", ?e, "Remote block source failed");
181+
}
182+
});
183+
}
184+
164185
Ok(ScrollAddOnsHandle { rollup_manager_handle, rpc_handle })
165186
}
166187
}
Lines changed: 183 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,183 @@
1+
//! Remote block source add-on for importing blocks from a remote L2 node
2+
//! and building new blocks on top.
3+
4+
use crate::args::RemoteBlockSourceArgs;
5+
use alloy_primitives::Signature;
6+
use alloy_provider::{Provider, ProviderBuilder};
7+
use alloy_rpc_client::RpcClient;
8+
use alloy_transport::layers::RetryBackoffLayer;
9+
use futures::StreamExt;
10+
use reth_network_api::{FullNetwork, PeerId};
11+
use reth_scroll_node::ScrollNetworkPrimitives;
12+
use reth_tasks::shutdown::Shutdown;
13+
use reth_tokio_util::EventStream;
14+
use rollup_node_chain_orchestrator::{ChainOrchestratorEvent, ChainOrchestratorHandle};
15+
use scroll_alloy_network::Scroll;
16+
use scroll_network::NewBlockWithPeer;
17+
use tokio::time::{interval, Duration};
18+
19+
/// Remote block source add-on that imports blocks from a trusted remote L2 node
20+
/// and triggers block building on top of each imported block.
21+
#[derive(Debug)]
22+
pub struct RemoteBlockSourceAddOn<N>
23+
where
24+
N: FullNetwork<Primitives = ScrollNetworkPrimitives>,
25+
{
26+
/// Configuration for the remote block source.
27+
config: RemoteBlockSourceArgs,
28+
/// Handle to the chain orchestrator for sending commands.
29+
handle: ChainOrchestratorHandle<N>,
30+
/// Tracks the last block number we imported from remote.
31+
/// This is different from local head because we build blocks on top of imports.
32+
last_imported_block: u64,
33+
}
34+
35+
impl<N> RemoteBlockSourceAddOn<N>
36+
where
37+
N: FullNetwork<Primitives = ScrollNetworkPrimitives> + Send + Sync + 'static,
38+
{
39+
/// Creates a new remote block source add-on.
40+
pub async fn new(
41+
config: RemoteBlockSourceArgs,
42+
handle: ChainOrchestratorHandle<N>,
43+
) -> eyre::Result<Self> {
44+
let last_imported_block = handle.status().await?.l2.fcs.head_block_info().number;
45+
Ok(Self { config, handle, last_imported_block })
46+
}
47+
48+
/// Runs the remote block source until shutdown.
49+
pub async fn run_until_shutdown(mut self, mut shutdown: Shutdown) -> eyre::Result<()> {
50+
let Some(url) = self.config.url.clone() else {
51+
tracing::error!(target: "scroll::remote_source", "URL required when remote-source is enabled");
52+
return Err(eyre::eyre!("URL required when remote-source is enabled"));
53+
};
54+
55+
// Build remote provider with retry layer
56+
let retry_layer = RetryBackoffLayer::new(10, 100, 330);
57+
let client = RpcClient::builder().layer(retry_layer).http(url);
58+
let remote = ProviderBuilder::<_, _, Scroll>::default().connect_client(client);
59+
60+
// Get event listener for waiting on block completion
61+
let mut event_stream = match self.handle.get_event_listener().await {
62+
Ok(stream) => stream,
63+
Err(e) => {
64+
tracing::error!(target: "scroll::remote_source", ?e, "Failed to get event listener");
65+
return Err(eyre::eyre!(e));
66+
}
67+
};
68+
69+
let mut poll_interval = interval(Duration::from_millis(self.config.poll_interval_ms));
70+
71+
loop {
72+
tokio::select! {
73+
biased;
74+
_guard = &mut shutdown => break,
75+
_ = poll_interval.tick() => {
76+
if let Err(e) = self.follow_and_build(&remote, &mut event_stream).await {
77+
tracing::error!(target: "scroll::remote_source", ?e, "Sync error");
78+
}
79+
}
80+
}
81+
}
82+
83+
Ok(())
84+
}
85+
86+
/// Follows the remote node and builds blocks on top of imported blocks.
87+
async fn follow_and_build<P: Provider<Scroll>>(
88+
&mut self,
89+
remote: &P,
90+
event_stream: &mut EventStream<ChainOrchestratorEvent>,
91+
) -> eyre::Result<()> {
92+
loop {
93+
// Get remote head
94+
let remote_block = remote
95+
.get_block_by_number(alloy_eips::BlockNumberOrTag::Latest)
96+
.full()
97+
.await?
98+
.ok_or_else(|| eyre::eyre!("Remote block not found"))?;
99+
100+
let remote_head = remote_block.header.number;
101+
102+
// Compare against last imported block
103+
if remote_head <= self.last_imported_block {
104+
tracing::trace!(target: "scroll::remote_source",
105+
last_imported = self.last_imported_block,
106+
remote_head,
107+
"Already synced with remote");
108+
return Ok(());
109+
}
110+
111+
let blocks_behind = remote_head - self.last_imported_block;
112+
tracing::info!(target: "scroll::remote_source",
113+
last_imported = self.last_imported_block,
114+
remote_head,
115+
blocks_behind,
116+
"Catching up");
117+
118+
// Fetch and import the next block from remote
119+
let next_block_num = self.last_imported_block + 1;
120+
let block = remote
121+
.get_block_by_number(next_block_num.into())
122+
.full()
123+
.await?
124+
.ok_or_else(|| eyre::eyre!("Block {} not found", next_block_num))?
125+
.into_consensus()
126+
.map_transactions(|tx| tx.inner.into_inner());
127+
128+
// Create NewBlockWithPeer with dummy peer_id and signature (trusted source)
129+
let block_with_peer = NewBlockWithPeer {
130+
peer_id: PeerId::default(),
131+
block,
132+
signature: Signature::new(Default::default(), Default::default(), false),
133+
};
134+
135+
// Import the block (this will cause a reorg if we had a locally built block at this
136+
// height)
137+
let chain_import = match self.handle.import_block(block_with_peer).await {
138+
Ok(Ok(chain_import)) => {
139+
self.last_imported_block = next_block_num;
140+
chain_import
141+
}
142+
Ok(Err(e)) => {
143+
return Err(eyre::eyre!("Import block failed: {}", e));
144+
}
145+
Err(e) => {
146+
return Err(eyre::eyre!("chain orchestrator command channel error: {}", e));
147+
}
148+
};
149+
150+
if !chain_import.result.is_valid() {
151+
tracing::info!(target: "scroll::remote_source",
152+
result = ?chain_import.result,
153+
"Imported block is not valid according to forkchoice, skipping build");
154+
continue;
155+
}
156+
157+
// Trigger block building on top of the imported block
158+
self.handle.build_block();
159+
160+
// Wait for BlockSequenced event
161+
tracing::debug!(target: "scroll::remote_source", "Waiting for block to be built...");
162+
loop {
163+
match event_stream.next().await {
164+
Some(ChainOrchestratorEvent::BlockSequenced(block)) => {
165+
tracing::info!(target: "scroll::remote_source",
166+
block_number = block.header.number,
167+
block_hash = ?block.hash_slow(),
168+
"Block built successfully, proceeding to next");
169+
break;
170+
}
171+
Some(_) => {
172+
// Ignore other events, keep waiting
173+
}
174+
None => {
175+
return Err(eyre::eyre!("Event stream ended unexpectedly"));
176+
}
177+
}
178+
}
179+
180+
// Loop continues to process next block
181+
}
182+
}
183+
}

0 commit comments

Comments
 (0)