From 3faadfad53f404b09eafed3f5e93fb765c332243 Mon Sep 17 00:00:00 2001 From: arya2 Date: Fri, 3 Apr 2026 19:06:16 -0400 Subject: [PATCH 1/9] test(block_size_tracker): add unit tests for recommended_batch_size logic --- .../non_finalized_state/tests/vectors.rs | 62 +++++++++++++++++ .../sync/downloads/block_size_tracker.rs | 67 +++++++++++++++++++ 2 files changed, 129 insertions(+) diff --git a/zebra-state/src/service/non_finalized_state/tests/vectors.rs b/zebra-state/src/service/non_finalized_state/tests/vectors.rs index b101b5af416..bcc1ce436ac 100644 --- a/zebra-state/src/service/non_finalized_state/tests/vectors.rs +++ b/zebra-state/src/service/non_finalized_state/tests/vectors.rs @@ -1111,6 +1111,68 @@ fn pipeline_checkpoint_commit_then_finalize_to_disk() { ); } +/// Test that pruning removes exactly the blocks that have been written to disk. +#[test] +fn pruning_removes_finalized_blocks_from_nfs() { + let _init_guard = zebra_test::init(); + let (mut finalized_state, mut nfs) = test_state(); + let network = Network::Mainnet; + let mut db = finalized_state.db.clone(); + + let blocks: Vec> = CONTINUOUS_MAINNET_BLOCKS + .values() + .take(6) + .map(|bytes| bytes.zcash_deserialize_into().unwrap()) + .collect(); + + // Commit genesis (block 0) directly to disk, matching the real pipeline. + finalized_state + .commit_finalized_direct(blocks[0].clone().into(), None, "test") + .expect("genesis commit should succeed"); + + // Commit blocks 1-5 to NFS. + for block in &blocks[1..] { + nfs.commit_checkpoint_block( + CheckpointVerifiedBlock::from(block.clone()), + &finalized_state.db, + ) + .expect("commit should succeed"); + } + assert_eq!(nfs.best_chain_len(), Some(5)); + + // Write blocks 1-3 to disk (simulating Thread 3 catching up). 
+ for block in &blocks[1..4] { + let checkpoint_verified = CheckpointVerifiedBlock::from(block.clone()); + let treestate = Treestate::default(); + let finalized = FinalizedBlock::from_checkpoint_verified(checkpoint_verified, treestate); + let empty_nfs = NonFinalizedState::new(&network); + let spent_utxos = db.lookup_spent_utxos(&finalized, &empty_nfs); + db.write_block(finalized, spent_utxos, None, &network, "test") + .expect("write should succeed"); + } + + // Prune: same logic as write.rs pipeline pruning loop. + let finalized_tip_height = db.finalized_tip_height().unwrap(); + assert_eq!(finalized_tip_height, Height(3)); + + while nfs + .root_height() + .is_some_and(|root| root <= finalized_tip_height) + { + nfs.finalize(); + } + + // Blocks 1-3 should be pruned, leaving blocks 4-5. + assert_eq!(nfs.best_chain_len(), Some(2), "should have 2 blocks left"); + assert_eq!( + nfs.root_height(), + Some(Height(4)), + "root should be at height 4" + ); + let (tip_height, _) = nfs.best_tip().expect("should have tip"); + assert_eq!(tip_height, Height(5), "tip should be at height 5"); +} + /// Creates an ephemeral finalized state and empty non-finalized state for testing. 
fn test_state() -> (FinalizedState, NonFinalizedState) { let network = Network::Mainnet; diff --git a/zebrad/src/components/sync/downloads/block_size_tracker.rs b/zebrad/src/components/sync/downloads/block_size_tracker.rs index 033d819d25a..651b46e220b 100644 --- a/zebrad/src/components/sync/downloads/block_size_tracker.rs +++ b/zebrad/src/components/sync/downloads/block_size_tracker.rs @@ -83,3 +83,70 @@ impl BlockSizeTracker { ((4 * BATCH_TARGET_SIZE) / (5 * avg_size)).clamp(1, MAX_BATCH_SIZE) } } + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn empty_tracker_returns_max_batch_size() { + let mut tracker = BlockSizeTracker::new(); + assert_eq!(tracker.recommended_batch_size(), MAX_BATCH_SIZE); + } + + #[test] + fn small_blocks_yield_max_batch_size() { + let mut tracker = BlockSizeTracker::new(); + // 1KB blocks: 1_000_000 / 1_000 * 4/5 = 800, clamped to 16 + for _ in 0..10 { + tracker.sender().send(1_000).unwrap(); + } + assert_eq!(tracker.recommended_batch_size(), MAX_BATCH_SIZE); + } + + #[test] + fn large_blocks_yield_small_batch_size() { + let mut tracker = BlockSizeTracker::new(); + // 800KB blocks: 1_000_000 / 800_000 * 4/5 = 1.0, clamped to 1 + for _ in 0..10 { + tracker.sender().send(800_000).unwrap(); + } + assert_eq!(tracker.recommended_batch_size(), 1); + } + + #[test] + fn medium_blocks_yield_intermediate_batch_size() { + let mut tracker = BlockSizeTracker::new(); + // 200KB blocks: 1_000_000 / 200_000 * 4/5 = 4 + for _ in 0..10 { + tracker.sender().send(200_000).unwrap(); + } + assert_eq!(tracker.recommended_batch_size(), 4); + } + + #[test] + fn rolling_window_evicts_old_samples() { + let mut tracker = BlockSizeTracker::new(); + // Push 100 large samples (800KB) + for _ in 0..MAX_RECENT_BLOCK_SIZE_SAMPLES { + tracker.sender().send(800_000).unwrap(); + } + // Drain them + assert_eq!(tracker.recommended_batch_size(), 1); + + // Push 100 small samples (1KB) — old large samples should be evicted + for _ in 0..MAX_RECENT_BLOCK_SIZE_SAMPLES { 
+ tracker.sender().send(1_000).unwrap(); + } + assert_eq!(tracker.recommended_batch_size(), MAX_BATCH_SIZE); + } + + #[test] + fn zero_size_blocks_return_max() { + let mut tracker = BlockSizeTracker::new(); + for _ in 0..10 { + tracker.sender().send(0).unwrap(); + } + assert_eq!(tracker.recommended_batch_size(), MAX_BATCH_SIZE); + } +} From 2d4b3fe40e8bd7b36a1d291962a079f6a47fdf0e Mon Sep 17 00:00:00 2001 From: arya2 Date: Fri, 3 Apr 2026 19:18:30 -0400 Subject: [PATCH 2/9] feat(copy_state): Improve `copy-state` command performance by avoiding unnecessary transaction serializations --- zebra-consensus/src/block.rs | 1 + zebra-state/src/arbitrary.rs | 1 + zebra-state/src/lib.rs | 4 +-- zebra-state/src/request.rs | 24 +++++++++++++ zebra-state/src/service/chain_tip.rs | 1 + .../service/finalized_state/zebra_db/block.rs | 36 +++++++++++++++++-- .../zebra_db/block/tests/vectors.rs | 1 + zebrad/src/commands/copy_state.rs | 23 +++++++----- 8 files changed, 79 insertions(+), 12 deletions(-) diff --git a/zebra-consensus/src/block.rs b/zebra-consensus/src/block.rs index e75ad458ad6..55a6b1dcd36 100644 --- a/zebra-consensus/src/block.rs +++ b/zebra-consensus/src/block.rs @@ -339,6 +339,7 @@ where new_outputs, transaction_hashes, deferred_pool_balance_change: Some(deferred_pool_balance_change), + cached_raw_transactions: None, }; // Return early for proposal requests. 
diff --git a/zebra-state/src/arbitrary.rs b/zebra-state/src/arbitrary.rs index 1dc9b7ce33d..71e94c9e937 100644 --- a/zebra-state/src/arbitrary.rs +++ b/zebra-state/src/arbitrary.rs @@ -98,6 +98,7 @@ impl ContextuallyVerifiedBlock { new_outputs, transaction_hashes, deferred_pool_balance_change: _, + cached_raw_transactions: _, } = block.into(); Self { diff --git a/zebra-state/src/lib.rs b/zebra-state/src/lib.rs index d604897b2c4..b2599e23f14 100644 --- a/zebra-state/src/lib.rs +++ b/zebra-state/src/lib.rs @@ -72,7 +72,7 @@ pub use service::{ pub use service::finalized_state::{ReadDisk, TypedColumnFamily, WriteTypedBatch}; pub use service::{ - finalized_state::{DiskWriteBatch, FromDisk, IntoDisk, WriteDisk, ZebraDb}, + finalized_state::{DiskWriteBatch, FromDisk, IntoDisk, RawBytes, WriteDisk, ZebraDb}, ReadStateService, }; @@ -80,7 +80,7 @@ pub use service::{ #[cfg(any(test, feature = "proptest-impl"))] pub use service::{ arbitrary::{populated_state, CHAIN_TIP_UPDATE_WAIT_LIMIT}, - finalized_state::{RawBytes, KV, MAX_ON_DISK_HEIGHT}, + finalized_state::{KV, MAX_ON_DISK_HEIGHT}, init_test, init_test_services, }; diff --git a/zebra-state/src/request.rs b/zebra-state/src/request.rs index 82620468285..03679568661 100644 --- a/zebra-state/src/request.rs +++ b/zebra-state/src/request.rs @@ -33,6 +33,7 @@ use crate::{ }; use crate::{ error::{CommitCheckpointVerifiedError, InvalidateError, LayeredStateError, ReconsiderError}, + service::finalized_state::RawBytes, CommitSemanticallyVerifiedError, }; @@ -260,6 +261,12 @@ pub struct SemanticallyVerifiedBlock { pub transaction_hashes: Arc<[transaction::Hash]>, /// This block's deferred pool value balance change. pub deferred_pool_balance_change: Option, + /// Cached raw serialized transaction bytes from the source database. + /// + /// When set, these bytes are written directly to the target database + /// without re-serializing the transactions. Used by the `copy-state` + /// command to avoid redundant serialization. 
+ pub cached_raw_transactions: Option>, } /// A block ready to be committed directly to the finalized state with @@ -392,6 +399,8 @@ pub struct FinalizedBlock { pub(super) treestate: Treestate, /// This block's deferred pool value balance change. pub(super) deferred_pool_balance_change: Option, + /// Cached raw serialized transaction bytes from the source database. + pub(super) cached_raw_transactions: Option>, } impl FinalizedBlock { @@ -418,6 +427,7 @@ impl FinalizedBlock { transaction_hashes: block.transaction_hashes, treestate, deferred_pool_balance_change: block.deferred_pool_balance_change, + cached_raw_transactions: block.cached_raw_transactions, } } } @@ -491,6 +501,7 @@ impl ContextuallyVerifiedBlock { new_outputs, transaction_hashes, deferred_pool_balance_change, + cached_raw_transactions: _, } = semantically_verified; // This is redundant for the non-finalized state, @@ -534,6 +545,15 @@ impl CheckpointVerifiedBlock { pub fn with_hash(block: Arc, hash: block::Hash) -> Self { Self(SemanticallyVerifiedBlock::with_hash(block, hash)) } + + /// Attaches cached raw serialized transaction bytes to this block. + /// + /// When set, these bytes are written directly to the target database + /// without re-serializing the transactions. 
+ pub fn with_cached_raw_transactions(mut self, raw_txs: Vec) -> Self { + self.0.cached_raw_transactions = Some(raw_txs); + self + } } impl SemanticallyVerifiedBlock { @@ -552,6 +572,7 @@ impl SemanticallyVerifiedBlock { new_outputs, transaction_hashes, deferred_pool_balance_change: None, + cached_raw_transactions: None, } } @@ -587,6 +608,7 @@ impl From> for SemanticallyVerifiedBlock { new_outputs, transaction_hashes, deferred_pool_balance_change: None, + cached_raw_transactions: None, } } } @@ -602,6 +624,7 @@ impl From for SemanticallyVerifiedBlock { deferred_pool_balance_change: Some(DeferredPoolBalanceChange::new( valid.chain_value_pool_change.deferred_amount(), )), + cached_raw_transactions: None, } } } @@ -615,6 +638,7 @@ impl From for SemanticallyVerifiedBlock { new_outputs: finalized.new_outputs, transaction_hashes: finalized.transaction_hashes, deferred_pool_balance_change: finalized.deferred_pool_balance_change, + cached_raw_transactions: finalized.cached_raw_transactions, } } } diff --git a/zebra-state/src/service/chain_tip.rs b/zebra-state/src/service/chain_tip.rs index 40d67d67929..c7986ef2734 100644 --- a/zebra-state/src/service/chain_tip.rs +++ b/zebra-state/src/service/chain_tip.rs @@ -116,6 +116,7 @@ impl From for ChainTipBlock { new_outputs: _, transaction_hashes, deferred_pool_balance_change: _, + cached_raw_transactions: _, } = prepared; Self { diff --git a/zebra-state/src/service/finalized_state/zebra_db/block.rs b/zebra-state/src/service/finalized_state/zebra_db/block.rs index 2996494833c..4af2afd9b69 100644 --- a/zebra-state/src/service/finalized_state/zebra_db/block.rs +++ b/zebra-state/src/service/finalized_state/zebra_db/block.rs @@ -180,6 +180,33 @@ impl ZebraDb { })) } + /// Returns the [`Block`] with [`block::Hash`] or [`Height`], if it exists + /// in the finalized chain, along with the raw serialized bytes of each transaction. 
+ /// + /// The raw bytes can be used to avoid re-serializing transactions when writing + /// them to another database (e.g., in the `copy-state` command). + #[allow(clippy::unwrap_in_result)] + pub fn block_and_raw_transactions( + &self, + hash_or_height: HashOrHeight, + ) -> Option<(Arc, Vec)> { + let height = hash_or_height.height_or_else(|hash| self.height(hash))?; + let header = self.block_header(height.into())?; + + let mut transactions = Vec::new(); + let mut raw_transactions = Vec::new(); + for (_loc, raw) in self.raw_transactions_by_height(height) { + transactions.push(Arc::new(Transaction::from_bytes(raw.raw_bytes()))); + raw_transactions.push(raw); + } + + let block = Arc::new(Block { + header, + transactions, + }); + Some((block, raw_transactions)) + } + /// Returns the [`Block`] with [`block::Hash`] or [`Height`], if it exists /// in the finalized chain, and its serialized size. #[allow(clippy::unwrap_in_result)] @@ -766,8 +793,13 @@ impl DiskWriteBatch { { let transaction_location = TransactionLocation::from_usize(*height, transaction_index); - // Commit each transaction's data - self.zs_insert(&tx_by_loc, transaction_location, transaction); + // Commit each transaction's data, using cached raw bytes when available + // to avoid re-serializing transactions that were just read from another database. 
+ if let Some(ref cached) = finalized.cached_raw_transactions { + self.zs_insert(&tx_by_loc, transaction_location, &cached[transaction_index]); + } else { + self.zs_insert(&tx_by_loc, transaction_location, transaction); + } // Index each transaction hash and location self.zs_insert(&hash_by_tx_loc, transaction_location, transaction_hash); diff --git a/zebra-state/src/service/finalized_state/zebra_db/block/tests/vectors.rs b/zebra-state/src/service/finalized_state/zebra_db/block/tests/vectors.rs index 1e8c1b6399f..8a4361cdd66 100644 --- a/zebra-state/src/service/finalized_state/zebra_db/block/tests/vectors.rs +++ b/zebra-state/src/service/finalized_state/zebra_db/block/tests/vectors.rs @@ -345,6 +345,7 @@ fn test_block_db_round_trip_with( new_outputs, transaction_hashes, deferred_pool_balance_change: None, + cached_raw_transactions: None, }) }; diff --git a/zebrad/src/commands/copy_state.rs b/zebrad/src/commands/copy_state.rs index 6c998d54c6b..8cf0e2874d0 100644 --- a/zebrad/src/commands/copy_state.rs +++ b/zebrad/src/commands/copy_state.rs @@ -44,8 +44,8 @@ use zebra_chain::{ block::{Block, Height}, parameters::Network, }; -use zebra_state as old_zs; use zebra_state as new_zs; +use zebra_state::{self as old_zs, RawBytes}; use crate::{ components::tokio::{RuntimeRun, TokioComponent}, @@ -213,19 +213,24 @@ impl CopyStateCmd { // Pipeline: read blocks from source in a blocking thread, send them // through a channel, and commit them to the target state concurrently. - let (block_tx, mut block_rx) = tokio::sync::mpsc::channel::<(u32, Arc)>(100); + let (block_tx, mut block_rx) = + tokio::sync::mpsc::channel::<(u32, Arc, Vec)>(100); // Source reader: reads blocks sequentially from the source DB and sends // them through the channel. Runs in a blocking thread because source DB - // reads are synchronous. + // reads are synchronous. Raw transaction bytes are sent alongside each + // block to avoid re-serializing them on write. 
let source_db = source_db.clone(); let reader_handle = tokio::task::spawn_blocking(move || { for height in min_target_height..=max_copy_height { - let source_block = source_db - .block(Height(height).into()) + let (source_block, raw_txs) = source_db + .block_and_raw_transactions(Height(height).into()) .unwrap_or_else(|| panic!("missing source block at height {height}")); - if block_tx.blocking_send((height, source_block)).is_err() { + if block_tx + .blocking_send((height, source_block, raw_txs)) + .is_err() + { break; } } @@ -235,13 +240,15 @@ impl CopyStateCmd { // commit each block to the target state. The spawned tasks run // concurrently so many blocks can be in flight at once. let mut commit_count: u32 = 0; - while let Some((height, source_block)) = block_rx.recv().await { + while let Some((height, source_block, raw_txs)) = block_rx.recv().await { + let checkpoint_block = new_zs::CheckpointVerifiedBlock::from(source_block) + .with_cached_raw_transactions(raw_txs); let rsp = target_state .ready() .await? 
+ .call(new_zs::Request::CommitCheckpointVerifiedBlock( - source_block.into(), + checkpoint_block, )); tokio::spawn(async move { From af6b14863d2cecd98125fcb51e87fd40ff2d3dd5 Mon Sep 17 00:00:00 2001 From: arya2 Date: Fri, 3 Apr 2026 23:52:16 -0400 Subject: [PATCH 3/9] feat(sync_perf): add performance benchmarks for Mainnet and Testnet sync --- zebrad/tests/acceptance.rs | 24 ++ zebrad/tests/common/mod.rs | 1 + zebrad/tests/common/sync.rs | 3 +- zebrad/tests/common/sync_perf.rs | 457 +++++++++++++++++++++++++++++++ 4 files changed, 484 insertions(+), 1 deletion(-) create mode 100644 zebrad/tests/common/sync_perf.rs diff --git a/zebrad/tests/acceptance.rs b/zebrad/tests/acceptance.rs index 8546aebfc10..3b6affafeac 100644 --- a/zebrad/tests/acceptance.rs +++ b/zebrad/tests/acceptance.rs @@ -210,6 +210,7 @@ use common::{ STOP_ON_LOAD_TIMEOUT, SYNC_FINISHED_REGEX, TINY_CHECKPOINT_TEST_HEIGHT, TINY_CHECKPOINT_TIMEOUT, }, + sync_perf::{self, MAINNET_SAMPLES, TESTNET_SAMPLES}, test_type::TestType::{self, *}, }; @@ -1226,6 +1227,29 @@ fn sync_large_checkpoints_mempool_mainnet() -> Result<()> { .map(|_tempdir| ()) } +/// Benchmark initial sync performance across different Mainnet chain eras. +/// +/// Reads a fully-synced Mainnet state from `ZEBRAD_PERF_SOURCE_STATE_DIR`, or from the default Zebra cache directory when that variable is unset. +/// Produces a markdown report with blocks/sec per era for cross-branch comparison. +/// +/// Run with: `cargo test -p zebrad sync_performance_benchmark_mainnet -- --ignored --nocapture` +#[test] +#[ignore] +fn sync_performance_benchmark_mainnet() -> Result<()> { + sync_perf::sync_performance_benchmark(&Mainnet, MAINNET_SAMPLES) +} + +/// Benchmark initial sync performance across different Testnet chain eras. +/// +/// Reads a fully-synced Testnet state from `ZEBRAD_PERF_SOURCE_STATE_DIR`, or from the default Zebra cache directory when that variable is unset. 
+/// +/// Run with: `cargo test -p zebrad sync_performance_benchmark_testnet -- --ignored --nocapture` +#[test] +#[ignore] +fn sync_performance_benchmark_testnet() -> Result<()> { + sync_perf::sync_performance_benchmark(&Network::new_default_testnet(), TESTNET_SAMPLES) +} + #[tracing::instrument] fn create_cached_database(network: Network) -> Result<()> { let height = network.mandatory_checkpoint_height(); diff --git a/zebrad/tests/common/mod.rs b/zebrad/tests/common/mod.rs index 24e474e9a2e..e29f79e73f0 100644 --- a/zebrad/tests/common/mod.rs +++ b/zebrad/tests/common/mod.rs @@ -20,4 +20,5 @@ pub mod launch; pub mod lightwalletd; pub mod regtest; pub mod sync; +pub mod sync_perf; pub mod test_type; diff --git a/zebrad/tests/common/sync.rs b/zebrad/tests/common/sync.rs index 2289d24cb42..2ef4127fe94 100644 --- a/zebrad/tests/common/sync.rs +++ b/zebrad/tests/common/sync.rs @@ -255,8 +255,9 @@ pub fn sync_until( // // Since it needs to collect all the output, // the test harness can't kill `zebrad` after it logs the `stop_regex`. + // debug_stop_at_height ensures zebrad exits naturally at any height. assert!( - height.0 < 2_000_000, + height.0 < 10_000_000, "zebrad must exit by itself, so we can collect all the output", ); let output = child.wait_with_output()?; diff --git a/zebrad/tests/common/sync_perf.rs b/zebrad/tests/common/sync_perf.rs new file mode 100644 index 00000000000..9f4ffff464a --- /dev/null +++ b/zebrad/tests/common/sync_perf.rs @@ -0,0 +1,457 @@ +//! Sync performance regression test infrastructure. +//! +//! Measures initial sync throughput across different chain eras by seeding +//! state via `copy-state` and syncing ~50k-block windows from the network. 
+ +use std::{ + env, + path::{Path, PathBuf}, + process::Command, + time::{Duration, Instant}, +}; + +use color_eyre::eyre::{eyre, Result}; + +use zebra_chain::{block::Height, parameters::Network}; + +use zebrad::components::sync; + +use zebra_test::{args, prelude::*}; + +use super::config::{persistent_test_config, testdir}; +use super::launch::ZebradTestDirExt; +use super::sync::{MIN_HEIGHT_FOR_DEFAULT_LOOKAHEAD, STOP_AT_HEIGHT_REGEX}; + +/// Default number of blocks to sync per sample. +const DEFAULT_SAMPLE_SIZE: u32 = 50_000; + +/// Timeout per sample sync from the network. +const SAMPLE_SYNC_TIMEOUT: Duration = Duration::from_secs(60 * 60); // 1 hour + +/// A block range to sample for performance measurement. +pub struct SampleRange { + pub name: &'static str, + pub start: u32, + pub end: u32, + pub description: &'static str, +} + +/// Mainnet sample ranges covering each network upgrade era. +pub const MAINNET_SAMPLES: &[SampleRange] = &[ + SampleRange { + name: "Genesis", + start: 0, + end: 10_000, + description: "V1-V3 transparent-only", + }, + SampleRange { + name: "Early", + start: 20_000, + end: 50_000, + description: "V1-V3 transparent-only", + }, + SampleRange { + name: "Overwinter-Sapling", + start: 395_000, + end: 445_000, + description: "Overwinter (347,500) -> Sapling (419,200)", + }, + SampleRange { + name: "Blossom", + start: 630_000, + end: 680_000, + description: "Blossom activation (653,600)", + }, + SampleRange { + name: "Heartwood", + start: 880_000, + end: 930_000, + description: "Heartwood activation (903,000)", + }, + SampleRange { + name: "NU5", + start: 1_670_000, + end: 1_700_000, + description: "NU5 (1,687,104), V5 txs + Orchard", + }, + SampleRange { + name: "Spam-attack", + start: 1_820_000, + end: 1_850_000, + description: "Dense transparent dust transactions", + }, + SampleRange { + name: "Post-NU6.1", + start: 3_100_000, + end: 3_150_000, + description: "Post-NU6.1, current chain", + }, +]; + +/// Testnet sample ranges. 
+pub const TESTNET_SAMPLES: &[SampleRange] = &[ + SampleRange { + name: "Genesis", + start: 0, + end: 50_000, + description: "Early testnet chain", + }, + SampleRange { + name: "Sapling", + start: 260_000, + end: 310_000, + description: "Sapling activation (280,000)", + }, + SampleRange { + name: "Canopy", + start: 1_010_000, + end: 1_060_000, + description: "Canopy activation (1,028,500)", + }, + SampleRange { + name: "NU5", + start: 1_820_000, + end: 1_870_000, + description: "NU5 (1,842,420), V5 txs", + }, + SampleRange { + name: "Post-NU6.1", + start: 3_500_000, + end: 3_550_000, + description: "Post-NU6.1, recent testnet", + }, +]; + +/// Result of syncing a single sample range. +struct SampleResult { + name: &'static str, + start: u32, + end: u32, + description: &'static str, + copy_duration: Option, + sync_duration: Duration, + blocks_synced: u32, + blocks_per_sec: f64, +} + +/// Run the performance benchmark for a network with the given sample ranges. +/// +/// Uses the default Zebra cache directory as the source state, or +/// `ZEBRAD_PERF_SOURCE_STATE_DIR` if set. Optionally writes a markdown +/// report to `ZEBRAD_PERF_REPORT_PATH`. +pub fn sync_performance_benchmark(network: &Network, samples: &[SampleRange]) -> Result<()> { + let _init_guard = zebra_test::init(); + + // Use ZEBRAD_PERF_SOURCE_STATE_DIR if set, otherwise fall back to the + // default Zebra cache directory. The env var name avoids the ZEBRA_ prefix + // which Zebra's config loader interprets as config overrides. 
+ let source_state_dir = env::var("ZEBRAD_PERF_SOURCE_STATE_DIR") + .map(PathBuf::from) + .unwrap_or_else(|_| zebra_chain::common::default_cache_dir()); + + if !source_state_dir.exists() { + return Err(eyre!( + "Source state directory does not exist: {}\n\ + Set ZEBRAD_PERF_SOURCE_STATE_DIR or sync Zebra first.", + source_state_dir.display() + )); + } + + let sample_size = env::var("ZEBRAD_PERF_SAMPLE_SIZE") + .ok() + .and_then(|s| s.parse().ok()) + .unwrap_or(DEFAULT_SAMPLE_SIZE); + + let (commit, branch) = collect_git_info(); + let hardware = collect_hardware_info(); + let network_name = format!("{network}"); + + eprintln!("=== Sync Performance Benchmark ==="); + eprintln!("Branch: {branch} Commit: {commit}"); + eprintln!("Network: {network_name} Sample size: {sample_size} blocks"); + eprintln!(); + + let mut results = Vec::new(); + + for sample in samples { + // Allow overriding sample size via env var + // let end = sample.start + sample_size; + + eprintln!( + "--- {} ({} -> {}) ---", + sample.name, sample.start, sample.end + ); + + let result = run_sample(&source_state_dir, sample, network)?; + + eprintln!( + " {} blocks in {:.1}s = {:.1} blocks/sec\n", + result.blocks_synced, + result.sync_duration.as_secs_f64(), + result.blocks_per_sec, + ); + + results.push(result); + } + + let report = format_report(&network_name, &commit, &branch, &hardware, &results); + + eprintln!("{report}"); + + // Optionally write to file + if let Ok(path) = env::var("ZEBRAD_PERF_REPORT_PATH") { + std::fs::write(&path, &report)?; + eprintln!("Report written to {path}"); + } + + Ok(()) +} + +/// Run a single sample: seed state via copy-state, then sync and measure. +fn run_sample( + source_state_dir: &Path, + sample: &SampleRange, + network: &Network, +) -> Result { + let tempdir = testdir()?; + + // Phase 1: Copy state to start height (skip for genesis). + // copy-state writes to tempdir/state/v27/{network}/ via the target config. 
+ let copy_duration = if sample.start > 0 { + Some(copy_state_to_height( + source_state_dir, + tempdir.path(), + sample.start, + network, + )?) + } else { + None + }; + + if let Some(d) = &copy_duration { + eprintln!(" Copy to height {}: {:.1}s", sample.start, d.as_secs_f64()); + } + + // Phase 2: Sync from network, measuring wall-clock time. + // + // We manage the zebrad process directly instead of using `sync_until` + // because `sync_until` with `ShouldNotActivate` uses `wait_with_output`, + // which does NOT respect the timeout set by `with_timeout`. By using + // `expect_stdout_line_matches` the deadline is enforced. + let mut config = persistent_test_config(network)?; + config.state.debug_stop_at_height = Some(sample.end); + config.consensus.checkpoint_sync = true; + + if Height(sample.end) > MIN_HEIGHT_FOR_DEFAULT_LOOKAHEAD { + config.sync.checkpoint_verify_concurrency_limit = + sync::DEFAULT_CHECKPOINT_CONCURRENCY_LIMIT; + } + + let tempdir = tempdir.replace_config(&mut config)?; + + let sync_start = Instant::now(); + let mut child = tempdir + .spawn_child(args!["start"])? + .with_timeout(SAMPLE_SYNC_TIMEOUT); + + child.expect_stdout_line_matches(STOP_AT_HEIGHT_REGEX)?; + let sync_duration = sync_start.elapsed(); + + child.kill(true)?; + let _tempdir = child.dir.take().expect("dir was not already taken"); + child.wait_with_output()?; + + let blocks_synced = sample.end - sample.start; + let blocks_per_sec = blocks_synced as f64 / sync_duration.as_secs_f64(); + + Ok(SampleResult { + name: sample.name, + start: sample.start, + end: sample.end, + description: sample.description, + copy_duration, + sync_duration, + blocks_synced, + blocks_per_sec, + }) +} + +/// Use `copy-state` to populate target state up to `height`. +/// +/// `target_dir` is used as both the working directory for config files and +/// the target `cache_dir` — copy-state writes state into `target_dir/state/v27/{network}/`. 
+fn copy_state_to_height( + source_state_dir: &Path, + target_dir: &Path, + height: u32, + network: &Network, +) -> Result { + // Write source config (used as zebrad's main config for copy-state) + let mut source_config = persistent_test_config(network)?; + source_config.state.cache_dir = source_state_dir.to_owned(); + + let source_config_path = target_dir.join("source-zebrad.toml"); + let source_toml = toml::to_string(&source_config)?; + std::fs::write(&source_config_path, source_toml)?; + + // Write target config with cache_dir = target_dir so state ends up at + // target_dir/state/v27/{network}/, matching what sync_until expects. + let mut target_config = persistent_test_config(network)?; + target_config.state.cache_dir = target_dir.to_owned(); + + let target_config_path = target_dir.join("target.toml"); + let target_toml = toml::to_string(&target_config)?; + std::fs::write(&target_config_path, target_toml)?; + + let start = Instant::now(); + + // Spawn copy-state as subprocess + let zebrad_bin = + env::var("ZEBRAD_BIN").unwrap_or_else(|_| env!("CARGO_BIN_EXE_zebrad").to_string()); + + // Clear ZEBRA_* env vars from the subprocess so Zebra's config loader + // doesn't interpret our test env vars (like ZEBRAD_PERF_*) as config + // field overrides. + let clean_env: Vec<(String, String)> = env::vars() + .filter(|(k, _)| !k.starts_with("ZEBRA")) + .collect(); + let output = Command::new(&zebrad_bin) + .env_clear() + .envs(clean_env) + .args([ + "-c", + source_config_path.to_str().unwrap(), + "copy-state", + "--max-source-height", + &height.to_string(), + "--target-config-path", + target_config_path.to_str().unwrap(), + ]) + .output()?; + if !output.status.success() { + let stderr = String::from_utf8_lossy(&output.stderr); + return Err(eyre!("copy-state failed at height {height}: {stderr}")); + } + + Ok(start.elapsed()) +} + +/// Sync from the current state tip to `end_height` and return the wall clock duration. 
+/// +/// Reuses the provided `tempdir` so that state seeded by copy-state is preserved. +/// Collect git branch and commit info. +fn collect_git_info() -> (String, String) { + let commit = Command::new("git") + .args(["rev-parse", "--short", "HEAD"]) + .output() + .ok() + .and_then(|o| String::from_utf8(o.stdout).ok()) + .map(|s| s.trim().to_string()) + .unwrap_or_else(|| "unknown".to_string()); + + let branch = Command::new("git") + .args(["rev-parse", "--abbrev-ref", "HEAD"]) + .output() + .ok() + .and_then(|o| String::from_utf8(o.stdout).ok()) + .map(|s| s.trim().to_string()) + .unwrap_or_else(|| "unknown".to_string()); + + (commit, branch) +} + +/// Collect basic hardware info for the report. +fn collect_hardware_info() -> String { + let mut info = String::new(); + + // CPU model + if let Ok(cpuinfo) = std::fs::read_to_string("/proc/cpuinfo") { + if let Some(line) = cpuinfo.lines().find(|l| l.starts_with("model name")) { + if let Some(model) = line.split(':').nth(1) { + info.push_str(&format!("CPU: {}\n", model.trim())); + } + } + let cores = cpuinfo + .lines() + .filter(|l| l.starts_with("processor")) + .count(); + info.push_str(&format!("Cores: {cores}\n")); + } + + // RAM + if let Ok(meminfo) = std::fs::read_to_string("/proc/meminfo") { + if let Some(line) = meminfo.lines().find(|l| l.starts_with("MemTotal")) { + if let Some(mem) = line.split_whitespace().nth(1) { + if let Ok(kb) = mem.parse::() { + info.push_str(&format!("RAM: {} GB\n", kb / 1_048_576)); + } + } + } + } + + // Kernel + if let Ok(output) = Command::new("uname").arg("-r").output() { + if let Ok(kernel) = String::from_utf8(output.stdout) { + info.push_str(&format!("Kernel: {}", kernel.trim())); + } + } + + if info.is_empty() { + "Unknown platform".to_string() + } else { + info + } +} + +/// Format the benchmark results as a markdown report. 
+fn format_report( + network: &str, + commit: &str, + branch: &str, + hardware: &str, + results: &[SampleResult], +) -> String { + let mut report = String::new(); + + report.push_str("# Zebra Sync Performance Report\n\n"); + report.push_str(&format!("**Date:** {}\n", chrono::Utc::now().to_rfc3339())); + report.push_str(&format!("**Branch:** {branch}\n")); + report.push_str(&format!("**Commit:** {commit}\n")); + report.push_str(&format!("**Network:** {network}\n\n")); + + report.push_str("## Hardware\n\n"); + report.push_str("```\n"); + report.push_str(hardware); + report.push_str("\n```\n\n"); + + report.push_str("## Results\n\n"); + report.push_str("| Sample | Heights | Duration | Blocks/sec | Copy Time | Description |\n"); + report.push_str("|--------|---------|----------|------------|-----------|-------------|\n"); + + for r in results { + let copy_time = r + .copy_duration + .map(|d| format!("{:.0}s", d.as_secs_f64())) + .unwrap_or_else(|| "N/A".to_string()); + + report.push_str(&format!( + "| {} | {} → {} | {:.0}s | {:.1} | {} | {} |\n", + r.name, + r.start, + r.end, + r.sync_duration.as_secs_f64(), + r.blocks_per_sec, + copy_time, + r.description, + )); + } + + report.push_str("\n## Notes\n\n"); + report.push_str("- Sync uses public network peers (not isolated)\n"); + report.push_str( + "- Duration measures wall clock from `zebrad start` to `stopping at configured height`\n", + ); + report.push_str("- Copy time is wall clock for `copy-state --max-source-height`\n"); + + report +} From 4a7457df9103bd30aa644fd1fd21c335191fe5f5 Mon Sep 17 00:00:00 2001 From: arya2 Date: Fri, 3 Apr 2026 23:54:13 -0400 Subject: [PATCH 4/9] Now the sync part works but the copy-state still isn't working --- zebra-state/src/service/finalized_state.rs | 36 +++++++++++++++++----- zebra-state/src/service/write.rs | 23 +++++++------- zebrad/src/components/inbound.rs | 1 - 3 files changed, 40 insertions(+), 20 deletions(-) diff --git a/zebra-state/src/service/finalized_state.rs 
b/zebra-state/src/service/finalized_state.rs index fd64462ae83..49bc33c6a51 100644 --- a/zebra-state/src/service/finalized_state.rs +++ b/zebra-state/src/service/finalized_state.rs @@ -19,7 +19,7 @@ use std::{ sync::Arc, }; -use zebra_chain::{block, parallel::tree::NoteCommitmentTrees, parameters::Network}; +use zebra_chain::{block, parallel::tree::NoteCommitmentTrees, parameters::Network, transparent}; use zebra_db::{ chain::BLOCK_INFO, transparent::{BALANCE_BY_TRANSPARENT_ADDR, TX_LOC_BY_SPENT_OUT_LOC}, @@ -325,7 +325,7 @@ impl FinalizedState { prev_note_commitment_trees: Option, source: &str, ) -> Result<(block::Hash, NoteCommitmentTrees), CommitCheckpointVerifiedError> { - let (height, hash, finalized, prev_note_commitment_trees) = match finalizable_block { + let (finalized, prev_note_commitment_trees) = match finalizable_block { FinalizableBlock::Checkpoint { checkpoint_verified, } => { @@ -387,8 +387,6 @@ impl FinalizedState { }; ( - checkpoint_verified.height, - checkpoint_verified.hash, FinalizedBlock::from_checkpoint_verified(checkpoint_verified, treestate), Some(prev_note_commitment_trees), ) @@ -397,15 +395,38 @@ impl FinalizedState { contextually_verified, treestate, } => ( - contextually_verified.height, - contextually_verified.hash, FinalizedBlock::from_contextually_verified(contextually_verified, treestate), prev_note_commitment_trees, ), }; + self.commit_finalized_direct_internal(finalized, prev_note_commitment_trees, None, source) + } + + /// Immediately commit a `finalized` block to the finalized state. + /// + /// This can be called either by the non-finalized state (when finalizing + /// a block) or by the checkpoint verifier. + /// + /// Use `source` as the source of the block in log messages. 
+ /// + /// # Errors + /// + /// - Propagates any errors from writing to the DB + /// - Propagates any errors from updating history and note commitment trees + /// - If `hashFinalSaplingRoot` / `hashLightClientRoot` / `hashBlockCommitments` + /// does not match the expected value + #[allow(clippy::unwrap_in_result)] + pub fn commit_finalized_direct_internal( + &mut self, + finalized: FinalizedBlock, + prev_note_commitment_trees: Option, + spent_utxos: Option>, + source: &str, + ) -> Result<(block::Hash, NoteCommitmentTrees), CommitCheckpointVerifiedError> { let committed_tip_hash = self.db.finalized_tip_hash(); let committed_tip_height = self.db.finalized_tip_height(); + let &FinalizedBlock { height, hash, .. } = &finalized; // Assert that callers (including unit tests) get the chain order correct if self.db.is_empty() { @@ -437,7 +458,8 @@ impl FinalizedState { // Phase 2: all UTXOs are finalized, use empty NFS for fallback. let empty_nfs = NonFinalizedState::new(&self.network()); - let spent_utxos = self.db.lookup_spent_utxos(&finalized, &empty_nfs); + let spent_utxos = + spent_utxos.unwrap_or_else(|| self.db.lookup_spent_utxos(&finalized, &empty_nfs)); let result = self.db.write_block( finalized, diff --git a/zebra-state/src/service/write.rs b/zebra-state/src/service/write.rs index 49c5f4f8438..8f2bc71e154 100644 --- a/zebra-state/src/service/write.rs +++ b/zebra-state/src/service/write.rs @@ -311,25 +311,24 @@ impl WriteBlockWorkerTask { }); // Thread 3: prepare batch and write to disk - let mut db3 = finalized_state.db.clone(); - let network = non_finalized_state.network.clone(); + let mut finalized_state3 = finalized_state.clone(); s.spawn(move || { let mut prev_note_commitment_trees: Option = None; while let Ok((finalized, spent_utxos)) = write_rx.recv() { let note_commitment_trees = finalized.treestate.note_commitment_trees.clone(); let height = finalized.height; - db3.write_block( - finalized, - spent_utxos, - prev_note_commitment_trees.take(), - &network, 
- "commit checkpoint-verified pipeline", - ) - .expect( - "unexpected disk write error: \ + finalized_state3 + .commit_finalized_direct_internal( + finalized, + prev_note_commitment_trees.take(), + Some(spent_utxos), + "commit checkpoint-verified pipeline", + ) + .expect( + "unexpected disk write error: \ block has already been validated by the non-finalized state", - ); + ); prev_note_commitment_trees = Some(note_commitment_trees); diff --git a/zebrad/src/components/inbound.rs b/zebrad/src/components/inbound.rs index 7c5e6753c01..70aa17f730e 100644 --- a/zebrad/src/components/inbound.rs +++ b/zebrad/src/components/inbound.rs @@ -26,7 +26,6 @@ use zebra_state::{self as zs}; use zebra_chain::{ block::{self, Block}, - serialization::ZcashSerialize, transaction::UnminedTxId, }; use zebra_consensus::{router::RouterError, VerifyBlockError}; From bfbc6c5d7f1c45456d1194be83ab70b0a32f1021 Mon Sep 17 00:00:00 2001 From: arya2 Date: Sat, 4 Apr 2026 00:06:22 -0400 Subject: [PATCH 5/9] feat(copy_state): enhance commit loop to ensure write pipeline completion before final tip check --- zebrad/src/commands/copy_state.rs | 16 ++++++++++++---- 1 file changed, 12 insertions(+), 4 deletions(-) diff --git a/zebrad/src/commands/copy_state.rs b/zebrad/src/commands/copy_state.rs index 8cf0e2874d0..bf159edca01 100644 --- a/zebrad/src/commands/copy_state.rs +++ b/zebrad/src/commands/copy_state.rs @@ -236,9 +236,11 @@ impl CopyStateCmd { } }); - // Commit loop: receives blocks from the reader and spawns tasks that - // commit each block to the target state. The spawned tasks run - // concurrently so many blocks can be in flight at once. + // Commit loop: receives blocks from the reader and commits each block + // to the target state. Responses are collected into a JoinSet so they + // can be awaited after all blocks have been sent, ensuring the write + // pipeline finishes before we check the final tip. 
+ let mut commit_tasks = tokio::task::JoinSet::new(); let mut commit_count: u32 = 0; while let Some((height, source_block, raw_txs)) = block_rx.recv().await { let checkpoint_block = new_zs::CheckpointVerifiedBlock::from(source_block) @@ -251,7 +253,7 @@ impl CopyStateCmd { checkpoint_block, )); - tokio::spawn(async move { + commit_tasks.spawn(async move { match rsp.await { Ok(new_zs::Response::Committed(_hash)) => {} other => warn!("block commit failed at height {height}: {other:?}"), @@ -274,6 +276,12 @@ impl CopyStateCmd { .await .expect("source reader task should not panic"); + // Wait for all in-flight commit responses so the write pipeline + // finishes before we check the final tip. + while let Some(result) = commit_tasks.join_next().await { + result.expect("commit task should not panic"); + } + let elapsed = copy_start_time.elapsed(); info!(?max_copy_height, ?elapsed, "finished copying blocks"); From ab0f5630fc7dfb087d4a8de4ea2fe1dfa9b1e10b Mon Sep 17 00:00:00 2001 From: arya2 Date: Sat, 4 Apr 2026 01:36:42 -0400 Subject: [PATCH 6/9] feat(copy_state): optimize block commit process using FuturesOrdered for concurrent execution --- zebrad/src/commands/copy_state.rs | 72 ++++++++++++------------------- zebrad/tests/common/sync_perf.rs | 14 +++--- 2 files changed, 36 insertions(+), 50 deletions(-) diff --git a/zebrad/src/commands/copy_state.rs b/zebrad/src/commands/copy_state.rs index bf159edca01..a0306b2e753 100644 --- a/zebrad/src/commands/copy_state.rs +++ b/zebrad/src/commands/copy_state.rs @@ -33,19 +33,21 @@ //! * fetches blocks from the best finalized chain from permanent storage, //! 
in the new format -use std::{cmp::min, path::PathBuf, sync::Arc}; +use std::{cmp::min, path::PathBuf}; use abscissa_core::{config, Command, FrameworkError}; use color_eyre::eyre::{eyre, Report}; +use futures::stream::FuturesOrdered; use tokio::time::Instant; +use tokio_stream::StreamExt; use tower::{Service, ServiceExt}; use zebra_chain::{ - block::{Block, Height}, + block::Height, parameters::Network, }; use zebra_state as new_zs; -use zebra_state::{self as old_zs, RawBytes}; +use zebra_state as old_zs; use crate::{ components::tokio::{RuntimeRun, TokioComponent}, @@ -211,38 +213,18 @@ impl CopyStateCmd { let copy_start_time = Instant::now(); - // Pipeline: read blocks from source in a blocking thread, send them - // through a channel, and commit them to the target state concurrently. - let (block_tx, mut block_rx) = - tokio::sync::mpsc::channel::<(u32, Arc, Vec)>(100); - - // Source reader: reads blocks sequentially from the source DB and sends - // them through the channel. Runs in a blocking thread because source DB - // reads are synchronous. Raw transaction bytes are sent alongside each - // block to avoid re-serializing them on write. - let source_db = source_db.clone(); - let reader_handle = tokio::task::spawn_blocking(move || { - for height in min_target_height..=max_copy_height { - let (source_block, raw_txs) = source_db - .block_and_raw_transactions(Height(height).into()) - .unwrap_or_else(|| panic!("missing source block at height {height}")); - - if block_tx - .blocking_send((height, source_block, raw_txs)) - .is_err() - { - break; - } - } - }); - - // Commit loop: receives blocks from the reader and commits each block - // to the target state. Responses are collected into a JoinSet so they - // can be awaited after all blocks have been sent, ensuring the write - // pipeline finishes before we check the final tip. 
- let mut commit_tasks = tokio::task::JoinSet::new(); + // Copy loop: read each block from the source DB and commit it to the + // target state. Commit responses are collected in a FuturesOrdered and + // drained via select! so commits complete in the background while the + // next block is read. + let mut commit_futures = FuturesOrdered::new(); let mut commit_count: u32 = 0; - while let Some((height, source_block, raw_txs)) = block_rx.recv().await { + + for height in min_target_height..=max_copy_height { + let (source_block, raw_txs) = source_db + .block_and_raw_transactions(Height(height).into()) + .unwrap_or_else(|| panic!("missing source block at height {height}")); + let checkpoint_block = new_zs::CheckpointVerifiedBlock::from(source_block) .with_cached_raw_transactions(raw_txs); let rsp = @@ -253,13 +235,22 @@ impl CopyStateCmd { checkpoint_block, )); - commit_tasks.spawn(async move { + commit_futures.push_back(async move { match rsp.await { Ok(new_zs::Response::Committed(_hash)) => {} other => warn!("block commit failed at height {height}: {other:?}"), } }); + // NOTE(review): this awaits *every* in-flight commit, not just finished ones: `select!` only runs `else` once `next()` yields `None`, so commits are serialized. + loop { + tokio::select! { + biased; + Some(()) = commit_futures.next() => {} + else => break, + } + } + commit_count += 1; if commit_count.is_multiple_of(PROGRESS_HEIGHT_INTERVAL) { let elapsed = copy_start_time.elapsed(); @@ -272,15 +263,8 @@ impl CopyStateCmd { } } - reader_handle - .await - .expect("source reader task should not panic"); - - // Wait for all in-flight commit responses so the write pipeline - // finishes before we check the final tip. - while let Some(result) = commit_tasks.join_next().await { - result.expect("commit task should not panic"); - } + // Wait for all remaining in-flight commit responses.
+ while let Some(()) = commit_futures.next().await {} let elapsed = copy_start_time.elapsed(); info!(?max_copy_height, ?elapsed, "finished copying blocks"); diff --git a/zebrad/tests/common/sync_perf.rs b/zebrad/tests/common/sync_perf.rs index 9f4ffff464a..6fa44fc7ead 100644 --- a/zebrad/tests/common/sync_perf.rs +++ b/zebrad/tests/common/sync_perf.rs @@ -37,6 +37,8 @@ pub struct SampleRange { } /// Mainnet sample ranges covering each network upgrade era. +/// +/// 230k blocks total as of this writing. pub const MAINNET_SAMPLES: &[SampleRange] = &[ SampleRange { name: "Genesis", @@ -46,26 +48,26 @@ pub const MAINNET_SAMPLES: &[SampleRange] = &[ }, SampleRange { name: "Early", - start: 20_000, + start: 40_000, end: 50_000, description: "V1-V3 transparent-only", }, SampleRange { name: "Overwinter-Sapling", start: 395_000, - end: 445_000, + end: 420_000, description: "Overwinter (347,500) -> Sapling (419,200)", }, SampleRange { name: "Blossom", start: 630_000, - end: 680_000, + end: 670_000, description: "Blossom activation (653,600)", }, SampleRange { name: "Heartwood", start: 880_000, - end: 930_000, + end: 940_000, description: "Heartwood activation (903,000)", }, SampleRange { @@ -77,13 +79,13 @@ pub const MAINNET_SAMPLES: &[SampleRange] = &[ SampleRange { name: "Spam-attack", start: 1_820_000, - end: 1_850_000, + end: 1_840_000, description: "Dense transparent dust transactions", }, SampleRange { name: "Post-NU6.1", start: 3_100_000, - end: 3_150_000, + end: 3_125_000, description: "Post-NU6.1, current chain", }, ]; From 82a31c5b09c083e50e9878c9d7741a84ec3a18f8 Mon Sep 17 00:00:00 2001 From: arya2 Date: Sat, 4 Apr 2026 02:09:32 -0400 Subject: [PATCH 7/9] feat(copy_state): refactor block retrieval to use read-only state service for improved error handling --- zebra-state/src/request.rs | 13 +++++++++++++ zebra-state/src/response.rs | 10 +++++++++- zebra-state/src/service.rs | 7 +++++++ zebra-state/src/service/read.rs | 6 +++--- 
zebra-state/src/service/read/block.rs | 14 +++++++++++++- zebrad/src/commands/copy_state.rs | 13 +++++-------- 6 files changed, 50 insertions(+), 13 deletions(-) diff --git a/zebra-state/src/request.rs b/zebra-state/src/request.rs index 03679568661..0fbf1f49a65 100644 --- a/zebra-state/src/request.rs +++ b/zebra-state/src/request.rs @@ -1169,6 +1169,18 @@ pub enum ReadRequest { /// * [`ReadResponse::BlockAndSize(None)`](ReadResponse::BlockAndSize) otherwise. BlockAndSize(HashOrHeight), + /// Looks up a block by hash or height in the finalized state, + /// returning both the deserialized block and its raw transaction bytes. + /// + /// The raw bytes can be used to avoid re-serializing transactions when + /// writing them to another database (e.g., in the `copy-state` command). + /// + /// Returns + /// + /// * [`ReadResponse::BlockAndRawTransactions(Some(...))`](ReadResponse::BlockAndRawTransactions) if the block is in the finalized state; + /// * [`ReadResponse::BlockAndRawTransactions(None)`](ReadResponse::BlockAndRawTransactions) otherwise. + BlockAndRawTransactions(HashOrHeight), + /// Looks up a block header by hash or height in the current best chain.
/// /// Returns @@ -1448,6 +1460,7 @@ impl ReadRequest { ReadRequest::Block(_) => "block", ReadRequest::AnyChainBlock(_) => "any_chain_block", ReadRequest::BlockAndSize(_) => "block_and_size", + ReadRequest::BlockAndRawTransactions(_) => "block_and_raw_transactions", ReadRequest::BlockHeader(_) => "block_header", ReadRequest::Transaction(_) => "transaction", ReadRequest::AnyChainTransaction(_) => "any_chain_transaction", diff --git a/zebra-state/src/response.rs b/zebra-state/src/response.rs index 6ca5be13de3..32cf528850d 100644 --- a/zebra-state/src/response.rs +++ b/zebra-state/src/response.rs @@ -25,7 +25,10 @@ use zebra_chain::work::difficulty::CompactDifficulty; #[allow(unused_imports)] use crate::{ReadRequest, Request}; -use crate::{service::read::AddressUtxos, NonFinalizedState, TransactionLocation, WatchReceiver}; +use crate::{ + service::{finalized_state::RawBytes, read::AddressUtxos}, + NonFinalizedState, TransactionLocation, WatchReceiver, +}; #[derive(Clone, Debug, PartialEq, Eq)] /// A response to a [`StateService`](crate::service::StateService) [`Request`]. @@ -335,6 +338,10 @@ pub enum ReadResponse { /// serialized size. BlockAndSize(Option<(Arc, usize)>), + /// Response to [`ReadRequest::BlockAndRawTransactions`] with the specified + /// block and its raw serialized transaction bytes. + BlockAndRawTransactions(Option<(Arc, Vec)>), + /// The response to a `BlockHeader` request. 
BlockHeader { /// The header of the requested block @@ -548,6 +555,7 @@ impl TryFrom for Response { | ReadResponse::AddressUtxos(_) | ReadResponse::ChainInfo(_) | ReadResponse::NonFinalizedBlocksListener(_) + | ReadResponse::BlockAndRawTransactions(_) | ReadResponse::IsTransparentOutputSpent(_) => { Err("there is no corresponding Response for this ReadResponse") } diff --git a/zebra-state/src/service.rs b/zebra-state/src/service.rs index 3b4f8d53430..be28c159ca9 100644 --- a/zebra-state/src/service.rs +++ b/zebra-state/src/service.rs @@ -1399,6 +1399,13 @@ impl Service for ReadStateService { read::block_and_size(state.latest_best_chain(), &state.db, hash_or_height), )), + // Used by the copy-state command. + ReadRequest::BlockAndRawTransactions(hash_or_height) => { + Ok(ReadResponse::BlockAndRawTransactions( + read::block_and_raw_transactions(&state.db, hash_or_height), + )) + } + // Used by the get_block (verbose) RPC and the StateService. ReadRequest::BlockHeader(hash_or_height) => { let best_chain = state.latest_best_chain(); diff --git a/zebra-state/src/service/read.rs b/zebra-state/src/service/read.rs index c27aee1ee82..3d34fb248cc 100644 --- a/zebra-state/src/service/read.rs +++ b/zebra-state/src/service/read.rs @@ -29,9 +29,9 @@ pub use address::{ utxo::{address_utxos, AddressUtxos}, }; pub use block::{ - any_block, any_transaction, any_utxo, block, block_and_size, block_header, block_info, - mined_transaction, transaction_hashes_for_any_block, transaction_hashes_for_block, - unspent_utxo, + any_block, any_transaction, any_utxo, block, block_and_raw_transactions, block_and_size, + block_header, block_info, mined_transaction, transaction_hashes_for_any_block, + transaction_hashes_for_block, unspent_utxo, }; #[cfg(feature = "indexer")] diff --git a/zebra-state/src/service/read/block.rs b/zebra-state/src/service/read/block.rs index e292925e0db..993a33a17c4 100644 --- a/zebra-state/src/service/read/block.rs +++ b/zebra-state/src/service/read/block.rs @@ -27,7 
+27,7 @@ use zebra_chain::{ use crate::{ response::{AnyTx, MinedTx}, service::{ - finalized_state::ZebraDb, + finalized_state::{RawBytes, ZebraDb}, non_finalized_state::{Chain, NonFinalizedState}, read::tip_height, }, @@ -89,6 +89,18 @@ where .or_else(|| db.block_and_size(hash_or_height)) } +/// Returns the [`Block`] and its raw serialized transaction bytes with +/// [`block::Hash`] or [`Height`], if it exists in the finalized `db`. +/// +/// Blocks that are only in the non-finalized state are never consulted, +/// because raw transaction bytes are only available from disk. +pub fn block_and_raw_transactions( + db: &ZebraDb, + hash_or_height: HashOrHeight, +) -> Option<(Arc<Block>, Vec<RawBytes>)> { + db.block_and_raw_transactions(hash_or_height) +} + /// Returns the [`block::Header`] with [`block::Hash`] or /// [`Height`], if it exists in the non-finalized `chain` or finalized `db`. pub fn block_header( diff --git a/zebrad/src/commands/copy_state.rs b/zebrad/src/commands/copy_state.rs index a0306b2e753..76c6fb66575 100644 --- a/zebrad/src/commands/copy_state.rs +++ b/zebrad/src/commands/copy_state.rs @@ -42,10 +42,7 @@ use tokio::time::Instant; use tokio_stream::StreamExt; use tower::{Service, ServiceExt}; -use zebra_chain::{ - block::Height, - parameters::Network, -}; +use zebra_chain::{block::Height, parameters::Network}; use zebra_state as new_zs; use zebra_state as old_zs; @@ -213,10 +210,10 @@ impl CopyStateCmd { let copy_start_time = Instant::now(); - // Copy loop: read each block from the source DB and commit it to the - // target state. Commit responses are collected in a FuturesOrdered and - // drained via select! so commits complete in the background while the - // next block is read. + // Copy loop: read each block from the source read state service and + // commit it to the target state. Commit responses are collected in a + // FuturesOrdered, but the select! loop below awaits every pending + // response before the next block is read, so commits are sequential.
let mut commit_futures = FuturesOrdered::new(); let mut commit_count: u32 = 0; From 7aed2a1a010ca7c23abb03516ec8050a3bdf3400 Mon Sep 17 00:00:00 2001 From: arya2 Date: Sat, 4 Apr 2026 04:04:38 -0400 Subject: [PATCH 8/9] feat(copy_state): update block retrieval to use read-only state service for improved error handling --- zebrad/src/commands/copy_state.rs | 21 +++++++++++++++++---- 1 file changed, 17 insertions(+), 4 deletions(-) diff --git a/zebrad/src/commands/copy_state.rs b/zebrad/src/commands/copy_state.rs index 76c6fb66575..e5ba449d02a 100644 --- a/zebrad/src/commands/copy_state.rs +++ b/zebrad/src/commands/copy_state.rs @@ -125,7 +125,7 @@ impl CopyStateCmd { let source_start_time = Instant::now(); // We're not verifying UTXOs here, so we don't need the maximum checkpoint height. - let (mut source_read_only_state_service, source_db, _source_latest_non_finalized_state) = + let (mut source_read_only_state_service, _source_db, _source_latest_non_finalized_state) = old_zs::spawn_init_read_only(source_config.clone(), network).await?; let elapsed = source_start_time.elapsed(); @@ -218,9 +218,22 @@ impl CopyStateCmd { let mut commit_count: u32 = 0; for height in min_target_height..=max_copy_height { - let (source_block, raw_txs) = source_db - .block_and_raw_transactions(Height(height).into()) - .unwrap_or_else(|| panic!("missing source block at height {height}")); + let (source_block, raw_txs) = match source_read_only_state_service + .ready() + .await? + .call(old_zs::ReadRequest::BlockAndRawTransactions( + Height(height).into(), + )) + .await? 
+ { + old_zs::ReadResponse::BlockAndRawTransactions(Some(block_and_txs)) => block_and_txs, + old_zs::ReadResponse::BlockAndRawTransactions(None) => { + panic!("missing source block at height {height}") + } + response => Err(format!( + "unexpected response to BlockAndRawTransactions request: {response:?}" + ))?, + }; let checkpoint_block = new_zs::CheckpointVerifiedBlock::from(source_block) .with_cached_raw_transactions(raw_txs); From 8bc6816ed8d8a991a4bd05979589467949b3b217 Mon Sep 17 00:00:00 2001 From: arya2 Date: Sat, 4 Apr 2026 05:06:20 -0400 Subject: [PATCH 9/9] feat(state): disable auto-compaction during checkpoint sync for faster writes --- .../src/service/finalized_state/disk_db.rs | 31 ++++++++++++++++--- .../src/service/finalized_state/zebra_db.rs | 6 ++++ zebra-state/src/service/write.rs | 12 +++++++ zebrad/src/commands/copy_state.rs | 4 --- 4 files changed, 45 insertions(+), 8 deletions(-) diff --git a/zebra-state/src/service/finalized_state/disk_db.rs b/zebra-state/src/service/finalized_state/disk_db.rs index 016fdd87971..4de83e4f550 100644 --- a/zebra-state/src/service/finalized_state/disk_db.rs +++ b/zebra-state/src/service/finalized_state/disk_db.rs @@ -97,6 +97,10 @@ pub struct DiskDb { /// applying any format changes that may have been required. finished_format_upgrades: Arc, + /// When true, the write-ahead log is skipped for write operations. + /// This is used during checkpoint sync for faster bulk writes. + disable_wal: Arc, + // Owned State // // Everything contained in this state must be shared by all clones, or read-only. @@ -1051,6 +1055,7 @@ impl DiskDb { ephemeral: config.ephemeral, db: Arc::new(db), finished_format_upgrades: Arc::new(AtomicBool::new(false)), + disable_wal: Arc::new(AtomicBool::new(false)), }; db.assert_default_cf_is_empty(); @@ -1118,9 +1123,26 @@ impl DiskDb { // Write methods // Low-level write methods are located in the WriteDisk trait + /// Enables or disables RocksDB auto-compaction at runtime. 
+ pub fn set_auto_compaction(&self, enabled: bool) { + let value = if enabled { "false" } else { "true" }; + self.db + .set_options(&[("disable_auto_compactions", value)]) + .expect("setting disable_auto_compactions should succeed"); + } + /// Writes `batch` to the database. + /// + /// When the `disable_wal` flag is set, the write-ahead log is skipped + /// for faster throughput. pub(crate) fn write(&self, batch: DiskWriteBatch) -> Result<(), rocksdb::Error> { - self.db.write(batch.batch) + if self.disable_wal.load(atomic::Ordering::Relaxed) { + let mut write_opts = rocksdb::WriteOptions::default(); + write_opts.disable_wal(true); + self.db.write_opt(batch.batch, &write_opts) + } else { + self.db.write(batch.batch) + } } // Private methods @@ -1268,6 +1290,7 @@ impl DiskDb { /// Returns the database options for the finalized state database. fn options() -> rocksdb::Options { let mut opts = rocksdb::Options::default(); + let mut block_based_opts = rocksdb::BlockBasedOptions::default(); const ONE_MEGABYTE: usize = 1024 * 1024; @@ -1297,13 +1320,13 @@ impl DiskDb { // accumulate in memory before flushing to L0. The default 64 MB is - // conservative; 256 MB per buffer with 4 buffers gives the flush + // conservative; 512 MB per buffer with 2 buffers gives the flush // thread headroom to keep up with sustained block commits. - opts.set_write_buffer_size(256 * ONE_MEGABYTE); - opts.set_max_write_buffer_number(4); + opts.set_write_buffer_size(512 * ONE_MEGABYTE); + opts.set_max_write_buffer_number(2); // Allow multiple background threads for flush and compaction. - // The default (1 flush + 1 compaction) can't keep up during sync. + // NOTE(review): 2 jobs equals the default (1 flush + 1 compaction), down from 8 — confirm intended. // RocksDB splits these across flush and compaction automatically. - opts.set_max_background_jobs(8); + opts.set_max_background_jobs(2); // Allow multiple memtables to be written to concurrently.
// This is safe with the default skiplist memtable and reduces diff --git a/zebra-state/src/service/finalized_state/zebra_db.rs b/zebra-state/src/service/finalized_state/zebra_db.rs index e153b71f0f3..2c3c5e880cf 100644 --- a/zebra-state/src/service/finalized_state/zebra_db.rs +++ b/zebra-state/src/service/finalized_state/zebra_db.rs @@ -259,6 +259,12 @@ impl ZebraDb { } } + /// Enables or disables RocksDB auto-compaction. + /// See [`DiskDb::set_auto_compaction`] for details. + pub fn set_auto_compaction(&self, enabled: bool) { + self.db.set_auto_compaction(enabled); + } + /// When called with a secondary DB instance, tries to catch up with the primary DB instance pub fn try_catch_up_with_primary(&self) -> Result<(), rocksdb::Error> { self.db.try_catch_up_with_primary() diff --git a/zebra-state/src/service/write.rs b/zebra-state/src/service/write.rs index 8f2bc71e154..b43922ee91b 100644 --- a/zebra-state/src/service/write.rs +++ b/zebra-state/src/service/write.rs @@ -283,6 +283,12 @@ impl WriteBlockWorkerTask { let mut last_zebra_mined_log_height = None; let mut prev_finalized_note_commitment_trees = None; + // Disable auto-compaction during checkpoint sync for faster writes. + // WAL stays enabled for crash safety during network sync. + finalized_state + .db + .set_auto_compaction(false); + // Pipeline: commit checkpoint-verified blocks to the non-finalized state (Thread 1), // look up spent UTXOs/output-locations (Thread 2), then prepare batch and write to // disk (Thread 3). This avoids blocking Thread 1 on any disk I/O. @@ -470,6 +476,12 @@ impl WriteBlockWorkerTask { // Scoped threads are automatically joined when the scope exits. }); + // Checkpoint sync is complete. Re-enable auto-compaction + // for normal operation during full verification. + finalized_state + .db + .set_auto_compaction(true); + // All checkpoint blocks have been written to disk by the pipeline. // Remove them from the non-finalized state so Phase 2 starts clean. 
while non_finalized_state.best_chain_len().unwrap_or(0) > 0 { diff --git a/zebrad/src/commands/copy_state.rs b/zebrad/src/commands/copy_state.rs index e5ba449d02a..e937b864707 100644 --- a/zebrad/src/commands/copy_state.rs +++ b/zebrad/src/commands/copy_state.rs @@ -138,10 +138,6 @@ impl CopyStateCmd { let target_start_time = Instant::now(); // We're not verifying UTXOs here, so we don't need the maximum checkpoint height. - // - // TODO: call Options::PrepareForBulkLoad() - // See "What's the fastest way to load data into RocksDB?" in - // https://github.com/facebook/rocksdb/wiki/RocksDB-FAQ let ( mut target_state, _target_read_only_state_service,