From c0ba172444f8e81ce59d4b08747d43624e0938ec Mon Sep 17 00:00:00 2001 From: playX18 Date: Tue, 28 Apr 2026 13:17:40 +0700 Subject: [PATCH 1/8] feat(ethexe/blob-loader): extend BlobLoader tests and add Mock BlobReader --- Cargo.lock | 1 + ethexe/blob-loader/Cargo.toml | 1 + ethexe/blob-loader/src/lib.rs | 338 +++++++++++++++++++++++++++++++--- 3 files changed, 311 insertions(+), 29 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 8ec364773b2..ee0836851f5 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5110,6 +5110,7 @@ version = "1.10.0" dependencies = [ "alloy", "ethexe-common", + "ethexe-db", "ethexe-ethereum", "futures", "gear-workspace-hack", diff --git a/ethexe/blob-loader/Cargo.toml b/ethexe/blob-loader/Cargo.toml index 32ab8b739f2..4cc42d1e9a3 100644 --- a/ethexe/blob-loader/Cargo.toml +++ b/ethexe/blob-loader/Cargo.toml @@ -22,6 +22,7 @@ tokio = { workspace = true, features = ["rt-multi-thread", "fs"] } gear-workspace-hack.workspace = true [dev-dependencies] +ethexe-db.workspace = true gprimitives = { workspace = true, features = ["ethexe", "std"] } gsigner.workspace = true ethexe-ethereum.workspace = true diff --git a/ethexe/blob-loader/src/lib.rs b/ethexe/blob-loader/src/lib.rs index b10ea14e976..47fa872a34e 100644 --- a/ethexe/blob-loader/src/lib.rs +++ b/ethexe/blob-loader/src/lib.rs @@ -86,6 +86,26 @@ enum ReaderError { type LoaderResult = Result; type ReaderResult = Result; +trait BlobReader: Clone + Send + Unpin + 'static { + fn read_blob( + &self, + expected_code_id: CodeId, + tx_hash: H256, + ) -> BoxFuture<'static, ReaderResult>>; +} + +impl BlobReader for ConsensusLayerBlobReader { + fn read_blob( + &self, + expected_code_id: CodeId, + tx_hash: H256, + ) -> BoxFuture<'static, ReaderResult>> { + let reader = self.clone(); + async move { ConsensusLayerBlobReader::read_blob(&reader, expected_code_id, tx_hash).await } + .boxed() + } +} + pub trait BlobLoaderService: Stream> + FusedStream + Send + Unpin { @@ -246,15 +266,16 @@ impl ConsensusLayerBlobReader { pub trait Database: CodesStorageRO + OnChainStorageRO + Unpin + Send + Clone + 'static {} impl Database for T {} -pub struct BlobLoader { +#[allow(private_bounds, private_interfaces)] +pub struct BlobLoader { futures: FuturesUnordered>>, codes_loading: HashSet, - blobs_reader: ConsensusLayerBlobReader, + blobs_reader: R, db: DB, } -impl Stream for BlobLoader { +impl Stream for BlobLoader { type Item = LoaderResult; fn poll_next( @@ -276,13 +297,13 @@ impl Stream for BlobLoader { } } -impl FusedStream for BlobLoader { +impl FusedStream for BlobLoader { fn is_terminated(&self) -> bool { false } } -impl BlobLoader { +impl BlobLoader { pub async fn new(db: DB, consensus_cfg: ConsensusLayerConfig) -> LoaderResult { Ok(Self { futures: FuturesUnordered::new(), @@ -300,7 +321,20 @@ impl BlobLoader { } } -impl BlobLoaderService for BlobLoader { +#[allow(private_bounds)] +impl BlobLoader { + #[cfg(test)] + fn new_with_reader(db: DB, blobs_reader: R) -> Self { + Self { + futures: FuturesUnordered::new(), + codes_loading: HashSet::new(), + blobs_reader, + db, + } + } +} + +impl BlobLoaderService for BlobLoader { fn into_box(self) -> Box { Box::new(self) } @@ -383,14 +417,109 @@ fn handle_blob( #[cfg(test)] mod tests { use super::*; - use alloy::{node_bindings::Anvil, providers::ext::AnvilApi}; - use ethexe_common::gear_core::ids::prelude::CodeIdExt; + use alloy::node_bindings::Anvil; + use ethexe_common::{ + CodeBlobInfo, + db::{CodesStorageRW, OnChainStorageRW}, + gear_core::ids::prelude::CodeIdExt, + }; + use ethexe_db::Database as EthexeDatabase; use ethexe_ethereum::deploy::EthereumDeployer; + use futures::{FutureExt, StreamExt}; use gsigner::secp256k1::{PrivateKey, Signer}; + use std::{ + collections::HashMap, + sync::{ + Arc, + atomic::{AtomicUsize, Ordering}, + }, + }; + use tokio::time::{Duration, timeout}; + + const BLOB_CHUNK_SIZE: usize = 128 * 1024; + + #[derive(Clone)] + #[allow(dead_code)] + enum MockReadResult { + Ok(Vec), + TransactionNotFound, + } - #[ignore = "until blob will be available on beacon node"] - #[tokio::test] - async fn test_read_code_from_tx_hash() { + impl MockReadResult { + fn into_reader_result(self, tx_hash: H256) -> ReaderResult> { + match self { + Self::Ok(code) => Ok(code), + Self::TransactionNotFound => Err(ReaderError::TransactionNotFound(tx_hash)), + } + } + } + + #[derive(Clone, Default)] + struct MockBlobReader { + responses: Arc>, + calls: Arc, + } + + #[allow(dead_code)] + impl MockBlobReader { + fn with_response(tx_hash: H256, response: MockReadResult) -> Self { + Self { + responses: Arc::new(HashMap::from([(tx_hash, response)])), + calls: Arc::new(AtomicUsize::new(0)), + } + } + + fn calls(&self) -> usize { + self.calls.load(Ordering::SeqCst) + } + } + + impl BlobReader for MockBlobReader { + fn read_blob( + &self, + _expected_code_id: CodeId, + tx_hash: H256, + ) -> BoxFuture<'static, ReaderResult>> { + self.calls.fetch_add(1, Ordering::SeqCst); + let response = self + .responses + .get(&tx_hash) + .cloned() + .unwrap_or(MockReadResult::TransactionNotFound) + .into_reader_result(tx_hash); + + futures::future::ready(response).boxed() + } + } + + fn generated_code(len: usize) -> Vec { + (0..len).map(|i| (i % 251) as u8).collect() + } + + fn set_blob_info(db: &EthexeDatabase, code_id: CodeId, tx_hash: H256) { + db.set_code_blob_info( + code_id, + CodeBlobInfo { + timestamp: 0, + tx_hash, + }, + ); + } + + async fn expect_blob_loaded( + loader: &mut BlobLoader, + ) -> CodeAndIdUnchecked { + match timeout(Duration::from_secs(2), loader.next()) + .await + .expect("loader must emit before timeout") + .expect("loader stream should yield an event") + .expect("loader event should be ok") + { + BlobLoaderEvent::BlobLoaded(code_and_id) => code_and_id, + } + } + + async fn run_anvil_blob_loader_test(code: Vec) { let signer = Signer::memory(); let private_key: PrivateKey = "0xac0974bec39a17e36ba4a6b4d238ff944bacb478cbed5efcae784d7bf4f2ff80" @@ -417,30 +546,181 @@ mod tests { attempts: const { NonZero::new(3).unwrap() }, }; - let blobs_reader = ConsensusLayerBlobReader { - provider: ProviderBuilder::default() - .connect(&consensus_cfg.ethereum_rpc) - .await - .unwrap(), - http_client: Client::new(), - config: consensus_cfg, - }; - - let code = &[]; - let (tx_hash, expected_code_id) = ethereum + let (tx_hash, code_id) = ethereum .router() - .request_code_validation(code) + .request_code_validation(&code) .await .unwrap(); - // set chain id to 1 to avoid anvil special case - blobs_reader.provider.anvil_set_chain_id(1).await.unwrap(); + // SAFETY: The in-memory database is isolated to this test and does not share state. + let db = unsafe { EthexeDatabase::memory() }; + set_blob_info(&db, code_id, tx_hash); - let code = blobs_reader - .read_blob(expected_code_id, tx_hash) + let mut loader = BlobLoader::new(db, consensus_cfg) .await - .unwrap(); - assert_eq!(expected_code_id, CodeId::generate(&code)); + .expect("blob loader should connect to anvil"); + loader + .load_codes(HashSet::from([code_id])) + .expect("CodeBlobInfo was inserted"); + + let loaded = expect_blob_loaded(&mut loader).await; + assert_eq!(loaded.code_id, code_id); + assert_eq!(loaded.code, code); + } + + #[tokio::test] + async fn load_codes_fails_when_code_blob_info_is_missing() { + // SAFETY: The in-memory database is isolated to this test and does not share state. + let db = unsafe { EthexeDatabase::memory() }; + let mut loader = BlobLoader::new_with_reader(db, MockBlobReader::default()); + let code_id = CodeId::generate(&[1, 2, 3, 4]); + + let err = loader + .load_codes(HashSet::from([code_id])) + .expect_err("missing CodeBlobInfo must fail"); + + assert!(matches!(err, BlobLoaderError::CodeBlobInfoNotFound(id) if id == code_id)); + assert_eq!(loader.pending_codes_len(), 0); + } + + #[tokio::test] + async fn already_loaded_code_is_emitted_without_remote_read() { + // SAFETY: The in-memory database is isolated to this test and does not share state. + let db = unsafe { EthexeDatabase::memory() }; + let code = generated_code(64); + let code_id = db.set_original_code(&code); + let tx_hash = H256::random(); + set_blob_info(&db, code_id, tx_hash); + + let reader = MockBlobReader::with_response(tx_hash, MockReadResult::TransactionNotFound); + let mut loader = BlobLoader::new_with_reader(db, reader.clone()); + + loader + .load_codes(HashSet::from([code_id])) + .expect("metadata exists"); + + assert_eq!(loader.pending_codes_len(), 1); + let loaded = expect_blob_loaded(&mut loader).await; + + assert_eq!(loaded.code_id, code_id); + assert_eq!(loaded.code, code); + assert_eq!(reader.calls(), 0); + assert_eq!(loader.pending_codes_len(), 0); + } + + #[tokio::test] + async fn remote_code_is_emitted_and_pending_state_is_cleared() { + // SAFETY: The in-memory database is isolated to this test and does not share state. + let db = unsafe { EthexeDatabase::memory() }; + let code = generated_code(128); + let code_id = CodeId::generate(&code); + let tx_hash = H256::random(); + set_blob_info(&db, code_id, tx_hash); + + let reader = MockBlobReader::with_response(tx_hash, MockReadResult::Ok(code.clone())); + let mut loader = BlobLoader::new_with_reader(db, reader.clone()); + + loader + .load_codes(HashSet::from([code_id])) + .expect("metadata exists"); + + assert_eq!(loader.pending_codes_len(), 1); + let loaded = expect_blob_loaded(&mut loader).await; + + assert_eq!(loaded.code_id, code_id); + assert_eq!(loaded.code, code); + assert_eq!(reader.calls(), 1); + assert_eq!(loader.pending_codes_len(), 0); + } + + #[tokio::test] + async fn remote_code_larger_than_three_blob_chunks_round_trips() { + // SAFETY: The in-memory database is isolated to this test and does not share state. + let db = unsafe { EthexeDatabase::memory() }; + let code = generated_code(3 * BLOB_CHUNK_SIZE + 17); + let code_id = CodeId::generate(&code); + let tx_hash = H256::random(); + set_blob_info(&db, code_id, tx_hash); + + let reader = MockBlobReader::with_response(tx_hash, MockReadResult::Ok(code.clone())); + let mut loader = BlobLoader::new_with_reader(db, reader.clone()); + + loader + .load_codes(HashSet::from([code_id])) + .expect("metadata exists"); + + let loaded = expect_blob_loaded(&mut loader).await; + + assert_eq!(loaded.code_id, code_id); + assert_eq!(loaded.code.len(), 3 * BLOB_CHUNK_SIZE + 17); + assert_eq!(loaded.code, code); + assert_eq!(reader.calls(), 1); + assert_eq!(loader.pending_codes_len(), 0); + } + + #[tokio::test] + async fn reader_failure_does_not_emit_success_or_terminate_stream() { + // SAFETY: The in-memory database is isolated to this test and does not share state. + let db = unsafe { EthexeDatabase::memory() }; + let code = generated_code(128); + let code_id = CodeId::generate(&code); + let tx_hash = H256::random(); + set_blob_info(&db, code_id, tx_hash); + + let reader = MockBlobReader::with_response(tx_hash, MockReadResult::TransactionNotFound); + let mut loader = BlobLoader::new_with_reader(db, reader.clone()); + + loader + .load_codes(HashSet::from([code_id])) + .expect("metadata exists"); + + let no_event = timeout(Duration::from_millis(100), loader.next()).await; + + assert!( + no_event.is_err(), + "reader failure should be logged and skipped, not emitted as success" + ); + assert!(!loader.is_terminated()); + assert_eq!(reader.calls(), 1); + } + + #[tokio::test] + async fn repeated_load_codes_for_pending_code_schedules_one_remote_read() { + // SAFETY: The in-memory database is isolated to this test and does not share state. + let db = unsafe { EthexeDatabase::memory() }; + let code = generated_code(128); + let code_id = CodeId::generate(&code); + let tx_hash = H256::random(); + set_blob_info(&db, code_id, tx_hash); + + let reader = MockBlobReader::with_response(tx_hash, MockReadResult::Ok(code)); + let mut loader = BlobLoader::new_with_reader(db, reader.clone()); + + loader + .load_codes(HashSet::from([code_id])) + .expect("first request should be accepted"); + loader + .load_codes(HashSet::from([code_id])) + .expect("duplicate pending request should be ignored"); + + assert_eq!(loader.pending_codes_len(), 1); + let _ = expect_blob_loaded(&mut loader).await; + assert!( + loader.next().now_or_never().is_none(), + "duplicate pending request must not queue another ready event" + ); + assert_eq!(reader.calls(), 1); + assert_eq!(loader.pending_codes_len(), 0); + } + + #[tokio::test] + async fn blob_loader_reads_code_from_anvil_tx() { + run_anvil_blob_loader_test(generated_code(128)).await; + } + + #[tokio::test] + async fn blob_loader_reads_code_larger_than_three_blob_chunks_from_anvil_tx() { + run_anvil_blob_loader_test(generated_code(3 * BLOB_CHUNK_SIZE + 17)).await; } #[test] From 33db4b20b5efbd00075a6e47129d00c0fac093ad Mon Sep 17 00:00:00 2001 From: playX18 Date: Tue, 28 Apr 2026 13:24:32 +0700 Subject: [PATCH 2/8] hakari --- utils/gear-workspace-hack/Cargo.toml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/utils/gear-workspace-hack/Cargo.toml b/utils/gear-workspace-hack/Cargo.toml index fad54de2c84..d148da49993 100644 --- a/utils/gear-workspace-hack/Cargo.toml +++ b/utils/gear-workspace-hack/Cargo.toml @@ -214,7 +214,7 @@ branch = "gear-polkadot-stable2409-wasm32v1-none" features = ["test-helpers"] ### BEGIN HAKARI SECTION -[target.'cfg(not(target_arch = "wasm32"))'.dependencies] +[dependencies] aes = { version = "0.8", default-features = false, features = ["zeroize"] } ahash = { version = "0.8" } alloy = { version = "2", features = ["kzg", "node-bindings", "provider-anvil-api", "provider-ws", "rpc-types-beacon", "rpc-types-eth", "signer-mnemonic"] } @@ -486,7 +486,7 @@ wasmtime-runtime = { version = "8", default-features = false, features = ["async winnow = { version = "0.7" } zeroize = { version = "1", features = ["derive", "std"] } -[target.'cfg(not(target_arch = "wasm32"))'.build-dependencies] +[build-dependencies] aes = { version = "0.8", default-features = false, features = ["zeroize"] } ahash = { version = "0.8" } alloy = { version = "2", features = ["kzg", "node-bindings", "provider-anvil-api", "provider-ws", "rpc-types-beacon", "rpc-types-eth", "signer-mnemonic"] } From e3474165e3e6db20e08ccd899d108eb2d5a0d8fa Mon Sep 17 00:00:00 2001 From: playX18 Date: Tue, 28 Apr 2026 15:18:48 +0700 Subject: [PATCH 3/8] remove mock blob reader --- ethexe/blob-loader/src/lib.rs | 457 +++++++++++++++++++++------------- 1 file changed, 288 insertions(+), 169 deletions(-) diff --git a/ethexe/blob-loader/src/lib.rs b/ethexe/blob-loader/src/lib.rs index 47fa872a34e..1c88fe0929f 100644 --- a/ethexe/blob-loader/src/lib.rs +++ b/ethexe/blob-loader/src/lib.rs @@ -86,26 +86,6 @@ enum ReaderError { type LoaderResult = Result; type ReaderResult = Result; -trait BlobReader: Clone + Send + Unpin + 'static { - fn read_blob( - &self, - expected_code_id: CodeId, - tx_hash: H256, - ) -> BoxFuture<'static, ReaderResult>>; -} - -impl BlobReader for ConsensusLayerBlobReader { - fn read_blob( - &self, - expected_code_id: CodeId, - tx_hash: H256, - ) -> BoxFuture<'static, ReaderResult>> { - let reader = self.clone(); - async move { ConsensusLayerBlobReader::read_blob(&reader, expected_code_id, tx_hash).await } - .boxed() - } -} - pub trait BlobLoaderService: Stream> + FusedStream + Send + Unpin { @@ -134,9 +114,9 @@ pub struct ConsensusLayerConfig { #[derive(Clone)] struct ConsensusLayerBlobReader { - pub provider: RootProvider, - pub http_client: Client, - pub config: ConsensusLayerConfig, + provider: RootProvider, + http_client: Client, + config: ConsensusLayerConfig, } impl ConsensusLayerBlobReader { @@ -266,16 +246,15 @@ impl ConsensusLayerBlobReader { pub trait Database: CodesStorageRO + OnChainStorageRO + Unpin + Send + Clone + 'static {} impl Database for T {} -#[allow(private_bounds, private_interfaces)] -pub struct BlobLoader { +pub struct BlobLoader { futures: FuturesUnordered>>, codes_loading: HashSet, - blobs_reader: R, + blobs_reader: ConsensusLayerBlobReader, db: DB, } -impl Stream for BlobLoader { +impl Stream for BlobLoader { type Item = LoaderResult; fn poll_next( @@ -297,13 +276,13 @@ impl Stream for BlobLoader { } } -impl FusedStream for BlobLoader { +impl FusedStream for BlobLoader { fn is_terminated(&self) -> bool { false } } -impl BlobLoader { +impl BlobLoader { pub async fn new(db: DB, consensus_cfg: ConsensusLayerConfig) -> LoaderResult { Ok(Self { futures: FuturesUnordered::new(), @@ -319,12 +298,9 @@ impl BlobLoader { db, }) } -} -#[allow(private_bounds)] -impl BlobLoader { #[cfg(test)] - fn new_with_reader(db: DB, blobs_reader: R) -> Self { + fn new_with_consensus_reader(db: DB, blobs_reader: ConsensusLayerBlobReader) -> Self { Self { futures: FuturesUnordered::new(), codes_loading: HashSet::new(), @@ -334,7 +310,7 @@ impl BlobLoader { } } -impl BlobLoaderService for BlobLoader { +impl BlobLoaderService for BlobLoader { fn into_box(self) -> Box { Box::new(self) } @@ -366,8 +342,8 @@ impl BlobLoaderService for BlobLoader { async move { blobs_reader .read_blob(code_id, tx_hash) - .map(|res| res.map(|code| CodeAndIdUnchecked { code_id, code })) .await + .map(|code| CodeAndIdUnchecked { code_id, code }) } .boxed(), ); @@ -427,88 +403,126 @@ mod tests { use ethexe_ethereum::deploy::EthereumDeployer; use futures::{FutureExt, StreamExt}; use gsigner::secp256k1::{PrivateKey, Signer}; - use std::{ - collections::HashMap, - sync::{ - Arc, - atomic::{AtomicUsize, Ordering}, - }, + use std::{collections::VecDeque, sync::Arc}; + use tokio::{ + io::{AsyncReadExt, AsyncWriteExt}, + net::TcpListener, + sync::Mutex, + time::{Duration, timeout}, }; - use tokio::time::{Duration, timeout}; const BLOB_CHUNK_SIZE: usize = 128 * 1024; - #[derive(Clone)] - #[allow(dead_code)] - enum MockReadResult { - Ok(Vec), - TransactionNotFound, + fn generated_code(len: usize) -> Vec { + (0..len).map(|i| (i % 251) as u8).collect() } - impl MockReadResult { - fn into_reader_result(self, tx_hash: H256) -> ReaderResult> { - match self { - Self::Ok(code) => Ok(code), - Self::TransactionNotFound => Err(ReaderError::TransactionNotFound(tx_hash)), - } + fn memory_db() -> EthexeDatabase { + #[allow(unused_unsafe)] + unsafe { + EthexeDatabase::memory() } } - #[derive(Clone, Default)] - struct MockBlobReader { - responses: Arc>, - calls: Arc, + fn set_blob_info(db: &EthexeDatabase, code_id: CodeId, tx_hash: H256) { + db.set_code_blob_info( + code_id, + CodeBlobInfo { + timestamp: 0, + tx_hash, + }, + ); } - #[allow(dead_code)] - impl MockBlobReader { - fn with_response(tx_hash: H256, response: MockReadResult) -> Self { - Self { - responses: Arc::new(HashMap::from([(tx_hash, response)])), - calls: Arc::new(AtomicUsize::new(0)), - } - } - - fn calls(&self) -> usize { - self.calls.load(Ordering::SeqCst) - } + async fn test_reader( + ethereum_rpc: String, + ethereum_beacon_rpc: String, + ) -> ConsensusLayerBlobReader { + test_reader_with_block_time(ethereum_rpc, ethereum_beacon_rpc, Duration::from_millis(10)) + .await } - impl BlobReader for MockBlobReader { - fn read_blob( - &self, - _expected_code_id: CodeId, - tx_hash: H256, - ) -> BoxFuture<'static, ReaderResult>> { - self.calls.fetch_add(1, Ordering::SeqCst); - let response = self - .responses - .get(&tx_hash) - .cloned() - .unwrap_or(MockReadResult::TransactionNotFound) - .into_reader_result(tx_hash); - - futures::future::ready(response).boxed() + async fn test_reader_with_block_time( + ethereum_rpc: String, + ethereum_beacon_rpc: String, + beacon_block_time: Duration, + ) -> ConsensusLayerBlobReader { + ConsensusLayerBlobReader { + provider: ProviderBuilder::default() + .connect(ðereum_rpc) + .await + .expect("test reader should connect to ethereum rpc"), + http_client: Client::new(), + config: ConsensusLayerConfig { + ethereum_rpc, + ethereum_beacon_rpc, + beacon_block_time, + attempts: const { NonZero::new(3).unwrap() }, + }, } } - fn generated_code(len: usize) -> Vec { - (0..len).map(|i| (i % 251) as u8).collect() + async fn run_beacon_server(responses: Vec) -> (String, Arc>>) { + let listener = TcpListener::bind("127.0.0.1:0") + .await + .expect("test beacon server should bind"); + let url = format!("http://{}", listener.local_addr().unwrap()); + let paths = Arc::new(Mutex::new(Vec::new())); + let responses = Arc::new(Mutex::new(VecDeque::from(responses))); + + tokio::spawn({ + let paths = paths.clone(); + let responses = responses.clone(); + async move { + loop { + let Ok((mut socket, _)) = listener.accept().await else { + break; + }; + let paths = paths.clone(); + let responses = responses.clone(); + tokio::spawn(async move { + let mut buf = [0; 2048]; + let Ok(n) = socket.read(&mut buf).await else { + return; + }; + let request = String::from_utf8_lossy(&buf[..n]); + let path = request + .lines() + .next() + .and_then(|line| line.split_whitespace().nth(1)) + .unwrap_or_default() + .to_string(); + paths.lock().await.push(path); + + let body = responses + .lock() + .await + .pop_front() + .unwrap_or_else(|| r#"{"data":[]}"#.to_string()); + let response = format!( + "HTTP/1.1 200 OK\r\ncontent-type: application/json\r\ncontent-length: {}\r\nconnection: close\r\n\r\n{}", + body.len(), + body + ); + let _ = socket.write_all(response.as_bytes()).await; + }); + } + } + }); + + (url, paths) } - fn set_blob_info(db: &EthexeDatabase, code_id: CodeId, tx_hash: H256) { - db.set_code_blob_info( - code_id, - CodeBlobInfo { - timestamp: 0, - tx_hash, - }, - ); + async fn unused_local_http_url() -> String { + let listener = TcpListener::bind("127.0.0.1:0") + .await + .expect("unused local port should bind"); + let url = format!("http://{}", listener.local_addr().unwrap()); + drop(listener); + url } - async fn expect_blob_loaded( - loader: &mut BlobLoader, - ) -> CodeAndIdUnchecked { + async fn expect_blob_loaded(loader: &mut BlobLoader) -> CodeAndIdUnchecked { match timeout(Duration::from_secs(2), loader.next()) .await .expect("loader must emit before timeout") @@ -552,8 +566,7 @@ mod tests { .await .unwrap(); - // SAFETY: The in-memory database is isolated to this test and does not share state. - let db = unsafe { EthexeDatabase::memory() }; + let db = memory_db(); set_blob_info(&db, code_id, tx_hash); let mut loader = BlobLoader::new(db, consensus_cfg) @@ -568,11 +581,46 @@ mod tests { assert_eq!(loaded.code, code); } + async fn request_code_validation( + chain_id: u64, + beacon_block_time: Duration, + code: &[u8], + ) -> (alloy::node_bindings::AnvilInstance, H256, CodeId) { + let signer = Signer::memory(); + let private_key: PrivateKey = + "0xac0974bec39a17e36ba4a6b4d238ff944bacb478cbed5efcae784d7bf4f2ff80" + .parse() + .unwrap(); + let public_key = signer.import(private_key).unwrap(); + let alice_address = signer.address(public_key); + let anvil = Anvil::new() + .chain_id(chain_id) + .block_time(beacon_block_time.as_secs()) + .spawn(); + + let ethereum = EthereumDeployer::new(&anvil.ws_endpoint(), signer.clone(), alice_address) + .await + .unwrap() + .with_validators(vec![alice_address].try_into().unwrap()) + .deploy() + .await + .unwrap(); + + let (tx_hash, code_id) = ethereum + .router() + .request_code_validation(code) + .await + .unwrap(); + + (anvil, tx_hash, code_id) + } + #[tokio::test] async fn load_codes_fails_when_code_blob_info_is_missing() { - // SAFETY: The in-memory database is isolated to this test and does not share state. - let db = unsafe { EthexeDatabase::memory() }; - let mut loader = BlobLoader::new_with_reader(db, MockBlobReader::default()); + let anvil = Anvil::new().spawn(); + let db = memory_db(); + let reader = test_reader(anvil.endpoint(), anvil.endpoint()).await; + let mut loader = BlobLoader::new_with_consensus_reader(db, reader); let code_id = CodeId::generate(&[1, 2, 3, 4]); let err = loader @@ -585,15 +633,15 @@ mod tests { #[tokio::test] async fn already_loaded_code_is_emitted_without_remote_read() { - // SAFETY: The in-memory database is isolated to this test and does not share state. - let db = unsafe { EthexeDatabase::memory() }; + let anvil = Anvil::new().spawn(); + let db = memory_db(); let code = generated_code(64); let code_id = db.set_original_code(&code); let tx_hash = H256::random(); set_blob_info(&db, code_id, tx_hash); - let reader = MockBlobReader::with_response(tx_hash, MockReadResult::TransactionNotFound); - let mut loader = BlobLoader::new_with_reader(db, reader.clone()); + let reader = test_reader(anvil.endpoint(), unused_local_http_url().await).await; + let mut loader = BlobLoader::new_with_consensus_reader(db, reader); loader .load_codes(HashSet::from([code_id])) @@ -604,71 +652,20 @@ mod tests { assert_eq!(loaded.code_id, code_id); assert_eq!(loaded.code, code); - assert_eq!(reader.calls(), 0); - assert_eq!(loader.pending_codes_len(), 0); - } - - #[tokio::test] - async fn remote_code_is_emitted_and_pending_state_is_cleared() { - // SAFETY: The in-memory database is isolated to this test and does not share state. - let db = unsafe { EthexeDatabase::memory() }; - let code = generated_code(128); - let code_id = CodeId::generate(&code); - let tx_hash = H256::random(); - set_blob_info(&db, code_id, tx_hash); - - let reader = MockBlobReader::with_response(tx_hash, MockReadResult::Ok(code.clone())); - let mut loader = BlobLoader::new_with_reader(db, reader.clone()); - - loader - .load_codes(HashSet::from([code_id])) - .expect("metadata exists"); - - assert_eq!(loader.pending_codes_len(), 1); - let loaded = expect_blob_loaded(&mut loader).await; - - assert_eq!(loaded.code_id, code_id); - assert_eq!(loaded.code, code); - assert_eq!(reader.calls(), 1); - assert_eq!(loader.pending_codes_len(), 0); - } - - #[tokio::test] - async fn remote_code_larger_than_three_blob_chunks_round_trips() { - // SAFETY: The in-memory database is isolated to this test and does not share state. - let db = unsafe { EthexeDatabase::memory() }; - let code = generated_code(3 * BLOB_CHUNK_SIZE + 17); - let code_id = CodeId::generate(&code); - let tx_hash = H256::random(); - set_blob_info(&db, code_id, tx_hash); - - let reader = MockBlobReader::with_response(tx_hash, MockReadResult::Ok(code.clone())); - let mut loader = BlobLoader::new_with_reader(db, reader.clone()); - - loader - .load_codes(HashSet::from([code_id])) - .expect("metadata exists"); - - let loaded = expect_blob_loaded(&mut loader).await; - - assert_eq!(loaded.code_id, code_id); - assert_eq!(loaded.code.len(), 3 * BLOB_CHUNK_SIZE + 17); - assert_eq!(loaded.code, code); - assert_eq!(reader.calls(), 1); assert_eq!(loader.pending_codes_len(), 0); } #[tokio::test] async fn reader_failure_does_not_emit_success_or_terminate_stream() { - // SAFETY: The in-memory database is isolated to this test and does not share state. - let db = unsafe { EthexeDatabase::memory() }; + let anvil = Anvil::new().spawn(); + let db = memory_db(); let code = generated_code(128); let code_id = CodeId::generate(&code); let tx_hash = H256::random(); set_blob_info(&db, code_id, tx_hash); - let reader = MockBlobReader::with_response(tx_hash, MockReadResult::TransactionNotFound); - let mut loader = BlobLoader::new_with_reader(db, reader.clone()); + let reader = test_reader(anvil.endpoint(), anvil.endpoint()).await; + let mut loader = BlobLoader::new_with_consensus_reader(db, reader); loader .load_codes(HashSet::from([code_id])) @@ -681,20 +678,21 @@ mod tests { "reader failure should be logged and skipped, not emitted as success" ); assert!(!loader.is_terminated()); - assert_eq!(reader.calls(), 1); } #[tokio::test] async fn repeated_load_codes_for_pending_code_schedules_one_remote_read() { - // SAFETY: The in-memory database is isolated to this test and does not share state. - let db = unsafe { EthexeDatabase::memory() }; + let beacon_block_time = Duration::from_secs(1); let code = generated_code(128); - let code_id = CodeId::generate(&code); - let tx_hash = H256::random(); + let (anvil, tx_hash, code_id) = + request_code_validation(31337, beacon_block_time, &code).await; + let db = memory_db(); set_blob_info(&db, code_id, tx_hash); - let reader = MockBlobReader::with_response(tx_hash, MockReadResult::Ok(code)); - let mut loader = BlobLoader::new_with_reader(db, reader.clone()); + let reader = + test_reader_with_block_time(anvil.endpoint(), anvil.endpoint(), beacon_block_time) + .await; + let mut loader = BlobLoader::new_with_consensus_reader(db, reader); loader .load_codes(HashSet::from([code_id])) @@ -709,7 +707,6 @@ mod tests { loader.next().now_or_never().is_none(), "duplicate pending request must not queue another ready event" ); - assert_eq!(reader.calls(), 1); assert_eq!(loader.pending_codes_len(), 0); } @@ -723,6 +720,128 @@ mod tests { run_anvil_blob_loader_test(generated_code(3 * BLOB_CHUNK_SIZE + 17)).await; } + #[tokio::test] + async fn consensus_reader_reports_beacon_rpc_disconnect() { + let anvil = Anvil::new().spawn(); + let reader = test_reader(anvil.endpoint(), unused_local_http_url().await).await; + + let err = reader + .read_blob_bundle(0, &[B256::ZERO]) + .await + .expect_err("disconnected beacon rpc should fail"); + + assert!(matches!(err, ReadBlobBundleError::Reqwest(_))); + } + + #[tokio::test] + async fn consensus_reader_reports_invalid_beacon_json() { + let anvil = Anvil::new().spawn(); + let (beacon_rpc, _) = run_beacon_server(vec!["not json".to_string()]).await; + let reader = test_reader(anvil.endpoint(), beacon_rpc).await; + + let err = reader + .read_blob_bundle(0, &[B256::ZERO]) + .await + .expect_err("invalid beacon json should fail"); + + assert!(matches!(err, ReadBlobBundleError::Serde(_))); + } + + #[tokio::test] + async fn consensus_reader_uses_beacon_genesis_slot_for_non_anvil_chain_id() { + let beacon_block_time = Duration::from_secs(1); + let code = generated_code(128); + let (anvil, tx_hash, code_id) = request_code_validation(1, beacon_block_time, &code).await; + let provider: RootProvider = ProviderBuilder::default() + .connect(&anvil.endpoint()) + .await + .unwrap(); + let tx = provider + .get_transaction_by_hash(tx_hash.0.into()) + .await + .unwrap() + .unwrap(); + let block_hash = tx.block_hash.unwrap(); + let block = provider + .get_block_by_hash(block_hash) + .await + .unwrap() + .unwrap(); + let expected_slot = block.header.number; + let genesis_time = block.header.timestamp - expected_slot; + let blob_body = reqwest::get(format!( + "{}/eth/v1/beacon/blobs/{expected_slot}?versioned_hashes={}", + anvil.endpoint(), + tx.blob_versioned_hashes().unwrap()[0] + )) + .await + .unwrap() + .text() + .await + .unwrap(); + let (beacon_rpc, paths) = run_beacon_server(vec![ + format!( + r#"{{"data":{{"genesis_time":"{genesis_time}","genesis_validators_root":"0x0000000000000000000000000000000000000000000000000000000000000000","genesis_fork_version":"0x00000000"}}}}"# + ), + blob_body, + ]) + .await; + let reader = + test_reader_with_block_time(anvil.endpoint(), beacon_rpc, beacon_block_time).await; + + let blob = reader.read_blob(code_id, tx_hash).await.unwrap(); + + assert_eq!(blob, code); + let paths = paths.lock().await; + assert!(paths.iter().any(|path| path == "/eth/v1/beacon/genesis")); + assert!(paths.iter().any(|path| { + path.starts_with(&format!( + "/eth/v1/beacon/blobs/{expected_slot}?versioned_hashes=" + )) + })); + } + + #[tokio::test] + async fn consensus_reader_re_requests_blob_after_transient_invalid_json() { + let beacon_block_time = Duration::from_secs(1); + let code = generated_code(128); + let (anvil, tx_hash, code_id) = + request_code_validation(31337, beacon_block_time, &code).await; + let provider: RootProvider = ProviderBuilder::default() + .connect(&anvil.endpoint()) + .await + .unwrap(); + let tx = provider + .get_transaction_by_hash(tx_hash.0.into()) + .await + .unwrap() + .unwrap(); + let slot = tx.block_number.unwrap(); + let blob_body = reqwest::get(format!( + "{}/eth/v1/beacon/blobs/{slot}?versioned_hashes={}", + anvil.endpoint(), + tx.blob_versioned_hashes().unwrap()[0] + )) + .await + .unwrap() + .text() + .await + .unwrap(); + let (beacon_rpc, paths) = run_beacon_server(vec!["not json".to_string(), blob_body]).await; + let reader = test_reader(anvil.endpoint(), beacon_rpc).await; + + let blob = reader.read_blob(code_id, tx_hash).await.unwrap(); + + assert_eq!(blob, code); + let blob_requests = paths + .lock() + .await + .iter() + .filter(|path| path.starts_with(&format!("/eth/v1/beacon/blobs/{slot}"))) + .count(); + assert_eq!(blob_requests, 2); + } + #[test] fn test_handle_blob() { let code_id = CodeId::generate(&[1, 2, 3, 4]); From 3f39440ddb8edfeb231e10f8295c5634111d5469 Mon Sep 17 00:00:00 2001 From: playX18 Date: Tue, 28 Apr 2026 17:19:10 +0700 Subject: [PATCH 4/8] construct database inline --- ethexe/blob-loader/src/lib.rs | 23 ++++++++++------------- 1 file changed, 10 insertions(+), 13 deletions(-) diff --git a/ethexe/blob-loader/src/lib.rs b/ethexe/blob-loader/src/lib.rs index 1c88fe0929f..7047797355d 100644 --- a/ethexe/blob-loader/src/lib.rs +++ b/ethexe/blob-loader/src/lib.rs @@ -417,13 +417,6 @@ mod tests { (0..len).map(|i| (i % 251) as u8).collect() } - fn memory_db() -> EthexeDatabase { - #[allow(unused_unsafe)] - unsafe { - EthexeDatabase::memory() - } - } - fn set_blob_info(db: &EthexeDatabase, code_id: CodeId, tx_hash: H256) { db.set_code_blob_info( code_id, @@ -565,8 +558,8 @@ mod tests { .request_code_validation(&code) .await .unwrap(); - - let db = memory_db(); + #[allow(unused_unsafe)] + let db = unsafe { EthexeDatabase::memory() }; set_blob_info(&db, code_id, tx_hash); let mut loader = BlobLoader::new(db, consensus_cfg) @@ -618,7 +611,8 @@ mod tests { #[tokio::test] async fn load_codes_fails_when_code_blob_info_is_missing() { let anvil = Anvil::new().spawn(); - let db = memory_db(); + #[allow(unused_unsafe)] + let db = unsafe { EthexeDatabase::memory() }; let reader = test_reader(anvil.endpoint(), anvil.endpoint()).await; let mut loader = BlobLoader::new_with_consensus_reader(db, reader); let code_id = CodeId::generate(&[1, 2, 3, 4]); @@ -634,7 +628,8 @@ mod tests { #[tokio::test] async fn already_loaded_code_is_emitted_without_remote_read() { let anvil = Anvil::new().spawn(); - let db = memory_db(); + #[allow(unused_unsafe)] + let db = unsafe { EthexeDatabase::memory() }; let code = generated_code(64); let code_id = db.set_original_code(&code); let tx_hash = H256::random(); @@ -658,7 +653,8 @@ mod tests { #[tokio::test] async fn reader_failure_does_not_emit_success_or_terminate_stream() { let anvil = Anvil::new().spawn(); - let db = memory_db(); + #[allow(unused_unsafe)] + let db = unsafe { EthexeDatabase::memory() }; let code = generated_code(128); let code_id = CodeId::generate(&code); let tx_hash = H256::random(); @@ -686,7 +682,8 @@ mod tests { let code = generated_code(128); let (anvil, tx_hash, code_id) = request_code_validation(31337, beacon_block_time, &code).await; - let db = memory_db(); + #[allow(unused_unsafe)] + let db = unsafe { EthexeDatabase::memory() }; set_blob_info(&db, code_id, tx_hash); let reader = From a051db567285234502c90a8d22610f3098ec2585 Mon Sep 17 00:00:00 2001 From: playX18 Date: Tue, 28 Apr 2026 17:27:13 +0700 Subject: [PATCH 5/8] remove unsafe and add features "mock" to ethexe-db dependency --- ethexe/blob-loader/Cargo.toml | 8 ++++++-- ethexe/blob-loader/src/lib.rs | 26 +++++++++++++------------- 2 files changed, 19 insertions(+), 15 deletions(-) diff --git a/ethexe/blob-loader/Cargo.toml b/ethexe/blob-loader/Cargo.toml index 4cc42d1e9a3..f594ab869d7 100644 --- a/ethexe/blob-loader/Cargo.toml +++ b/ethexe/blob-loader/Cargo.toml @@ -12,7 +12,11 @@ rust-version.workspace = true ethexe-common = { workspace = true, features = ["std"] } gprimitives = { workspace = true, features = ["std"] } -alloy = { workspace = true, features = ["rpc", "rpc-types", "rpc-types-beacon"] } +alloy = { workspace = true, features = [ + "rpc", + "rpc-types", + "rpc-types-beacon", +] } thiserror.workspace = true reqwest = { workspace = true, features = ["default-tls", "json"] } serde_json.workspace = true @@ -22,7 +26,7 @@ tokio = { workspace = true, features = ["rt-multi-thread", "fs"] } gear-workspace-hack.workspace = true [dev-dependencies] -ethexe-db.workspace = true +ethexe-db = { workspace = true, features = ["mock"] } gprimitives = { workspace = true, features = ["ethexe", "std"] } gsigner.workspace = true ethexe-ethereum.workspace = true diff --git a/ethexe/blob-loader/src/lib.rs b/ethexe/blob-loader/src/lib.rs index 7047797355d..5d2b469324b 100644 --- a/ethexe/blob-loader/src/lib.rs +++ b/ethexe/blob-loader/src/lib.rs @@ -399,7 +399,7 @@ mod tests { db::{CodesStorageRW, OnChainStorageRW}, gear_core::ids::prelude::CodeIdExt, }; - use ethexe_db::Database as EthexeDatabase; + use ethexe_db::Database; use ethexe_ethereum::deploy::EthereumDeployer; use futures::{FutureExt, StreamExt}; use gsigner::secp256k1::{PrivateKey, Signer}; @@ -417,7 +417,7 @@ mod tests { (0..len).map(|i| (i % 251) as u8).collect() } - fn set_blob_info(db: &EthexeDatabase, code_id: CodeId, tx_hash: H256) { + fn set_blob_info(db: &Database, code_id: CodeId, tx_hash: H256) { db.set_code_blob_info( code_id, CodeBlobInfo { @@ -515,7 +515,7 @@ mod tests { url } - async fn expect_blob_loaded(loader: &mut BlobLoader) -> CodeAndIdUnchecked { + async fn expect_blob_loaded(loader: &mut BlobLoader) -> CodeAndIdUnchecked { match timeout(Duration::from_secs(2), loader.next()) .await .expect("loader must emit before timeout") @@ -558,8 +558,8 @@ mod tests { .request_code_validation(&code) .await .unwrap(); - #[allow(unused_unsafe)] - let db = unsafe { EthexeDatabase::memory() }; + + let db = Database::memory(); set_blob_info(&db, code_id, tx_hash); let mut loader = BlobLoader::new(db, consensus_cfg) @@ -611,8 +611,8 @@ mod tests { #[tokio::test] async fn load_codes_fails_when_code_blob_info_is_missing() { let anvil = Anvil::new().spawn(); - #[allow(unused_unsafe)] - let db = unsafe { EthexeDatabase::memory() }; + + let db = Database::memory(); let reader = test_reader(anvil.endpoint(), anvil.endpoint()).await; let mut loader = BlobLoader::new_with_consensus_reader(db, reader); let code_id = CodeId::generate(&[1, 2, 3, 4]); @@ -628,8 +628,8 @@ mod tests { #[tokio::test] async fn already_loaded_code_is_emitted_without_remote_read() { let anvil = Anvil::new().spawn(); - #[allow(unused_unsafe)] - let db = unsafe { EthexeDatabase::memory() }; + + let db = Database::memory(); let code = generated_code(64); let code_id = db.set_original_code(&code); let tx_hash = H256::random(); @@ -653,8 +653,8 @@ mod tests { #[tokio::test] async fn reader_failure_does_not_emit_success_or_terminate_stream() { let anvil = Anvil::new().spawn(); - #[allow(unused_unsafe)] - let db = unsafe { EthexeDatabase::memory() }; + + let db = Database::memory(); let code = generated_code(128); let code_id = CodeId::generate(&code); let tx_hash = H256::random(); @@ -682,8 +682,8 @@ mod tests { let code = generated_code(128); let (anvil, tx_hash, code_id) = request_code_validation(31337, beacon_block_time, &code).await; - #[allow(unused_unsafe)] - let db = unsafe { EthexeDatabase::memory() }; + + let db = Database::memory(); set_blob_info(&db, code_id, tx_hash); let reader = From 28f7462bd3a93a8e6b231a592a60f239ccf4175a Mon Sep 17 00:00:00 2001 From: playX18 Date: Tue, 28 Apr 2026 20:53:01 +0700 Subject: [PATCH 6/8] move attempts to const; document beacon RPC mock; split tests into separate file --- ethexe/blob-loader/src/lib.rs | 503 +------------------------------- ethexe/blob-loader/src/tests.rs | 501 +++++++++++++++++++++++++++++++ 2 files changed, 502 insertions(+), 502 deletions(-) create mode 100644 ethexe/blob-loader/src/tests.rs diff --git a/ethexe/blob-loader/src/lib.rs b/ethexe/blob-loader/src/lib.rs index 5d2b469324b..8ed93a6f153 100644 --- a/ethexe/blob-loader/src/lib.rs +++ b/ethexe/blob-loader/src/lib.rs @@ -391,505 +391,4 @@ fn handle_blob( } #[cfg(test)] -mod tests { - use super::*; - use alloy::node_bindings::Anvil; - use ethexe_common::{ - CodeBlobInfo, - db::{CodesStorageRW, OnChainStorageRW}, - gear_core::ids::prelude::CodeIdExt, - }; - use ethexe_db::Database; - use ethexe_ethereum::deploy::EthereumDeployer; - use futures::{FutureExt, StreamExt}; - use gsigner::secp256k1::{PrivateKey, Signer}; - use std::{collections::VecDeque, sync::Arc}; - use tokio::{ - io::{AsyncReadExt, AsyncWriteExt}, - net::TcpListener, - sync::Mutex, - time::{Duration, timeout}, - }; - - const BLOB_CHUNK_SIZE: usize = 128 * 1024; - - fn generated_code(len: usize) -> Vec { - (0..len).map(|i| (i % 251) as u8).collect() - } - - fn set_blob_info(db: &Database, code_id: CodeId, tx_hash: H256) { - db.set_code_blob_info( - code_id, - CodeBlobInfo { - timestamp: 0, - tx_hash, - }, - ); - } - - async fn test_reader( - ethereum_rpc: String, - ethereum_beacon_rpc: String, - ) -> ConsensusLayerBlobReader { - test_reader_with_block_time(ethereum_rpc, ethereum_beacon_rpc, Duration::from_millis(10)) - .await - } - - async fn test_reader_with_block_time( - ethereum_rpc: String, - ethereum_beacon_rpc: String, - beacon_block_time: Duration, - ) -> ConsensusLayerBlobReader { - ConsensusLayerBlobReader { - provider: ProviderBuilder::default() - .connect(ðereum_rpc) - .await - .expect("test reader should connect to ethereum rpc"), - http_client: Client::new(), - config: ConsensusLayerConfig { - ethereum_rpc, - ethereum_beacon_rpc, - beacon_block_time, - attempts: const { NonZero::new(3).unwrap() }, - }, - } - } - - async fn run_beacon_server(responses: Vec) -> (String, Arc>>) { - let listener = TcpListener::bind("127.0.0.1:0") - .await - .expect("test beacon server should bind"); - let url = format!("http://{}", listener.local_addr().unwrap()); - let paths = Arc::new(Mutex::new(Vec::new())); - let responses = Arc::new(Mutex::new(VecDeque::from(responses))); - - tokio::spawn({ - let paths = paths.clone(); - let responses = responses.clone(); - async move { - loop { - let Ok((mut socket, _)) = listener.accept().await else { - break; - }; - let paths = paths.clone(); - let responses = responses.clone(); - tokio::spawn(async move { - let mut buf = [0; 2048]; - let Ok(n) = socket.read(&mut buf).await else { - return; - }; - let request = String::from_utf8_lossy(&buf[..n]); - let path = request - .lines() - .next() - .and_then(|line| line.split_whitespace().nth(1)) - .unwrap_or_default() - .to_string(); - paths.lock().await.push(path); - - let body = responses - .lock() - .await - .pop_front() - .unwrap_or_else(|| r#"{"data":[]}"#.to_string()); - let response = format!( - "HTTP/1.1 200 OK\r\ncontent-type: application/json\r\ncontent-length: {}\r\nconnection: close\r\n\r\n{}", - body.len(), - body - ); - let _ = socket.write_all(response.as_bytes()).await; - }); - } - } - }); - - (url, paths) - } - - async fn unused_local_http_url() -> String { - let listener = TcpListener::bind("127.0.0.1:0") - .await - .expect("unused local port should bind"); - let url = format!("http://{}", listener.local_addr().unwrap()); - drop(listener); - url - } - - async fn expect_blob_loaded(loader: &mut BlobLoader) -> CodeAndIdUnchecked { - match timeout(Duration::from_secs(2), loader.next()) - .await - .expect("loader must emit before timeout") - .expect("loader stream should yield an event") - .expect("loader event should be ok") - { - BlobLoaderEvent::BlobLoaded(code_and_id) => code_and_id, - } - } - - async fn run_anvil_blob_loader_test(code: Vec) { - let signer = Signer::memory(); - let private_key: PrivateKey = - "0xac0974bec39a17e36ba4a6b4d238ff944bacb478cbed5efcae784d7bf4f2ff80" - .parse() - .unwrap(); - let public_key = signer.import(private_key).unwrap(); - let alice_address = signer.address(public_key); - - let beacon_block_time = Duration::from_secs(1); - let anvil = Anvil::new().block_time(beacon_block_time.as_secs()).spawn(); - - let ethereum = EthereumDeployer::new(&anvil.ws_endpoint(), signer.clone(), alice_address) - .await - .unwrap() - .with_validators(vec![alice_address].try_into().unwrap()) - .deploy() - .await - .unwrap(); - - let consensus_cfg = ConsensusLayerConfig { - ethereum_rpc: anvil.endpoint(), - ethereum_beacon_rpc: anvil.endpoint(), - beacon_block_time, - attempts: const { NonZero::new(3).unwrap() }, - }; - - let (tx_hash, code_id) = ethereum - .router() - .request_code_validation(&code) - .await - .unwrap(); - - let db = Database::memory(); - set_blob_info(&db, code_id, tx_hash); - - let mut loader = BlobLoader::new(db, consensus_cfg) - .await - .expect("blob loader should connect to anvil"); - loader - .load_codes(HashSet::from([code_id])) - .expect("CodeBlobInfo was inserted"); - - let loaded = expect_blob_loaded(&mut loader).await; - assert_eq!(loaded.code_id, code_id); - assert_eq!(loaded.code, code); - } - - async fn request_code_validation( - chain_id: u64, - beacon_block_time: Duration, - code: &[u8], - ) -> (alloy::node_bindings::AnvilInstance, H256, CodeId) { - let signer = Signer::memory(); - let private_key: PrivateKey = - "0xac0974bec39a17e36ba4a6b4d238ff944bacb478cbed5efcae784d7bf4f2ff80" - .parse() - .unwrap(); - let public_key = signer.import(private_key).unwrap(); - let alice_address = signer.address(public_key); - let anvil = Anvil::new() - .chain_id(chain_id) - .block_time(beacon_block_time.as_secs()) - .spawn(); - - let ethereum = EthereumDeployer::new(&anvil.ws_endpoint(), signer.clone(), alice_address) - .await - .unwrap() - .with_validators(vec![alice_address].try_into().unwrap()) - .deploy() - .await - .unwrap(); - - let (tx_hash, code_id) = ethereum - .router() - .request_code_validation(code) - .await - .unwrap(); - - (anvil, tx_hash, code_id) - } - - #[tokio::test] - async fn load_codes_fails_when_code_blob_info_is_missing() { - let anvil = Anvil::new().spawn(); - - let db = Database::memory(); - let reader = test_reader(anvil.endpoint(), anvil.endpoint()).await; - let mut loader = BlobLoader::new_with_consensus_reader(db, reader); - let code_id = CodeId::generate(&[1, 2, 3, 4]); - - let err = loader - .load_codes(HashSet::from([code_id])) - .expect_err("missing CodeBlobInfo must fail"); - - assert!(matches!(err, BlobLoaderError::CodeBlobInfoNotFound(id) if id == code_id)); - assert_eq!(loader.pending_codes_len(), 0); - } - - #[tokio::test] - async fn already_loaded_code_is_emitted_without_remote_read() { - let anvil = Anvil::new().spawn(); - - let db = Database::memory(); - let code = generated_code(64); - let code_id = db.set_original_code(&code); - let tx_hash = H256::random(); - set_blob_info(&db, code_id, tx_hash); - - let reader = test_reader(anvil.endpoint(), unused_local_http_url().await).await; - let mut loader = BlobLoader::new_with_consensus_reader(db, reader); - - loader - .load_codes(HashSet::from([code_id])) - .expect("metadata exists"); - - assert_eq!(loader.pending_codes_len(), 1); - let loaded = expect_blob_loaded(&mut loader).await; - - assert_eq!(loaded.code_id, code_id); - assert_eq!(loaded.code, code); - assert_eq!(loader.pending_codes_len(), 0); - } - - #[tokio::test] - async fn reader_failure_does_not_emit_success_or_terminate_stream() { - let anvil = Anvil::new().spawn(); - - let db = Database::memory(); - let code = generated_code(128); - let code_id = CodeId::generate(&code); - let tx_hash = H256::random(); - set_blob_info(&db, code_id, tx_hash); - - let reader = test_reader(anvil.endpoint(), anvil.endpoint()).await; - let mut loader = BlobLoader::new_with_consensus_reader(db, reader); - - loader - .load_codes(HashSet::from([code_id])) - .expect("metadata exists"); - - let no_event = timeout(Duration::from_millis(100), loader.next()).await; - - assert!( - no_event.is_err(), - "reader failure should be logged and skipped, not emitted as success" - ); - assert!(!loader.is_terminated()); - } - - #[tokio::test] - async fn repeated_load_codes_for_pending_code_schedules_one_remote_read() { - let beacon_block_time = Duration::from_secs(1); - let code = generated_code(128); - let (anvil, tx_hash, code_id) = - request_code_validation(31337, beacon_block_time, &code).await; - - let db = Database::memory(); - set_blob_info(&db, code_id, tx_hash); - - let reader = - test_reader_with_block_time(anvil.endpoint(), anvil.endpoint(), beacon_block_time) - .await; - let mut loader = BlobLoader::new_with_consensus_reader(db, reader); - - loader - .load_codes(HashSet::from([code_id])) - .expect("first request should be accepted"); - loader - .load_codes(HashSet::from([code_id])) - .expect("duplicate pending request should be ignored"); - - assert_eq!(loader.pending_codes_len(), 1); - let _ = expect_blob_loaded(&mut loader).await; - assert!( - loader.next().now_or_never().is_none(), - "duplicate pending request must not queue another ready event" - ); - assert_eq!(loader.pending_codes_len(), 0); - } - - #[tokio::test] - async fn blob_loader_reads_code_from_anvil_tx() { - run_anvil_blob_loader_test(generated_code(128)).await; - } - - #[tokio::test] - async fn blob_loader_reads_code_larger_than_three_blob_chunks_from_anvil_tx() { - run_anvil_blob_loader_test(generated_code(3 * BLOB_CHUNK_SIZE + 17)).await; - } - - #[tokio::test] - async fn consensus_reader_reports_beacon_rpc_disconnect() { - let anvil = Anvil::new().spawn(); - let reader = test_reader(anvil.endpoint(), unused_local_http_url().await).await; - - let err = reader - .read_blob_bundle(0, &[B256::ZERO]) - .await - .expect_err("disconnected beacon rpc should fail"); - - assert!(matches!(err, ReadBlobBundleError::Reqwest(_))); - } - - #[tokio::test] - async fn consensus_reader_reports_invalid_beacon_json() { - let anvil = Anvil::new().spawn(); - let (beacon_rpc, _) = run_beacon_server(vec!["not json".to_string()]).await; - let reader = test_reader(anvil.endpoint(), beacon_rpc).await; - - let err = reader - .read_blob_bundle(0, &[B256::ZERO]) - .await - .expect_err("invalid beacon json should fail"); - - assert!(matches!(err, ReadBlobBundleError::Serde(_))); - } - - #[tokio::test] - async fn consensus_reader_uses_beacon_genesis_slot_for_non_anvil_chain_id() { - let beacon_block_time = Duration::from_secs(1); - let code = generated_code(128); - let (anvil, tx_hash, code_id) = request_code_validation(1, beacon_block_time, &code).await; - let provider: RootProvider = ProviderBuilder::default() - .connect(&anvil.endpoint()) - .await - .unwrap(); - let tx = provider - .get_transaction_by_hash(tx_hash.0.into()) - .await - .unwrap() - .unwrap(); - let block_hash = tx.block_hash.unwrap(); - let block = provider - .get_block_by_hash(block_hash) - .await - .unwrap() - .unwrap(); - let expected_slot = block.header.number; - let genesis_time = block.header.timestamp - expected_slot; - let blob_body = reqwest::get(format!( - "{}/eth/v1/beacon/blobs/{expected_slot}?versioned_hashes={}", - anvil.endpoint(), - tx.blob_versioned_hashes().unwrap()[0] - )) - .await - .unwrap() - .text() - .await - .unwrap(); - let (beacon_rpc, paths) = run_beacon_server(vec![ - format!( - r#"{{"data":{{"genesis_time":"{genesis_time}","genesis_validators_root":"0x0000000000000000000000000000000000000000000000000000000000000000","genesis_fork_version":"0x00000000"}}}}"# - ), - blob_body, - ]) - .await; - let reader = - test_reader_with_block_time(anvil.endpoint(), beacon_rpc, beacon_block_time).await; - - let blob = reader.read_blob(code_id, tx_hash).await.unwrap(); - - assert_eq!(blob, code); - let paths = paths.lock().await; - assert!(paths.iter().any(|path| path == "/eth/v1/beacon/genesis")); - assert!(paths.iter().any(|path| { - path.starts_with(&format!( - "/eth/v1/beacon/blobs/{expected_slot}?versioned_hashes=" - )) - })); - } - - #[tokio::test] - async fn consensus_reader_re_requests_blob_after_transient_invalid_json() { - let beacon_block_time = Duration::from_secs(1); - let code = generated_code(128); - let (anvil, tx_hash, code_id) = - request_code_validation(31337, beacon_block_time, &code).await; - let provider: RootProvider = ProviderBuilder::default() - .connect(&anvil.endpoint()) - .await - .unwrap(); - let tx = provider - .get_transaction_by_hash(tx_hash.0.into()) - .await - .unwrap() - .unwrap(); - let slot = tx.block_number.unwrap(); - let blob_body = reqwest::get(format!( - "{}/eth/v1/beacon/blobs/{slot}?versioned_hashes={}", - anvil.endpoint(), - tx.blob_versioned_hashes().unwrap()[0] - )) - .await - .unwrap() - .text() - .await - .unwrap(); - let (beacon_rpc, paths) = run_beacon_server(vec!["not json".to_string(), blob_body]).await; - let reader = test_reader(anvil.endpoint(), beacon_rpc).await; - - let blob = reader.read_blob(code_id, tx_hash).await.unwrap(); - - assert_eq!(blob, code); - let blob_requests = paths - .lock() - .await - .iter() - .filter(|path| path.starts_with(&format!("/eth/v1/beacon/blobs/{slot}"))) - .count(); - assert_eq!(blob_requests, 2); - } - - #[test] - fn test_handle_blob() { - let code_id = CodeId::generate(&[1, 2, 3, 4]); - - // correct blob - let blob = vec![1, 2, 3, 4]; - let mut previously_received_code_id = None; - let result = - handle_blob(blob.clone(), code_id, &mut previously_received_code_id, 1).unwrap(); - assert_eq!(result, blob); - - // blob with incorrect code id - let blob = vec![4, 3, 2, 1]; - let blob_code_id = CodeId::generate(&blob); - let mut previously_received_code_id = None; - let result = handle_blob(blob.clone(), code_id, &mut previously_received_code_id, 1); - assert!(matches!( - result, - Err(ReaderError::CodeIdMismatch { - expected, - found, - }) if expected == code_id && found == blob_code_id - ),); - assert_eq!(previously_received_code_id, Some(blob_code_id)); - - // same incorrect blob again - should be considered as loaded - let result = - handle_blob(blob.clone(), code_id, &mut previously_received_code_id, 2).unwrap(); - assert_eq!(result, blob); - - // same incorrect blob again, but another code id - let previously_received_code_id = CodeId::from([1; 32]); - let result = handle_blob( - blob.clone(), - code_id, - &mut Some(previously_received_code_id), - 2, - ); - assert!(matches!( - result, - Err(ReaderError::CodeIdMismatch { - expected, - found, - }) if expected == code_id && found == blob_code_id - )); - - // empty blob - let blob = vec![]; - let mut previously_received_code_id = None; - let result = handle_blob(blob.clone(), code_id, &mut previously_received_code_id, 1); - assert!(result.is_err()); - assert!(previously_received_code_id.is_none()); - } -} +mod tests; diff --git a/ethexe/blob-loader/src/tests.rs b/ethexe/blob-loader/src/tests.rs new file mode 100644 index 00000000000..e83c2de79be --- /dev/null +++ b/ethexe/blob-loader/src/tests.rs @@ -0,0 +1,501 @@ + +use super::*; +use alloy::node_bindings::Anvil; +use ethexe_common::{ + CodeBlobInfo, + db::{CodesStorageRW, OnChainStorageRW}, + gear_core::ids::prelude::CodeIdExt, +}; +use ethexe_db::Database; +use ethexe_ethereum::deploy::EthereumDeployer; +use futures::{FutureExt, StreamExt}; +use gsigner::secp256k1::{PrivateKey, Signer}; +use std::{collections::VecDeque, sync::Arc}; +use tokio::{ + io::{AsyncReadExt, AsyncWriteExt}, + net::TcpListener, + sync::Mutex, + time::{Duration, timeout}, +}; + +const ATTEMPTS: NonZero = NonZero::new(3).unwrap(); +const BLOB_CHUNK_SIZE: usize = 128 * 1024; + +fn generated_code(len: usize) -> Vec { + (0..len).map(|i| (i % 251) as u8).collect() +} + +fn set_blob_info(db: &Database, code_id: CodeId, tx_hash: H256) { + db.set_code_blob_info( + code_id, + CodeBlobInfo { + timestamp: 0, + tx_hash, + }, + ); +} + +async fn test_reader( + ethereum_rpc: String, + ethereum_beacon_rpc: String, +) -> ConsensusLayerBlobReader { + test_reader_with_block_time(ethereum_rpc, ethereum_beacon_rpc, Duration::from_millis(10)).await +} + +async fn test_reader_with_block_time( + ethereum_rpc: String, + ethereum_beacon_rpc: String, + beacon_block_time: Duration, +) -> ConsensusLayerBlobReader { + ConsensusLayerBlobReader { + provider: ProviderBuilder::default() + .connect(ðereum_rpc) + .await + .expect("test reader should connect to ethereum rpc"), + http_client: Client::new(), + config: ConsensusLayerConfig { + ethereum_rpc, + ethereum_beacon_rpc, + beacon_block_time, + attempts: ATTEMPTS, + }, + } +} + +/// We had a lot of problems in the past with Consensus Layer Blob Reader: bad data arrives, retry didn't work, +/// we forgot to set valid to false on bad code and so on. +/// +/// This function mimics the beacon node behaviour for testing purposes. +/// +/// In practice you can send arbitrary amount of `responses` and this function will send them in order. +async fn run_beacon_server(responses: Vec) -> (String, Arc>>) { + let listener = TcpListener::bind("127.0.0.1:0") + .await + .expect("test beacon server should bind"); + let url = format!("http://{}", listener.local_addr().unwrap()); + let paths = Arc::new(Mutex::new(Vec::new())); + let responses = Arc::new(Mutex::new(VecDeque::from(responses))); + + tokio::spawn({ + let paths = paths.clone(); + let responses = responses.clone(); + async move { + loop { + let Ok((mut socket, _)) = listener.accept().await else { + break; + }; + let paths = paths.clone(); + let responses = responses.clone(); + tokio::spawn(async move { + let mut buf = [0; 2048]; + let Ok(n) = socket.read(&mut buf).await else { + return; + }; + let request = String::from_utf8_lossy(&buf[..n]); + let path = request + .lines() + .next() + .and_then(|line| line.split_whitespace().nth(1)) + .unwrap_or_default() + .to_string(); + paths.lock().await.push(path); + + let body = responses + .lock() + .await + .pop_front() + .unwrap_or_else(|| r#"{"data":[]}"#.to_string()); + let response = format!( + "HTTP/1.1 200 OK\r\ncontent-type: application/json\r\ncontent-length: {}\r\nconnection: close\r\n\r\n{}", + body.len(), + body + ); + let _ = socket.write_all(response.as_bytes()).await; + }); + } + } + }); + + (url, paths) +} + +async fn unused_local_http_url() -> String { + let listener = TcpListener::bind("127.0.0.1:0") + .await + .expect("unused local port should bind"); + let url = format!("http://{}", listener.local_addr().unwrap()); + drop(listener); + url +} + +async fn expect_blob_loaded(loader: &mut BlobLoader) -> CodeAndIdUnchecked { + match timeout(Duration::from_secs(2), loader.next()) + .await + .expect("loader must emit before timeout") + .expect("loader stream should yield an event") + .expect("loader event should be ok") + { + BlobLoaderEvent::BlobLoaded(code_and_id) => code_and_id, + } +} + +async fn run_anvil_blob_loader_test(code: Vec) { + let signer = Signer::memory(); + let private_key: PrivateKey = + "0xac0974bec39a17e36ba4a6b4d238ff944bacb478cbed5efcae784d7bf4f2ff80" + .parse() + .unwrap(); + let public_key = signer.import(private_key).unwrap(); + let alice_address = signer.address(public_key); + + let beacon_block_time = Duration::from_secs(1); + let anvil = Anvil::new().block_time(beacon_block_time.as_secs()).spawn(); + + let ethereum = EthereumDeployer::new(&anvil.ws_endpoint(), signer.clone(), alice_address) + .await + .unwrap() + .with_validators(vec![alice_address].try_into().unwrap()) + .deploy() + .await + .unwrap(); + + let consensus_cfg = ConsensusLayerConfig { + ethereum_rpc: anvil.endpoint(), + ethereum_beacon_rpc: anvil.endpoint(), + beacon_block_time, + attempts: ATTEMPTS, + }; + + let (tx_hash, code_id) = ethereum + .router() + .request_code_validation(&code) + .await + .unwrap(); + + let db = Database::memory(); + set_blob_info(&db, code_id, tx_hash); + + let mut loader = BlobLoader::new(db, consensus_cfg) + .await + .expect("blob loader should connect to anvil"); + loader + .load_codes(HashSet::from([code_id])) + .expect("CodeBlobInfo was inserted"); + + let loaded = expect_blob_loaded(&mut loader).await; + assert_eq!(loaded.code_id, code_id); + assert_eq!(loaded.code, code); +} + +async fn request_code_validation( + chain_id: u64, + beacon_block_time: Duration, + code: &[u8], +) -> (alloy::node_bindings::AnvilInstance, H256, CodeId) { + let signer = Signer::memory(); + let private_key: PrivateKey = + "0xac0974bec39a17e36ba4a6b4d238ff944bacb478cbed5efcae784d7bf4f2ff80" + .parse() + .unwrap(); + let public_key = signer.import(private_key).unwrap(); + let alice_address = signer.address(public_key); + let anvil = Anvil::new() + .chain_id(chain_id) + .block_time(beacon_block_time.as_secs()) + .spawn(); + + let ethereum = EthereumDeployer::new(&anvil.ws_endpoint(), signer.clone(), alice_address) + .await + .unwrap() + .with_validators(vec![alice_address].try_into().unwrap()) + .deploy() + .await + .unwrap(); + + let (tx_hash, code_id) = ethereum + .router() + .request_code_validation(code) + .await + .unwrap(); + + (anvil, tx_hash, code_id) +} + +#[tokio::test] +async fn load_codes_fails_when_code_blob_info_is_missing() { + let anvil = Anvil::new().spawn(); + + let db = Database::memory(); + let reader = test_reader(anvil.endpoint(), anvil.endpoint()).await; + let mut loader = BlobLoader::new_with_consensus_reader(db, reader); + let code_id = CodeId::generate(&[1, 2, 3, 4]); + + let err = loader + .load_codes(HashSet::from([code_id])) + .expect_err("missing CodeBlobInfo must fail"); + + assert!(matches!(err, BlobLoaderError::CodeBlobInfoNotFound(id) if id == code_id)); + assert_eq!(loader.pending_codes_len(), 0); +} + +#[tokio::test] +async fn already_loaded_code_is_emitted_without_remote_read() { + let anvil = Anvil::new().spawn(); + + let db = Database::memory(); + let code = generated_code(64); + let code_id = db.set_original_code(&code); + let tx_hash = H256::random(); + set_blob_info(&db, code_id, tx_hash); + + let reader = test_reader(anvil.endpoint(), unused_local_http_url().await).await; + let mut loader = BlobLoader::new_with_consensus_reader(db, reader); + + loader + .load_codes(HashSet::from([code_id])) + .expect("metadata exists"); + + assert_eq!(loader.pending_codes_len(), 1); + let loaded = expect_blob_loaded(&mut loader).await; + + assert_eq!(loaded.code_id, code_id); + assert_eq!(loaded.code, code); + assert_eq!(loader.pending_codes_len(), 0); +} + +#[tokio::test] +async fn reader_failure_does_not_emit_success_or_terminate_stream() { + let anvil = Anvil::new().spawn(); + + let db = Database::memory(); + let code = generated_code(128); + let code_id = CodeId::generate(&code); + let tx_hash = H256::random(); + set_blob_info(&db, code_id, tx_hash); + + let reader = test_reader(anvil.endpoint(), anvil.endpoint()).await; + let mut loader = BlobLoader::new_with_consensus_reader(db, reader); + + loader + .load_codes(HashSet::from([code_id])) + .expect("metadata exists"); + + let no_event = timeout(Duration::from_millis(100), loader.next()).await; + + assert!( + no_event.is_err(), + "reader failure should be logged and skipped, not emitted as success" + ); + assert!(!loader.is_terminated()); +} + +#[tokio::test] +async fn repeated_load_codes_for_pending_code_schedules_one_remote_read() { + let beacon_block_time = Duration::from_secs(1); + let code = generated_code(128); + let (anvil, tx_hash, code_id) = request_code_validation(31337, beacon_block_time, &code).await; + + let db = Database::memory(); + set_blob_info(&db, code_id, tx_hash); + + let reader = + test_reader_with_block_time(anvil.endpoint(), anvil.endpoint(), beacon_block_time).await; + let mut loader = BlobLoader::new_with_consensus_reader(db, reader); + + loader + .load_codes(HashSet::from([code_id])) + .expect("first request should be accepted"); + loader + .load_codes(HashSet::from([code_id])) + .expect("duplicate pending request should be ignored"); + + assert_eq!(loader.pending_codes_len(), 1); + let _ = expect_blob_loaded(&mut loader).await; + assert!( + loader.next().now_or_never().is_none(), + "duplicate pending request must not queue another ready event" + ); + assert_eq!(loader.pending_codes_len(), 0); +} + +#[tokio::test] +async fn blob_loader_reads_code_from_anvil_tx() { + run_anvil_blob_loader_test(generated_code(128)).await; +} + +#[tokio::test] +async fn blob_loader_reads_code_larger_than_three_blob_chunks_from_anvil_tx() { + run_anvil_blob_loader_test(generated_code(3 * BLOB_CHUNK_SIZE + 17)).await; +} + +#[tokio::test] +async fn consensus_reader_reports_beacon_rpc_disconnect() { + let anvil = Anvil::new().spawn(); + let reader = test_reader(anvil.endpoint(), unused_local_http_url().await).await; + + let err = reader + .read_blob_bundle(0, &[B256::ZERO]) + .await + .expect_err("disconnected beacon rpc should fail"); + + assert!(matches!(err, ReadBlobBundleError::Reqwest(_))); +} + +#[tokio::test] +async fn consensus_reader_reports_invalid_beacon_json() { + let anvil = Anvil::new().spawn(); + let (beacon_rpc, _) = run_beacon_server(vec!["not json".to_string()]).await; + let reader = test_reader(anvil.endpoint(), beacon_rpc).await; + + let err = reader + .read_blob_bundle(0, &[B256::ZERO]) + .await + .expect_err("invalid beacon json should fail"); + + assert!(matches!(err, ReadBlobBundleError::Serde(_))); +} + +#[tokio::test] +async fn consensus_reader_uses_beacon_genesis_slot_for_non_anvil_chain_id() { + let beacon_block_time = Duration::from_secs(1); + let code = generated_code(128); + let (anvil, tx_hash, code_id) = request_code_validation(1, beacon_block_time, &code).await; + let provider: RootProvider = ProviderBuilder::default() + .connect(&anvil.endpoint()) + .await + .unwrap(); + let tx = provider + .get_transaction_by_hash(tx_hash.0.into()) + .await + .unwrap() + .unwrap(); + let block_hash = tx.block_hash.unwrap(); + let block = provider + .get_block_by_hash(block_hash) + .await + .unwrap() + .unwrap(); + let expected_slot = block.header.number; + let genesis_time = block.header.timestamp - expected_slot; + let blob_body = reqwest::get(format!( + "{}/eth/v1/beacon/blobs/{expected_slot}?versioned_hashes={}", + anvil.endpoint(), + tx.blob_versioned_hashes().unwrap()[0] + )) + .await + .unwrap() + .text() + .await + .unwrap(); + let (beacon_rpc, paths) = run_beacon_server(vec![ + format!( + r#"{{"data":{{"genesis_time":"{genesis_time}","genesis_validators_root":"0x0000000000000000000000000000000000000000000000000000000000000000","genesis_fork_version":"0x00000000"}}}}"# + ), + blob_body, + ]) + .await; + let reader = test_reader_with_block_time(anvil.endpoint(), beacon_rpc, beacon_block_time).await; + + let blob = reader.read_blob(code_id, tx_hash).await.unwrap(); + + assert_eq!(blob, code); + let paths = paths.lock().await; + assert!(paths.iter().any(|path| path == "/eth/v1/beacon/genesis")); + assert!(paths.iter().any(|path| { + path.starts_with(&format!( + "/eth/v1/beacon/blobs/{expected_slot}?versioned_hashes=" + )) + })); +} + +#[tokio::test] +async fn consensus_reader_re_requests_blob_after_transient_invalid_json() { + let beacon_block_time = Duration::from_secs(1); + let code = generated_code(128); + let (anvil, tx_hash, code_id) = request_code_validation(31337, beacon_block_time, &code).await; + let provider: RootProvider = ProviderBuilder::default() + .connect(&anvil.endpoint()) + .await + .unwrap(); + let tx = provider + .get_transaction_by_hash(tx_hash.0.into()) + .await + .unwrap() + .unwrap(); + let slot = tx.block_number.unwrap(); + let blob_body = reqwest::get(format!( + "{}/eth/v1/beacon/blobs/{slot}?versioned_hashes={}", + anvil.endpoint(), + tx.blob_versioned_hashes().unwrap()[0] + )) + .await + .unwrap() + .text() + .await + .unwrap(); + let (beacon_rpc, paths) = run_beacon_server(vec!["not json".to_string(), blob_body]).await; + let reader = test_reader(anvil.endpoint(), beacon_rpc).await; + + let blob = reader.read_blob(code_id, tx_hash).await.unwrap(); + + assert_eq!(blob, code); + let blob_requests = paths + .lock() + .await + .iter() + .filter(|path| path.starts_with(&format!("/eth/v1/beacon/blobs/{slot}"))) + .count(); + assert_eq!(blob_requests, 2); +} + +#[test] +fn test_handle_blob() { + let code_id = CodeId::generate(&[1, 2, 3, 4]); + + // correct blob + let blob = vec![1, 2, 3, 4]; + let mut previously_received_code_id = None; + let result = handle_blob(blob.clone(), code_id, &mut previously_received_code_id, 1).unwrap(); + assert_eq!(result, blob); + + // blob with incorrect code id + let blob = vec![4, 3, 2, 1]; + let blob_code_id = CodeId::generate(&blob); + let mut previously_received_code_id = None; + let result = handle_blob(blob.clone(), code_id, &mut previously_received_code_id, 1); + assert!(matches!( + result, + Err(ReaderError::CodeIdMismatch { + expected, + found, + }) if expected == code_id && found == blob_code_id + ),); + assert_eq!(previously_received_code_id, Some(blob_code_id)); + + // same incorrect blob again - should be considered as loaded + let result = handle_blob(blob.clone(), code_id, &mut previously_received_code_id, 2).unwrap(); + assert_eq!(result, blob); + + // same incorrect blob again, but another code id + let previously_received_code_id = CodeId::from([1; 32]); + let result = handle_blob( + blob.clone(), + code_id, + &mut Some(previously_received_code_id), + 2, + ); + assert!(matches!( + result, + Err(ReaderError::CodeIdMismatch { + expected, + found, + }) if expected == code_id && found == blob_code_id + )); + + // empty blob + let blob = vec![]; + let mut previously_received_code_id = None; + let result = handle_blob(blob.clone(), code_id, &mut previously_received_code_id, 1); + assert!(result.is_err()); + assert!(previously_received_code_id.is_none()); +} From bc3b3d73fdba3340d8cd4d3a499df6036cac95d7 Mon Sep 17 00:00:00 2001 From: playX18 Date: Tue, 28 Apr 2026 20:54:46 +0700 Subject: [PATCH 7/8] remove consensus_reader_reports_invalid_beacon_json --- ethexe/blob-loader/src/tests.rs | 15 --------------- 1 file changed, 15 deletions(-) diff --git a/ethexe/blob-loader/src/tests.rs b/ethexe/blob-loader/src/tests.rs index e83c2de79be..361f9245cec 100644 --- a/ethexe/blob-loader/src/tests.rs +++ b/ethexe/blob-loader/src/tests.rs @@ -1,4 +1,3 @@ - use super::*; use alloy::node_bindings::Anvil; use ethexe_common::{ @@ -341,20 +340,6 @@ async fn consensus_reader_reports_beacon_rpc_disconnect() { assert!(matches!(err, ReadBlobBundleError::Reqwest(_))); } -#[tokio::test] -async fn consensus_reader_reports_invalid_beacon_json() { - let anvil = Anvil::new().spawn(); - let (beacon_rpc, _) = run_beacon_server(vec!["not json".to_string()]).await; - let reader = test_reader(anvil.endpoint(), beacon_rpc).await; - - let err = reader - .read_blob_bundle(0, &[B256::ZERO]) - .await - .expect_err("invalid beacon json should fail"); - - assert!(matches!(err, ReadBlobBundleError::Serde(_))); -} - #[tokio::test] async fn consensus_reader_uses_beacon_genesis_slot_for_non_anvil_chain_id() { let beacon_block_time = Duration::from_secs(1); From 28b8122cc8baa92830c21693debe56bbea644013 Mon Sep 17 00:00:00 2001 From: playX18 Date: Wed, 20 May 2026 07:25:40 +0700 Subject: [PATCH 8/8] workspace hack --- utils/gear-workspace-hack/Cargo.toml | 2 -- 1 file changed, 2 deletions(-) diff --git a/utils/gear-workspace-hack/Cargo.toml b/utils/gear-workspace-hack/Cargo.toml index 75808e89db8..fcd4ebdffa1 100644 --- a/utils/gear-workspace-hack/Cargo.toml +++ b/utils/gear-workspace-hack/Cargo.toml @@ -215,7 +215,6 @@ features = ["test-helpers"] ### BEGIN HAKARI SECTION [target.'cfg(not(target_arch = "wasm32"))'.dependencies] -aes = { version = "0.8", default-features = false, features = ["zeroize"] } ahash = { version = "0.8" } alloy = { version = "2", features = ["kzg", "node-bindings", "provider-anvil-api", "provider-ws", "rpc-types-beacon", "rpc-types-eth", "signer-mnemonic"] } alloy-chains = { version = "0.2" } @@ -496,7 +495,6 @@ winnow = { version = "0.7" } zeroize = { version = "1", features = ["derive", "std"] } [target.'cfg(not(target_arch = "wasm32"))'.build-dependencies] -aes = { version = "0.8", default-features = false, features = ["zeroize"] } ahash = { version = "0.8" } alloy = { version = "2", features = ["kzg", "node-bindings", "provider-anvil-api", "provider-ws", "rpc-types-beacon", "rpc-types-eth", "signer-mnemonic"] } alloy-chains = { version = "0.2" }