Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions zebra-consensus/src/block.rs
Original file line number Diff line number Diff line change
Expand Up @@ -339,6 +339,7 @@ where
new_outputs,
transaction_hashes,
deferred_pool_balance_change: Some(deferred_pool_balance_change),
cached_raw_transactions: None,
};

// Return early for proposal requests.
Expand Down
1 change: 1 addition & 0 deletions zebra-state/src/arbitrary.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ impl ContextuallyVerifiedBlock {
new_outputs,
transaction_hashes,
deferred_pool_balance_change: _,
cached_raw_transactions: _,
} = block.into();

Self {
Expand Down
4 changes: 2 additions & 2 deletions zebra-state/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,15 +72,15 @@ pub use service::{
pub use service::finalized_state::{ReadDisk, TypedColumnFamily, WriteTypedBatch};

pub use service::{
finalized_state::{DiskWriteBatch, FromDisk, IntoDisk, WriteDisk, ZebraDb},
finalized_state::{DiskWriteBatch, FromDisk, IntoDisk, RawBytes, WriteDisk, ZebraDb},
ReadStateService,
};

// Allow use in external tests
#[cfg(any(test, feature = "proptest-impl"))]
pub use service::{
arbitrary::{populated_state, CHAIN_TIP_UPDATE_WAIT_LIMIT},
finalized_state::{RawBytes, KV, MAX_ON_DISK_HEIGHT},
finalized_state::{KV, MAX_ON_DISK_HEIGHT},
init_test, init_test_services,
};

Expand Down
37 changes: 37 additions & 0 deletions zebra-state/src/request.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ use crate::{
};
use crate::{
error::{CommitCheckpointVerifiedError, InvalidateError, LayeredStateError, ReconsiderError},
service::finalized_state::RawBytes,
CommitSemanticallyVerifiedError,
};

Expand Down Expand Up @@ -260,6 +261,12 @@ pub struct SemanticallyVerifiedBlock {
pub transaction_hashes: Arc<[transaction::Hash]>,
/// This block's deferred pool value balance change.
pub deferred_pool_balance_change: Option<DeferredPoolBalanceChange>,
/// Cached raw serialized transaction bytes from the source database.
///
/// When set, these bytes are written directly to the target database
/// without re-serializing the transactions. Used by the `copy-state`
/// command to avoid redundant serialization.
pub cached_raw_transactions: Option<Vec<RawBytes>>,
}

/// A block ready to be committed directly to the finalized state with
Expand Down Expand Up @@ -392,6 +399,8 @@ pub struct FinalizedBlock {
pub(super) treestate: Treestate,
/// This block's deferred pool value balance change.
pub(super) deferred_pool_balance_change: Option<DeferredPoolBalanceChange>,
/// Cached raw serialized transaction bytes from the source database.
pub(super) cached_raw_transactions: Option<Vec<RawBytes>>,
}

impl FinalizedBlock {
Expand All @@ -418,6 +427,7 @@ impl FinalizedBlock {
transaction_hashes: block.transaction_hashes,
treestate,
deferred_pool_balance_change: block.deferred_pool_balance_change,
cached_raw_transactions: block.cached_raw_transactions,
}
}
}
Expand Down Expand Up @@ -491,6 +501,7 @@ impl ContextuallyVerifiedBlock {
new_outputs,
transaction_hashes,
deferred_pool_balance_change,
cached_raw_transactions: _,
} = semantically_verified;

// This is redundant for the non-finalized state,
Expand Down Expand Up @@ -534,6 +545,15 @@ impl CheckpointVerifiedBlock {
/// Creates a [`CheckpointVerifiedBlock`] from a block and its precomputed hash,
/// without recomputing the hash.
pub fn with_hash(block: Arc<Block>, hash: block::Hash) -> Self {
    let verified = SemanticallyVerifiedBlock::with_hash(block, hash);
    Self(verified)
}

/// Attaches cached raw serialized transaction bytes to this block.
///
/// When set, these bytes are written directly to the target database
/// without re-serializing the transactions.
pub fn with_cached_raw_transactions(mut self, raw_txs: Vec<RawBytes>) -> Self {
    // `self.0` is the inner `SemanticallyVerifiedBlock`.
    let inner = &mut self.0;
    inner.cached_raw_transactions = Some(raw_txs);
    self
}
}

impl SemanticallyVerifiedBlock {
Expand All @@ -552,6 +572,7 @@ impl SemanticallyVerifiedBlock {
new_outputs,
transaction_hashes,
deferred_pool_balance_change: None,
cached_raw_transactions: None,
}
}

Expand Down Expand Up @@ -587,6 +608,7 @@ impl From<Arc<Block>> for SemanticallyVerifiedBlock {
new_outputs,
transaction_hashes,
deferred_pool_balance_change: None,
cached_raw_transactions: None,
}
}
}
Expand All @@ -602,6 +624,7 @@ impl From<ContextuallyVerifiedBlock> for SemanticallyVerifiedBlock {
deferred_pool_balance_change: Some(DeferredPoolBalanceChange::new(
valid.chain_value_pool_change.deferred_amount(),
)),
cached_raw_transactions: None,
}
}
}
Expand All @@ -615,6 +638,7 @@ impl From<FinalizedBlock> for SemanticallyVerifiedBlock {
new_outputs: finalized.new_outputs,
transaction_hashes: finalized.transaction_hashes,
deferred_pool_balance_change: finalized.deferred_pool_balance_change,
cached_raw_transactions: finalized.cached_raw_transactions,
}
}
}
Expand Down Expand Up @@ -1145,6 +1169,18 @@ pub enum ReadRequest {
/// * [`ReadResponse::BlockAndSize(None)`](ReadResponse::BlockAndSize) otherwise.
BlockAndSize(HashOrHeight),

/// Looks up a block by hash or height in the current best chain,
/// returning both the deserialized block and its raw transaction bytes.
///
/// The raw bytes can be used to avoid re-serializing transactions when
/// writing them to another database (e.g., in the `copy-state` command).
///
/// Returns
///
/// * [`ReadResponse::BlockAndRawTransactions(Some(...))`](ReadResponse::BlockAndRawTransactions) if the block is in the best chain;
/// * [`ReadResponse::BlockAndRawTransactions(None)`](ReadResponse::BlockAndRawTransactions) otherwise.
BlockAndRawTransactions(HashOrHeight),

/// Looks up a block header by hash or height in the current best chain.
///
/// Returns
Expand Down Expand Up @@ -1424,6 +1460,7 @@ impl ReadRequest {
ReadRequest::Block(_) => "block",
ReadRequest::AnyChainBlock(_) => "any_chain_block",
ReadRequest::BlockAndSize(_) => "block_and_size",
ReadRequest::BlockAndRawTransactions(_) => "block_and_raw_transactions",
ReadRequest::BlockHeader(_) => "block_header",
ReadRequest::Transaction(_) => "transaction",
ReadRequest::AnyChainTransaction(_) => "any_chain_transaction",
Expand Down
10 changes: 9 additions & 1 deletion zebra-state/src/response.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,10 @@ use zebra_chain::work::difficulty::CompactDifficulty;
#[allow(unused_imports)]
use crate::{ReadRequest, Request};

use crate::{service::read::AddressUtxos, NonFinalizedState, TransactionLocation, WatchReceiver};
use crate::{
service::{finalized_state::RawBytes, read::AddressUtxos},
NonFinalizedState, TransactionLocation, WatchReceiver,
};

#[derive(Clone, Debug, PartialEq, Eq)]
/// A response to a [`StateService`](crate::service::StateService) [`Request`].
Expand Down Expand Up @@ -335,6 +338,10 @@ pub enum ReadResponse {
/// serialized size.
BlockAndSize(Option<(Arc<Block>, usize)>),

/// Response to [`ReadRequest::BlockAndRawTransactions`] with the specified
/// block and its raw serialized transaction bytes.
BlockAndRawTransactions(Option<(Arc<Block>, Vec<RawBytes>)>),

/// The response to a `BlockHeader` request.
BlockHeader {
/// The header of the requested block
Expand Down Expand Up @@ -548,6 +555,7 @@ impl TryFrom<ReadResponse> for Response {
| ReadResponse::AddressUtxos(_)
| ReadResponse::ChainInfo(_)
| ReadResponse::NonFinalizedBlocksListener(_)
| ReadResponse::BlockAndRawTransactions(_)
| ReadResponse::IsTransparentOutputSpent(_) => {
Err("there is no corresponding Response for this ReadResponse")
}
Expand Down
7 changes: 7 additions & 0 deletions zebra-state/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1399,6 +1399,13 @@ impl Service<ReadRequest> for ReadStateService {
read::block_and_size(state.latest_best_chain(), &state.db, hash_or_height),
)),

// Used by the copy-state command.
ReadRequest::BlockAndRawTransactions(hash_or_height) => {
Ok(ReadResponse::BlockAndRawTransactions(
read::block_and_raw_transactions(&state.db, hash_or_height),
))
}

// Used by the get_block (verbose) RPC and the StateService.
ReadRequest::BlockHeader(hash_or_height) => {
let best_chain = state.latest_best_chain();
Expand Down
1 change: 1 addition & 0 deletions zebra-state/src/service/chain_tip.rs
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,7 @@ impl From<SemanticallyVerifiedBlock> for ChainTipBlock {
new_outputs: _,
transaction_hashes,
deferred_pool_balance_change: _,
cached_raw_transactions: _,
} = prepared;

Self {
Expand Down
36 changes: 29 additions & 7 deletions zebra-state/src/service/finalized_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use std::{
sync::Arc,
};

use zebra_chain::{block, parallel::tree::NoteCommitmentTrees, parameters::Network};
use zebra_chain::{block, parallel::tree::NoteCommitmentTrees, parameters::Network, transparent};
use zebra_db::{
chain::BLOCK_INFO,
transparent::{BALANCE_BY_TRANSPARENT_ADDR, TX_LOC_BY_SPENT_OUT_LOC},
Expand Down Expand Up @@ -325,7 +325,7 @@ impl FinalizedState {
prev_note_commitment_trees: Option<NoteCommitmentTrees>,
source: &str,
) -> Result<(block::Hash, NoteCommitmentTrees), CommitCheckpointVerifiedError> {
let (height, hash, finalized, prev_note_commitment_trees) = match finalizable_block {
let (finalized, prev_note_commitment_trees) = match finalizable_block {
FinalizableBlock::Checkpoint {
checkpoint_verified,
} => {
Expand Down Expand Up @@ -387,8 +387,6 @@ impl FinalizedState {
};

(
checkpoint_verified.height,
checkpoint_verified.hash,
FinalizedBlock::from_checkpoint_verified(checkpoint_verified, treestate),
Some(prev_note_commitment_trees),
)
Expand All @@ -397,15 +395,38 @@ impl FinalizedState {
contextually_verified,
treestate,
} => (
contextually_verified.height,
contextually_verified.hash,
FinalizedBlock::from_contextually_verified(contextually_verified, treestate),
prev_note_commitment_trees,
),
};

self.commit_finalized_direct_internal(finalized, prev_note_commitment_trees, None, source)
}

/// Immediately commit a `finalized` block to the finalized state.
///
/// This can be called either by the non-finalized state (when finalizing
/// a block) or by the checkpoint verifier.
///
/// If `spent_utxos` is `Some`, those UTXOs are used directly instead of
/// looking them up in the database (used by the `copy-state` command);
/// if `None`, the spent UTXOs are looked up in the finalized state.
///
/// Use `source` as the source of the block in log messages.
///
/// # Errors
///
/// - Propagates any errors from writing to the DB
/// - Propagates any errors from updating history and note commitment trees
/// - If `hashFinalSaplingRoot` / `hashLightClientRoot` / `hashBlockCommitments`
///   does not match the expected value
#[allow(clippy::unwrap_in_result)]
pub fn commit_finalized_direct_internal(
&mut self,
finalized: FinalizedBlock,
prev_note_commitment_trees: Option<NoteCommitmentTrees>,
spent_utxos: Option<Vec<(transparent::OutPoint, OutputLocation, transparent::Utxo)>>,
source: &str,
) -> Result<(block::Hash, NoteCommitmentTrees), CommitCheckpointVerifiedError> {
let committed_tip_hash = self.db.finalized_tip_hash();
let committed_tip_height = self.db.finalized_tip_height();
let &FinalizedBlock { height, hash, .. } = &finalized;

// Assert that callers (including unit tests) get the chain order correct
if self.db.is_empty() {
Expand Down Expand Up @@ -437,7 +458,8 @@ impl FinalizedState {

// Phase 2: all UTXOs are finalized, use empty NFS for fallback.
let empty_nfs = NonFinalizedState::new(&self.network());
let spent_utxos = self.db.lookup_spent_utxos(&finalized, &empty_nfs);
let spent_utxos =
spent_utxos.unwrap_or_else(|| self.db.lookup_spent_utxos(&finalized, &empty_nfs));

let result = self.db.write_block(
finalized,
Expand Down
31 changes: 27 additions & 4 deletions zebra-state/src/service/finalized_state/disk_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,10 @@ pub struct DiskDb {
/// applying any format changes that may have been required.
finished_format_upgrades: Arc<AtomicBool>,

/// When true, the write-ahead log is skipped for write operations.
/// This is used during checkpoint sync for faster bulk writes.
disable_wal: Arc<AtomicBool>,

// Owned State
//
// Everything contained in this state must be shared by all clones, or read-only.
Expand Down Expand Up @@ -1051,6 +1055,7 @@ impl DiskDb {
ephemeral: config.ephemeral,
db: Arc::new(db),
finished_format_upgrades: Arc::new(AtomicBool::new(false)),
disable_wal: Arc::new(AtomicBool::new(false)),
};

db.assert_default_cf_is_empty();
Expand Down Expand Up @@ -1118,9 +1123,26 @@ impl DiskDb {
// Write methods
// Low-level write methods are located in the WriteDisk trait

/// Enables or disables RocksDB auto-compaction at runtime.
///
/// # Panics
///
/// If RocksDB rejects the option change.
pub fn set_auto_compaction(&self, enabled: bool) {
    // RocksDB exposes the *inverse* option, `disable_auto_compactions`,
    // so the flag is negated here: enabled => "false", disabled => "true".
    let disable_auto_compactions = if enabled { "false" } else { "true" };

    self.db
        .set_options(&[("disable_auto_compactions", disable_auto_compactions)])
        .expect("setting disable_auto_compactions should succeed");
}

/// Writes `batch` to the database.
///
/// When bulk writes are enabled (`disable_wal` is set), the write-ahead log
/// is skipped for faster throughput. NOTE(review): with the WAL disabled,
/// writes not yet flushed to SST files can be lost on a crash — presumably
/// acceptable for checkpoint sync / copy-state, which can restart; confirm.
pub(crate) fn write(&self, batch: DiskWriteBatch) -> Result<(), rocksdb::Error> {
    let skip_wal = self.disable_wal.load(atomic::Ordering::Relaxed);

    if !skip_wal {
        return self.db.write(batch.batch);
    }

    let mut write_opts = rocksdb::WriteOptions::default();
    write_opts.disable_wal(true);
    self.db.write_opt(batch.batch, &write_opts)
}

// Private methods
Expand Down Expand Up @@ -1268,6 +1290,7 @@ impl DiskDb {
/// Returns the database options for the finalized state database.
fn options() -> rocksdb::Options {
let mut opts = rocksdb::Options::default();

let mut block_based_opts = rocksdb::BlockBasedOptions::default();

const ONE_MEGABYTE: usize = 1024 * 1024;
Expand Down Expand Up @@ -1297,13 +1320,13 @@ impl DiskDb {
// accumulate in memory before flushing to L0. The default 64 MB is
// conservative; 256 MB per buffer with 4 buffers gives the flush
// thread headroom to keep up with sustained block commits.
opts.set_write_buffer_size(256 * ONE_MEGABYTE);
opts.set_max_write_buffer_number(4);
opts.set_write_buffer_size(512 * ONE_MEGABYTE);
opts.set_max_write_buffer_number(2);

// Cap background jobs at 2 (one flush + one compaction); RocksDB
// splits these across flush and compaction automatically. (Keep this
// comment in sync with the `set_max_background_jobs` call below.)
opts.set_max_background_jobs(8);
opts.set_max_background_jobs(2);

// Allow multiple memtables to be written to concurrently.
// This is safe with the default skiplist memtable and reduces
Expand Down
6 changes: 6 additions & 0 deletions zebra-state/src/service/finalized_state/zebra_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -259,6 +259,12 @@ impl ZebraDb {
}
}

/// Enables or disables RocksDB auto-compaction.
///
/// Delegates to the underlying database handle; see
/// [`DiskDb::set_auto_compaction`] for details.
pub fn set_auto_compaction(&self, enabled: bool) {
    self.db.set_auto_compaction(enabled)
}

/// When called with a secondary DB instance, tries to catch up with the primary DB instance
pub fn try_catch_up_with_primary(&self) -> Result<(), rocksdb::Error> {
self.db.try_catch_up_with_primary()
Expand Down
Loading
Loading