Skip to content

Commit 8b27e2c

Browse files
authored
feat: add some validation logic to migration from-ipfs (#685)
* feat: add some validation logic to migration from-ipfs Since ceramic-one nodes refuse to sync with nodes that share bad data its important that the migration process ensure no bad data is migrated. This change adds the ability to validate signatures and chain ids skipping events that do not pass validation. Additionally you can also filter events to only events for a specific model. * fix: clippy * fix test
1 parent 82668a1 commit 8b27e2c

5 files changed

Lines changed: 122 additions & 34 deletions

File tree

event-svc/src/event/migration.rs

Lines changed: 64 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ use std::{
66
use anyhow::{anyhow, Context as _};
77
use ceramic_core::{EventId, Network, StreamId};
88
use ceramic_event::unvalidated::{self, signed::cacao::Capability};
9+
use ceramic_validation::{event_verifier::Verifier, AtTime, VerifyJwsOpts};
910
use cid::Cid;
1011
use futures::TryStreamExt;
1112
use ipld_core::ipld::Ipld;
@@ -26,6 +27,10 @@ pub struct Migrator<'a, S> {
2627
blocks: S,
2728
batch: Vec<ReconItem<EventId>>,
2829
log_tile_docs: bool,
30+
// set of sep values to import, if an event doesn't have these values it will be skipped.
31+
sep_filter: Vec<Vec<u8>>,
32+
validate_signatures: bool,
33+
supported_chains: Option<Vec<String>>,
2934

3035
// All unsigned init payloads we have found.
3136
unsigned_init_payloads: BTreeSet<Cid>,
@@ -48,12 +53,18 @@ impl<'a, S: BlockStore> Migrator<'a, S> {
4853
network: Network,
4954
blocks: S,
5055
log_tile_docs: bool,
56+
sep_filter: Vec<Vec<u8>>,
57+
validate_signatures: bool,
58+
supported_chains: Option<Vec<String>>,
5159
) -> Result<Self> {
5260
Ok(Self {
5361
network,
5462
service,
5563
blocks,
5664
log_tile_docs,
65+
sep_filter,
66+
validate_signatures,
67+
supported_chains,
5768
batch: Default::default(),
5869
unsigned_init_payloads: Default::default(),
5970
referenced_unsigned_init_payloads: Default::default(),
@@ -71,7 +82,7 @@ impl<'a, S: BlockStore> Migrator<'a, S> {
7182
Err(Error::MissingBlock(*cid))
7283
}
7384
}
74-
fn handle_error(&mut self, cid: Cid, err: &Error) {
85+
fn handle_error(&mut self, cid: Cid, err: &Error, model_context: ModelContext) {
7586
let log = match err {
7687
Error::FoundInitTileDoc(_) | Error::FoundDataTileDoc(_) => {
7788
self.tile_doc_count += 1;
@@ -86,12 +97,12 @@ impl<'a, S: BlockStore> Migrator<'a, S> {
8697
.entry(model.to_owned())
8798
.and_modify(|count| *count += 1)
8899
.or_insert(1);
89-
self.handle_error(cid, err);
100+
self.handle_error(cid, err, model.clone());
90101
false
91102
}
92103
};
93104
if log {
94-
error!(%cid, err=format!("{err:#}"), "error processing block");
105+
error!(%cid, err=format!("{err:#}"), model=%model_context, "error processing block");
95106
}
96107
}
97108

@@ -105,7 +116,7 @@ impl<'a, S: BlockStore> Migrator<'a, S> {
105116
while let Some((cid, data)) = all_blocks.try_next().await? {
106117
let ret = self.process_block(cid, &data).await;
107118
if let Err(err) = ret {
108-
self.handle_error(cid, &err)
119+
self.handle_error(cid, &err, ModelContext(None))
109120
}
110121
if self.batch.len() > 1000 {
111122
if let Err(err) = self.write_batch().await {
@@ -208,12 +219,8 @@ impl<'a, S: BlockStore> Migrator<'a, S> {
208219
);
209220
let event = unvalidated::init::Event::new(payload);
210221
let event: unvalidated::Event<Ipld> = unvalidated::Event::from(Box::new(event));
211-
self.batch.push(
212-
event_builder
213-
.build(&self.network, event)
214-
.await
215-
.with_model_context(&model)?,
216-
);
222+
self.validate_build_and_push(event_builder, event, &model)
223+
.await?;
217224
if self.batch.len() > 1000 {
218225
self.write_batch().await?
219226
}
@@ -296,13 +303,8 @@ impl<'a, S: BlockStore> Migrator<'a, S> {
296303
}
297304
let s = unvalidated::signed::Event::new(cid, event, link, payload, capability);
298305
let event = unvalidated::Event::from(s);
299-
self.batch.push(
300-
event_builder
301-
.build(&self.network, event)
302-
.await
303-
.with_model_context(&model)?,
304-
);
305-
Ok(())
306+
self.validate_build_and_push(event_builder, event, &model)
307+
.await
306308
}
307309
fn is_tile_doc_init(&self, data: &[u8]) -> bool {
308310
// Attempt to decode the payload as a loose TileDocument.
@@ -427,12 +429,51 @@ impl<'a, S: BlockStore> Migrator<'a, S> {
427429
}
428430
let time = unvalidated::TimeEvent::new(event, proof, proof_edges);
429431
let event: unvalidated::Event<Ipld> = unvalidated::Event::from(Box::new(time));
430-
self.batch.push(
431-
event_builder
432-
.build(&self.network, event)
433-
.await
434-
.with_model_context(&model)?,
435-
);
432+
self.validate_build_and_push(event_builder, event, &model)
433+
.await
434+
}
435+
436+
// Build the event and push onto the batch if its is valid.
437+
async fn validate_build_and_push(
438+
&mut self,
439+
event_builder: EventBuilder,
440+
event: unvalidated::Event<Ipld>,
441+
model: &ModelContext,
442+
) -> Result<()> {
443+
if self.sep_filter.is_empty() || self.sep_filter.contains(&event_builder.sep) {
444+
match &event {
445+
unvalidated::Event::Time(event) => {
446+
if let Some(supported_chains) = &self.supported_chains {
447+
let chain_id = event.proof().chain_id().to_owned();
448+
if !supported_chains.contains(&chain_id) {
449+
return Err(anyhow!("event has unsupported chain: {chain_id}"))
450+
.with_model_context(model);
451+
}
452+
}
453+
}
454+
unvalidated::Event::Signed(event) => {
455+
if self.validate_signatures {
456+
event
457+
.verify_signature(
458+
Some(&event_builder.controller),
459+
&VerifyJwsOpts {
460+
at_time: AtTime::SkipTimeChecks,
461+
..Default::default()
462+
},
463+
)
464+
.await
465+
.with_model_context(model)?;
466+
}
467+
}
468+
unvalidated::Event::Unsigned(_) => {}
469+
};
470+
self.batch.push(
471+
event_builder
472+
.build(&self.network, event)
473+
.await
474+
.with_model_context(model)?,
475+
);
476+
}
436477
Ok(())
437478
}
438479
}

event-svc/src/event/service.rs

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -201,10 +201,21 @@ impl EventService {
201201
network: Network,
202202
blocks: impl BlockStore,
203203
log_tile_docs: bool,
204+
sep_filter: Vec<Vec<u8>>,
205+
validate_signatures: bool,
206+
supported_chains: Option<Vec<String>>,
204207
) -> Result<()> {
205-
let migrator = Migrator::new(self, network, blocks, log_tile_docs)
206-
.await
207-
.map_err(Error::new_fatal)?;
208+
let migrator = Migrator::new(
209+
self,
210+
network,
211+
blocks,
212+
log_tile_docs,
213+
sep_filter,
214+
validate_signatures,
215+
supported_chains,
216+
)
217+
.await
218+
.map_err(Error::new_fatal)?;
208219
migrator.migrate().await.map_err(Error::new_fatal)?;
209220
Ok(())
210221
}

event-svc/src/tests/migration.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@ async fn test_migration(cars: Vec<Vec<u8>>) {
5656
let conn = crate::store::SqlitePool::connect_in_memory().await.unwrap();
5757
let service = EventService::new_with_event_validation(conn).await.unwrap();
5858
service
59-
.migrate_from_ipfs(Network::Local(42), blocks, false)
59+
.migrate_from_ipfs(Network::Local(42), blocks, false, vec![], false, None)
6060
.await
6161
.unwrap();
6262
let actual_events: BTreeSet<_> = ceramic_api::EventService::range_with_values(

one/src/migrations.rs

Lines changed: 38 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,9 @@
1-
use std::{path::PathBuf, sync::Arc};
1+
use std::{path::PathBuf, str::FromStr as _, sync::Arc};
22

33
use anyhow::{anyhow, Result};
44
use async_stream::try_stream;
55
use async_trait::async_trait;
6+
use ceramic_core::StreamId;
67
use ceramic_event::unvalidated;
78
use ceramic_event_svc::{BlockStore, EventService};
89
use ceramic_metrics::config::Config as MetricsConfig;
@@ -105,6 +106,21 @@ pub struct FromIpfsOpts {
105106
/// Number of files to process
106107
#[clap(long, short = 'l', env = "CERAMIC_ONE_MIGRATION_LIMIT")]
107108
limit: Option<u64>,
109+
110+
/// Optional list of model stream ids. Only events from these models will be migrated.
111+
/// If the list is empty all events are migrated.
112+
#[clap(long, value_delimiter = ',', env = "CERAMIC_ONE_MODEL_FILTER")]
113+
model_filter: Vec<String>,
114+
115+
/// Whether to validate the signatures of signed events.
116+
/// Events with invalid signatures will be skipped and counted as errors.
117+
#[clap(long, env = "CERAMIC_ONE_VALIDATE_SIGNATURES")]
118+
validate_signatures: bool,
119+
120+
/// Whether to validate the chain of time events.
121+
/// Events with invalid chains will be skipped and counted as errors.
122+
#[clap(long, env = "CERAMIC_ONE_VALIDATE_CHAINS")]
123+
validate_chain: bool,
108124
}
109125

110126
impl From<&FromIpfsOpts> for DBOpts {
@@ -165,7 +181,27 @@ async fn from_ipfs(opts: FromIpfsOpts) -> Result<()> {
165181
file_limit: opts.limit,
166182
};
167183
event_svc
168-
.migrate_from_ipfs(network, blocks, opts.log_tile_docs)
184+
.migrate_from_ipfs(
185+
network,
186+
blocks,
187+
opts.log_tile_docs,
188+
opts.model_filter
189+
.iter()
190+
.map(|model| {
191+
StreamId::from_str(model)
192+
.map_err(|err| anyhow!("model filter must be valid stream id: {err}"))
193+
.map(|s| s.to_vec())
194+
})
195+
.collect::<Result<Vec<Vec<u8>>, _>>()?,
196+
opts.validate_signatures,
197+
opts.validate_chain
198+
.then(|| {
199+
opts.network
200+
.supported_chain_ids()
201+
.map(|chains| chains.into_iter().map(|chain| chain.to_string()).collect())
202+
})
203+
.flatten(),
204+
)
169205
.await?;
170206
Ok(())
171207
}

pipeline/src/aggregator/model_instance_validate.rs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,7 @@ impl ModelInstanceValidate {
8585
.model_instance
8686
.is_valid(i)
8787
.then(|| serde_json::from_slice::<ModelInstance>(columns.model_instance.value(i)))
88-
.ok_or_validation_internal_err("cannot validate null instance document"))
88+
.ok_or_validation_failure("cannot validate null instance document"))
8989
.context("instance payload not a valid json documnet")
9090
.map_to_validation_failure());
9191
let patch = maybe_fail!(columns
@@ -96,16 +96,16 @@ impl ModelInstanceValidate {
9696
.context("instance patch is not a valid json patch document")
9797
// This is an error because to get to this point we know that the patch has already
9898
// been applied.
99-
.map_to_validation_internal_err());
99+
.map_to_validation_failure());
100100
let model_version = maybe_fail!(maybe_fail!(columns
101101
.model_versions
102102
.is_valid(i)
103103
.then(|| Cid::read_bytes(columns.model_versions.value(i)))
104-
.ok_or_validation_internal_err(
104+
.ok_or_validation_failure(
105105
"cannot validate instance against an unknown model version"
106106
))
107107
.context("model version must be a valid CID")
108-
.map_to_validation_internal_err());
108+
.map_to_validation_failure());
109109
let model_definition = maybe_fail!(maybe_fail!(columns
110110
.model_definitions
111111
.is_valid(i)
@@ -123,7 +123,7 @@ impl ModelInstanceValidate {
123123
.event_heights
124124
.is_valid(i)
125125
.then(|| columns.event_heights.value(i))
126-
.ok_or_validation_internal_err(
126+
.ok_or_validation_failure(
127127
"cannot validate an instance with an unknown event event_height"
128128
));
129129

0 commit comments

Comments
 (0)