Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

9 changes: 6 additions & 3 deletions anchor/common/ssv_types/src/operator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,9 +80,12 @@ pub struct Operator {

impl Operator {
/// Creates a new operator from its OperatorId and PEM-encoded public key string
pub fn new(pem_data: &[u8], operator_id: OperatorId, owner: Address) -> Result<Self, String> {
let rsa_pubkey =
operator_key::public::from_base64(pem_data).map_err(|err| err.to_string())?;
pub fn new(
pem_data: &[u8],
operator_id: OperatorId,
owner: Address,
) -> Result<Self, operator_key::ConversionError> {
let rsa_pubkey = operator_key::public::from_base64(pem_data)?;
Ok(Self::new_with_pubkey(rsa_pubkey, operator_id, owner))
}

Expand Down
1 change: 1 addition & 0 deletions anchor/eth/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ slot_clock = { workspace = true }
ssv_network_config = { workspace = true }
ssv_types = { workspace = true }
task_executor = { workspace = true }
thiserror = { workspace = true }
tokio = { workspace = true }
tower = "0.5.2"
tracing = { workspace = true }
Expand Down
43 changes: 35 additions & 8 deletions anchor/eth/src/error.rs
Original file line number Diff line number Diff line change
@@ -1,20 +1,47 @@
use std::fmt::Display;
use thiserror::Error;

// Custom execution integration layer errors
#[derive(Debug)]
/// Errors from the execution-layer integration.
#[derive(Debug, Error)]
pub enum ExecutionError {
#[error("Sync error: {0}")]
SyncError(String),
#[error("Invalid event: {0}")]
InvalidEvent(String),
#[error("RPC error: {0}")]
RpcError(String),
#[error("WebSocket error: {0}")]
WsError(String),
DecodeError(String),
Misc(String),
#[error("Decode error: {0}")]
DecodeError(#[from] alloy::sol_types::Error),
#[error("Skipped event: {0}")]
SkippedEvent(String),
#[error("Duplicate: {0}")]
Duplicate(String),
#[error("Database error: {0}")]
Database(String),
#[error("Exit processor unavailable: {0}")]
ExitProcessorUnavailable(String),
}

impl Display for ExecutionError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{self:?}")
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum LogErrorDisposition {
SkipMalformed,
SkipExpected,
AbortBatch,
}

impl ExecutionError {
pub fn log_disposition(&self) -> LogErrorDisposition {
match self {
Self::InvalidEvent(_) | Self::DecodeError(_) | Self::Duplicate(_) => {
LogErrorDisposition::SkipMalformed
}
Self::SkippedEvent(_) => LogErrorDisposition::SkipExpected,
Self::SyncError(_)
| Self::RpcError(_)
| Self::WsError(_)
| Self::Database(_)
| Self::ExitProcessorUnavailable(_) => LogErrorDisposition::AbortBatch,
}
}
}
7 changes: 1 addition & 6 deletions anchor/eth/src/event_parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,7 @@ macro_rules! impl_event_decoder {
type Output = $event_type;

fn decode_from_log(log: &Log) -> Result<Self::Output, ExecutionError> {
let decoded = Self::decode_log(&log.inner)
.map_err(|e| {
ExecutionError::DecodeError(
format!("Failed to decode {} event: {}", stringify!($event_type), e)
)
})?;
let decoded = Self::decode_log(&log.inner)?;
Ok(decoded.data)
}
}
Expand Down
123 changes: 35 additions & 88 deletions anchor/eth/src/event_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use ssv_types::{Cluster, ClusterId, Operator, OperatorId, ValidatorIndex};
use tracing::{debug, error, info, instrument, trace, warn};

use crate::{
error::ExecutionError,
error::{ExecutionError, LogErrorDisposition},
event_parser::EventDecoder,
generated::SSVContract,
index_sync, metrics,
Expand Down Expand Up @@ -134,10 +134,26 @@ impl EventProcessor {
.transaction_hash
.map(|hash| hash.to_string())
.unwrap_or_else(|| "unknown".to_string());
if live {
warn!(tx_hash, "Malformed event: {e}");
} else {
trace!(tx_hash, "Malformed event: {e}");
match e.log_disposition() {
LogErrorDisposition::SkipMalformed => {
if live {
warn!(tx_hash, error = %e, "Malformed event");
} else {
trace!(tx_hash, error = %e, "Malformed event");
}
}
LogErrorDisposition::SkipExpected => {
if live {
debug!(tx_hash, error = %e, "Skipping event");
} else {
trace!(tx_hash, error = %e, "Skipping event");
}
}
LogErrorDisposition::AbortBatch => {
metrics::stop_timer(timer);
error!(tx_hash, error = %e, "Event processing failed");
return Err(e);
}
}
continue;
}
Expand Down Expand Up @@ -220,20 +236,9 @@ impl EventProcessor {

// Construct the Operator and insert it into the database
let operator = Operator::new(data, operator_id, owner).map_err(|e| {
debug!(
operator_pubkey = ?publicKey,
operator_id = ?operator_id,
error = %e,
"Failed to construct operator"
);
ExecutionError::InvalidEvent(format!("Failed to construct operator: {e}"))
})?;
self.db.insert_operator(&operator, tx).map_err(|e| {
debug!(
operator_id = ?operator_id,
error = %e,
"Failed to insert operator into database"
);
ExecutionError::Database(format!("Failed to insert operator into database: {e}"))
})?;

Expand All @@ -259,14 +264,9 @@ impl EventProcessor {
trace!(operator_id = ?operator_id, "Processing operator removed");

// Delete the operator from database and in memory
self.db.delete_operator(operator_id, tx).map_err(|e| {
debug!(
operator_id = ?operator_id,
error = %e,
"Failed to remove operator"
);
ExecutionError::Database(format!("Failed to remove operator: {e}"))
})?;
self.db
.delete_operator(operator_id, tx)
.map_err(|e| ExecutionError::Database(format!("Failed to remove operator: {e}")))?;

debug!(operator_id = ?operatorId, "Operator removed from network");
metrics::inc_counter_vec(&metrics::EXECUTION_EVENTS_PROCESSED, &["operator_removed"]);
Expand Down Expand Up @@ -294,10 +294,10 @@ impl EventProcessor {

// Get the expected nonce and then increment it. This will happen regardless of if the
// event is malformed or not
let nonce = self.db.bump_and_get_nonce(&owner, tx).map_err(|e| {
debug!(owner = ?owner, "Failed to bump nonce");
ExecutionError::Database(format!("Failed to bump nonce: {e}"))
})?;
let nonce = self
.db
.bump_and_get_nonce(&owner, tx)
.map_err(|e| ExecutionError::Database(format!("Failed to bump nonce: {e}")))?;

// During keysplitting, we only care about the nonce
let Mode::Node {
Expand All @@ -322,12 +322,10 @@ impl EventProcessor {
trace!(cluster_id = ?cluster_id, "Parsing and verifying shares");
let (signature, shares) =
parse_shares(&shares, &operator_ids, &cluster_id, &validator_pubkey).map_err(|e| {
debug!(cluster_id = ?cluster_id, error = %e, "Failed to parse shares");
ExecutionError::InvalidEvent(format!("Failed to parse shares. {e}"))
})?;

if !verify_signature(signature, nonce, &owner, &validator_pubkey) {
debug!(cluster_id = ?cluster_id, "Signature verification failed");
return Err(ExecutionError::InvalidEvent(
"Signature verification failed".to_string(),
));
Expand All @@ -336,7 +334,6 @@ impl EventProcessor {
// Fetch the validator metadata
let validator_metadata = construct_validator_metadata(&validator_pubkey, &cluster_id)
.map_err(|e| {
debug!(validator_pubkey= ?validator_pubkey, "Failed to fetch validator metadata");
ExecutionError::Database(format!("Failed to fetch validator metadata: {e}"))
})?;

Expand Down Expand Up @@ -368,7 +365,6 @@ impl EventProcessor {
self.db
.insert_validator(cluster, &validator_metadata, shares, tx)
.map_err(|e| {
debug!(cluster_id = ?cluster_id, error = %e, validator_metadata = ?validator_metadata.public_key, "Failed to insert validator into cluster");
ExecutionError::Database(format!("Failed to insert validator into cluster: {e}"))
})?;

Expand Down Expand Up @@ -412,10 +408,6 @@ impl EventProcessor {
let metadata = match state.metadata().get_by(&validator_pubkey) {
Some(data) => data,
None => {
debug!(
cluster_id = ?cluster_id,
"Failed to fetch validator metadata from database"
);
return Err(ExecutionError::Database(
"Failed to fetch validator metadata from database".to_string(),
));
Expand All @@ -426,10 +418,6 @@ impl EventProcessor {
let cluster = match state.clusters().get_by(&validator_pubkey) {
Some(data) => data,
None => {
debug!(
cluster_id = ?cluster_id,
"Failed to fetch cluster from database"
);
return Err(ExecutionError::Database(
"Failed to fetch cluster from database".to_string(),
));
Expand All @@ -438,12 +426,6 @@ impl EventProcessor {

// Make sure the right owner is removing this validator
if owner != cluster.owner {
debug!(
cluster_id = ?cluster_id,
expected_owner = ?cluster.owner,
actual_owner = ?owner,
"Owner mismatch for validator removal"
);
return Err(ExecutionError::InvalidEvent(format!(
"Cluster already exists with a different owner address. Expected {}. Got {}",
cluster.owner, owner
Expand All @@ -452,12 +434,6 @@ impl EventProcessor {

// Make sure this is the correct validator
if validator_pubkey != metadata.public_key {
debug!(
cluster_id = ?cluster_id,
expected_pubkey = %metadata.public_key,
actual_pubkey = %validator_pubkey,
"Validator pubkey mismatch"
);
return Err(ExecutionError::InvalidEvent(
"Validator does not match".to_string(),
));
Expand All @@ -467,15 +443,7 @@ impl EventProcessor {
// Remove the validator and all corresponding cluster data
self.db
.delete_validator(&validator_pubkey, tx)
.map_err(|e| {
debug!(
cluster_id = ?cluster_id,
pubkey = ?validator_pubkey,
error = %e,
"Failed to delete validator from database"
);
ExecutionError::Database(format!("Failed to validator cluster: {e}"))
})?;
.map_err(|e| ExecutionError::Database(format!("Failed to delete validator: {e}")))?;

trace!(
cluster_id = ?cluster_id,
Expand Down Expand Up @@ -504,11 +472,6 @@ impl EventProcessor {

// Update the status of the cluster to be liquidated
self.db.update_status(cluster_id, true, tx).map_err(|e| {
debug!(
cluster_id = ?cluster_id,
error = %e,
"Failed to mark cluster as liquidated"
);
ExecutionError::Database(format!("Failed to mark cluster as liquidated: {e}"))
})?;

Expand Down Expand Up @@ -542,11 +505,6 @@ impl EventProcessor {

// Update the status of the cluster to be active
self.db.update_status(cluster_id, false, tx).map_err(|e| {
debug!(
cluster_id = ?cluster_id,
error = %e,
"Failed to mark cluster as active"
);
ExecutionError::Database(format!("Failed to mark cluster as active: {e}"))
})?;

Expand Down Expand Up @@ -577,11 +535,6 @@ impl EventProcessor {
self.db
.update_fee_recipient(owner, recipientAddress, tx)
.map_err(|e| {
debug!(
owner = ?owner,
error = %e,
"Failed to update fee recipient"
);
ExecutionError::Database(format!("Failed to update fee recipient: {e}"))
})?;
debug!(
Expand Down Expand Up @@ -659,16 +612,9 @@ impl EventProcessor {
);
}
Err(err) => {
// If the channel is closed, we can't send the exit request
// This is a fatal error and should be handled by the caller
error!(
validator_pubkey = %validator_pubkey,
?err,
"Failed to send validator exit request to processor"
);
return Err(ExecutionError::Misc(
"Failed to send validator exit request to processor".to_string(),
));
return Err(ExecutionError::ExitProcessorUnavailable(format!(
"Failed to send validator exit request to processor: {err}"
)));
}
}

Expand Down Expand Up @@ -735,7 +681,8 @@ impl EventProcessor {
/// mismatch
///
/// # Note
/// If the cluster is already liquidated, the function will return `Ok(())` but issue a warning.
/// If the cluster is already liquidated, the function returns `SkippedEvent` so the caller can
/// skip it without aborting the batch.
fn verify_validator_owner(
&self,
owner: &Address,
Expand All @@ -762,7 +709,7 @@ impl EventProcessor {
}

if cluster.liquidated {
return Err(ExecutionError::Misc(
return Err(ExecutionError::SkippedEvent(
"Cluster is liquidated, skipping exit processing".to_string(),
));
}
Expand Down
2 changes: 0 additions & 2 deletions anchor/eth/src/sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -364,15 +364,13 @@ impl SsvEventSyncer {
continue;
}
Err(e) => {
error!(?e, "Failed to fetch EL sync status");
return Err(ExecutionError::RpcError(format!(
"Failed to fetch EL sync status: {e}"
)));
}
}

let current_block = self.rpc_client.get_block_number().await.map_err(|e| {
error!(?e, "Failed to fetch block number");
ExecutionError::RpcError(format!("Failed to fetch block number: {e}"))
})?;
metrics::set_gauge(&metrics::EXECUTION_CURRENT_BLOCK, current_block as i64);
Expand Down
Loading
Loading