Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ resolver = "2"

[workspace.package]
license = "Apache-2.0"
edition = "2021"
edition = "2024"
repository = "https://github.com/superfly/corrosion"
homepage = "https://superfly.github.io/corrosion/"

Expand Down
2 changes: 1 addition & 1 deletion crates/backoff/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use std::{iter, time};

use rand::{rng, Rng};
use rand::{Rng, rng};

/// Exponential backoff.
#[derive(Debug, Clone)]
Expand Down
4 changes: 2 additions & 2 deletions crates/consul-client/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@ use std::{
fs::OpenOptions,
};

use serde::{de::DeserializeOwned, Deserialize, Serialize};
use serde_with::{serde_as, NoneAsEmptyString};
use serde::{Deserialize, Serialize, de::DeserializeOwned};
use serde_with::{NoneAsEmptyString, serde_as};

use rusqlite::types::{FromSql, FromSqlError, ValueRef};

Expand Down
8 changes: 4 additions & 4 deletions crates/corro-admin/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use corro_types::{
updates::Handle,
};
use futures::{SinkExt, TryStreamExt};
use rusqlite::{named_params, params, OptionalExtension};
use rusqlite::{OptionalExtension, named_params, params};
use serde::{Deserialize, Serialize};
use serde_json::json;
use spawn::spawn_counted;
Expand All @@ -24,11 +24,11 @@ use tokio::{
sync::{mpsc, oneshot},
task::block_in_place,
};
use tokio_serde::{formats::Json, Framed};
use tokio_serde::{Framed, formats::Json};
use tokio_util::codec::LengthDelimitedCodec;
use tracing::{debug, error, info, warn};
use tracing_filter::{legacy::Filter, FilterLayer};
use tracing_subscriber::{reload::Handle as ReloadHandle, Registry};
use tracing_filter::{FilterLayer, legacy::Filter};
use tracing_subscriber::{Registry, reload::Handle as ReloadHandle};
use tripwire::Tripwire;
use uuid::Uuid;

Expand Down
27 changes: 20 additions & 7 deletions crates/corro-agent/benches/process_changes.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use corro_agent::api::public::api_v1_db_schema;
use criterion::{
black_box, criterion_group, criterion_main, BatchSize, BenchmarkId, Criterion, Throughput,
BatchSize, BenchmarkId, Criterion, Throughput, black_box, criterion_group, criterion_main,
};
use rand::rngs::StdRng;
use rand::{Rng, SeedableRng};
Expand All @@ -14,13 +14,13 @@ use corro_types::{
api::Statement,
base::{CrsqlDbVersion, CrsqlSeq},
broadcast::{ChangeSource, ChangeV1, Changeset},
change::{row_to_change, Change},
change::{Change, row_to_change},
};
use hyper::StatusCode;

use corro_agent::{
agent::process_multiple_changes,
api::public::{api_v1_transactions, TimeoutParams},
api::public::{TimeoutParams, api_v1_transactions},
};

/// Configuration for a benchark run
Expand Down Expand Up @@ -130,15 +130,28 @@ async fn generate_changesets(
// Insert - use high IDs to avoid conflicts
let id = config.initial_rows_per_table + tx_idx * 1000 + rng.random_range(0..1000);
Statement::WithParams(
format!("INSERT OR REPLACE INTO {table_name} (id, value, counter, random) VALUES (?, ?, ?, ?)"),
vec![(id as i64).into(), format!("value_{id}").into(), rng.random_range(0..100000).into(), 0.into()],
format!(
"INSERT OR REPLACE INTO {table_name} (id, value, counter, random) VALUES (?, ?, ?, ?)"
),
vec![
(id as i64).into(),
format!("value_{id}").into(),
rng.random_range(0..100000).into(),
0.into(),
],
)
} else if op_type < insert_pct + update_pct {
// Update - use existing IDs
let id = rng.random_range(1..=config.initial_rows_per_table.min(1000));
Statement::WithParams(
format!("UPDATE {table_name} SET counter = counter + 1, value = ?, random = ? WHERE id = ?"),
vec![format!("updated_{}", rng.random_range(0..1000)).into(), rng.random_range(0..100000).into(), (id as i64).into()],
format!(
"UPDATE {table_name} SET counter = counter + 1, value = ?, random = ? WHERE id = ?"
),
vec![
format!("updated_{}", rng.random_range(0..1000)).into(),
rng.random_range(0..100000).into(),
(id as i64).into(),
],
)
} else {
// Delete
Expand Down
8 changes: 6 additions & 2 deletions crates/corro-agent/src/agent/bi.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,9 @@ pub fn spawn_bipayload_handler(
)
.await
{
warn!("could not complete receiving sync: {e}");
warn!(
"could not complete receiving sync: {e}"
);
}
break;
}
Expand All @@ -109,7 +111,9 @@ pub fn spawn_bipayload_handler(
}

Err(e) => {
error!("could not read framed payload from bidirectional stream: {e}");
error!(
"could not read framed payload from bidirectional stream: {e}"
);
}
},
}
Expand Down
6 changes: 3 additions & 3 deletions crates/corro-agent/src/agent/bootstrap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,10 @@ use crate::agent::RANDOM_NODES_CHOICES;
use corro_types::{agent::SplitPool, config::DEFAULT_GOSSIP_PORT};

use hickory_resolver::{
proto::rr::{RData, RecordType},
ResolveErrorKind,
proto::rr::{RData, RecordType},
};
use rand::{rngs::StdRng, seq::IteratorRandom, SeedableRng};
use rand::{SeedableRng, rngs::StdRng, seq::IteratorRandom};
use std::{collections::HashSet, net::SocketAddr};
use tokio::task::block_in_place;
use tracing::{debug, error, warn};
Expand Down Expand Up @@ -60,8 +60,8 @@ async fn resolve_bootstrap(
our_addr: SocketAddr,
) -> eyre::Result<HashSet<SocketAddr>> {
use hickory_resolver::{
config::{NameServerConfigGroup, ResolverConfig},
Resolver,
config::{NameServerConfigGroup, ResolverConfig},
};

let mut addrs = HashSet::new();
Expand Down
19 changes: 9 additions & 10 deletions crates/corro-agent/src/agent/handlers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,8 @@ use std::{

use crate::{
agent::{
bi, bootstrap, uni,
ANNOUNCE_INTERVAL, SyncClientError, bi, bootstrap, uni,
util::{log_at_pow_10, process_multiple_changes},
SyncClientError, ANNOUNCE_INTERVAL,
},
api::peer::parallel_sync,
transport::Transport,
Expand All @@ -33,19 +32,19 @@ use corro_types::{
use bytes::Bytes;
use corro_types::broadcast::Timestamp;
use foca::OwnedNotification;
use indexmap::map::Entry;
use indexmap::IndexMap;
use indexmap::map::Entry;
use metrics::{counter, gauge, histogram};
use rand::{prelude::IteratorRandom, rngs::StdRng, SeedableRng};
use rand::{SeedableRng, prelude::IteratorRandom, rngs::StdRng};
use rangemap::RangeInclusiveSet;
use spawn::spawn_counted;
use tokio::time::sleep;
use tokio::{
sync::mpsc::Receiver as TokioReceiver,
task::{block_in_place, JoinSet},
task::{JoinSet, block_in_place},
};
use tokio_stream::{wrappers::ReceiverStream, StreamExt};
use tracing::{debug, debug_span, error, info, trace, warn, Instrument};
use tokio_stream::{StreamExt, wrappers::ReceiverStream};
use tracing::{Instrument, debug, debug_span, error, info, trace, warn};
use tripwire::{Outcome, PreemptibleFutureExt, TimeoutFutureExt, Tripwire};

/// Spawn a tree of tasks that handles incoming gossip server
Expand Down Expand Up @@ -966,11 +965,11 @@ mod tests {
use crate::api::public::api_v1_db_schema;

use super::*;
use axum::{http::StatusCode, Extension, Json};
use axum::{Extension, Json, http::StatusCode};
use corro_tests::TEST_SCHEMA;
use corro_types::api::{ColumnName, TableName};
use corro_types::{
base::{dbsr, dbvr, CrsqlDbVersion},
base::{CrsqlDbVersion, dbsr, dbvr},
broadcast::Changeset,
change::Change,
config::Config,
Expand All @@ -979,7 +978,7 @@ mod tests {
use rusqlite::Connection;
use std::sync::Arc;
use tokio::sync::Semaphore;
use tokio::time::{timeout, Duration};
use tokio::time::{Duration, timeout};

#[test]
fn ensure_truncate_works() -> eyre::Result<()> {
Expand Down
4 changes: 2 additions & 2 deletions crates/corro-agent/src/agent/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,13 @@ mod tests;
use bytes::Bytes;
use corro_types::api::QueryEventMeta;
use std::{collections::HashMap, sync::Arc, time::Duration};
use tokio::sync::{broadcast::Sender, RwLock};
use tokio::sync::{RwLock, broadcast::Sender};
use uuid::Uuid;

// Public exports
pub use error::{SyncClientError, SyncRecvError};
pub use run_root::start_with_config;
pub use setup::{setup, AgentOptions};
pub use setup::{AgentOptions, setup};
pub use uni::spawn_unipayload_handler;
pub use util::process_multiple_changes;

Expand Down
3 changes: 2 additions & 1 deletion crates/corro-agent/src/agent/run_root.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,9 @@ use std::time::Instant;
use crate::api::public::execute_schema;
use crate::{
agent::{
AgentOptions,
handlers::{self, spawn_handle_db_maintenance},
metrics, setup, util, AgentOptions,
metrics, setup, util,
},
broadcast::runtime_loop,
transport::Transport,
Expand Down
10 changes: 5 additions & 5 deletions crates/corro-agent/src/agent/setup.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@ use std::{net::SocketAddr, ops::DerefMut, sync::Arc, time::Duration};
use tokio::{
net::TcpListener,
sync::{
mpsc::{channel as tokio_channel, Receiver as TokioReceiver},
RwLock as TokioRwLock, Semaphore,
mpsc::{Receiver as TokioReceiver, channel as tokio_channel},
},
};
use tracing::{debug, error, info, trace, warn};
Expand All @@ -26,7 +26,7 @@ use crate::{
api::{
peer::gossip_server_endpoint,
public::{
pubsub::{process_sub_channel, MatcherBroadcastCache, SharedMatcherBroadcastCache},
pubsub::{MatcherBroadcastCache, SharedMatcherBroadcastCache, process_sub_channel},
update::SharedUpdateBroadcastCache,
},
},
Expand All @@ -35,15 +35,15 @@ use crate::{
use corro_types::{
actor::ActorId,
agent::{
migrate, Agent, AgentConfig, Booked, BookedVersions, LockRegistry, LockState, SplitPool,
Agent, AgentConfig, Booked, BookedVersions, LockRegistry, LockState, SplitPool, migrate,
},
base::{CrsqlDbVersion, CrsqlDbVersionRange},
broadcast::{BroadcastInput, ChangeSource, ChangeV1, FocaInput},
channel::{bounded, CorroReceiver},
channel::{CorroReceiver, bounded},
config::Config,
members::Members,
pubsub::{Matcher, SubsManager},
schema::{init_schema, Schema},
schema::{Schema, init_schema},
sqlite::CrConn,
updates::UpdatesManager,
};
Expand Down
14 changes: 8 additions & 6 deletions crates/corro-agent/src/agent/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,16 +6,16 @@ use std::{
};

use axum::Extension;
use futures::{future, stream::FuturesUnordered, StreamExt, TryStreamExt};
use futures::{StreamExt, TryStreamExt, future, stream::FuturesUnordered};
use hyper::StatusCode;
use rand::{prelude::Distribution, rngs::StdRng, seq::IteratorRandom, SeedableRng};
use rand::{SeedableRng, prelude::Distribution, rngs::StdRng, seq::IteratorRandom};
use rangemap::RangeInclusiveSet;
use serde::Deserialize;
use serde_json::json;
use spawn::wait_for_all_pending_handles;
use tokio::{
sync::mpsc,
time::{sleep, timeout, MissedTickBehavior},
time::{MissedTickBehavior, sleep, timeout},
};
use tracing::{debug, info_span};
use tripwire::Tripwire;
Expand All @@ -25,7 +25,7 @@ use crate::{
agent::process_multiple_changes,
api::{
peer::parallel_sync,
public::{api_v1_db_schema, api_v1_transactions, TimeoutParams},
public::{TimeoutParams, api_v1_db_schema, api_v1_transactions},
},
transport::Transport,
};
Expand All @@ -34,7 +34,7 @@ use corro_types::change::Change;
use corro_types::{
actor::{ActorId, MemberId},
api::{ExecResponse, ExecResult, Statement},
base::{dbsr, dbsri, dbvri, CrsqlDbVersion, CrsqlDbVersionRange, CrsqlSeq},
base::{CrsqlDbVersion, CrsqlDbVersionRange, CrsqlSeq, dbsr, dbsri, dbvri},
broadcast::{ChangeSource, ChangeV1, Changeset},
sync::generate_sync,
};
Expand Down Expand Up @@ -604,7 +604,9 @@ async fn large_tx_sync() -> eyre::Result<()> {
];

for n in counts.iter() {
let req_body: Vec<Statement> = serde_json::from_value(json!([format!("INSERT INTO testsbool (id) WITH RECURSIVE cte(id) AS ( SELECT random() UNION ALL SELECT random() FROM cte LIMIT {n} ) SELECT id FROM cte;")]))?;
let req_body: Vec<Statement> = serde_json::from_value(json!([format!(
"INSERT INTO testsbool (id) WITH RECURSIVE cte(id) AS ( SELECT random() UNION ALL SELECT random() FROM cte LIMIT {n} ) SELECT id FROM cte;"
)]))?;

let res = timeout(
Duration::from_secs(5),
Expand Down
Loading
Loading