Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 5 additions & 7 deletions integration/rust/tests/integration/cancel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,17 +79,15 @@ async fn assert_cancelled(
) {
let result = timeout(Duration::from_secs(5), handle)
.await
.expect(&format!(
"{label}: cancelled query did not unblock within 5 seconds"
))
.expect(&format!("{label}: task panicked"));
.unwrap_or_else(|_| panic!("{label}: cancelled query did not unblock within 5 seconds"))
.unwrap_or_else(|_| panic!("{label}: task panicked"));

let err = result.expect_err(&format!(
"{label}: query should have been cancelled, but it succeeded"
));
let db_err = err.as_db_error().expect(&format!(
"{label}: expected a PostgreSQL error, not a network error"
));
let db_err = err
.as_db_error()
.unwrap_or_else(|| panic!("{label}: expected a PostgreSQL error, not a network error"));

assert_eq!(
db_err.code().code(),
Expand Down
2 changes: 1 addition & 1 deletion pgdog/src/backend/pool/cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1097,7 +1097,7 @@ mod test {

// Trigger schema_not_needed on each shard after a short delay so the
// waiter wakes up via the per-shard schema_waiter notification.
let shards: Vec<_> = cluster.shards.iter().cloned().collect();
let shards: Vec<_> = cluster.shards.to_vec();
tokio::spawn(async move {
tokio::time::sleep(Duration::from_millis(10)).await;
for shard in &shards {
Expand Down
2 changes: 1 addition & 1 deletion pgdog/src/backend/pool/lb/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -310,7 +310,7 @@ impl LoadBalancer {
match timeout(self.checkout_timeout, self.get_primary_internal(request)).await {
Ok(Ok(guard)) => Ok(guard),
Err(_) => Err(Error::CheckoutTimeout),
Ok(Err(err)) => Err(err.into()),
Ok(Err(err)) => Err(err),
}
}

Expand Down
10 changes: 5 additions & 5 deletions pgdog/src/backend/replication/logical/status.rs
Original file line number Diff line number Diff line change
Expand Up @@ -243,7 +243,7 @@ impl SchemaStatement {
let stmt = match stmt {
Statement::Index { table, sql, .. } => pgdog_stats::SchemaStatement {
id,
user: user.into(),
user,
shard,
sql: sql.clone(),
kind: StatementKind::Index,
Expand All @@ -254,7 +254,7 @@ impl SchemaStatement {
},
Statement::Table { table, sql } => pgdog_stats::SchemaStatement {
id,
user: user.into(),
user,
shard,
sql: sql.clone(),
kind: StatementKind::Table,
Expand All @@ -265,7 +265,7 @@ impl SchemaStatement {
},
Statement::Other { sql, .. } => pgdog_stats::SchemaStatement {
id,
user: user.into(),
user,
shard,
sql: sql.clone(),
kind: StatementKind::Statement,
Expand All @@ -276,7 +276,7 @@ impl SchemaStatement {
},
Statement::SequenceOwner { sql, .. } => pgdog_stats::SchemaStatement {
id,
user: user.into(),
user,
shard,
sql: sql.to_string(),
kind: StatementKind::Statement,
Expand All @@ -287,7 +287,7 @@ impl SchemaStatement {
},
Statement::SequenceSetMax { sql, .. } => pgdog_stats::SchemaStatement {
id,
user: user.into(),
user,
shard,
sql: sql.clone(),
kind: StatementKind::Statement,
Expand Down
6 changes: 3 additions & 3 deletions pgdog/src/backend/replication/logical/subscriber/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -310,7 +310,7 @@ impl StreamSubscriber {
} else {
statements.insert.parse()
};
let ctx = StreamContext::new(&self.cluster, &insert.tuple_data, &parse)?;
let ctx = StreamContext::new(&self.cluster, &insert.tuple_data, parse)?;
self.send(ctx.shard(), ctx.bind()).await?;
}

Expand Down Expand Up @@ -573,9 +573,9 @@ impl StreamSubscriber {
// FULL identity guarantees old_full is fully materialised; 'u' columns in
// update.new carry the same value as the corresponding column in old_full.
// Routing from a raw 'u' column yields empty bytes → wrong shard.
let complete_new = update.new.fill_toasted_from(&old_full)?;
let complete_new = update.new.fill_toasted_from(old_full)?;
let new_shard = self.shard_for(&complete_new, &update_parse)?;
let old_shard = self.shard_for(&old_full, &update_parse)?;
let old_shard = self.shard_for(old_full, &update_parse)?;

if new_shard != old_shard {
// Shard key changed: DELETE on old shard, INSERT on new shard.
Expand Down
12 changes: 5 additions & 7 deletions pgdog/src/frontend/router/parser/cache/ast.rs
Original file line number Diff line number Diff line change
Expand Up @@ -201,13 +201,11 @@ impl Ast {
}
}

NodeRef::DropStmt(stmt) => {
if stmt.remove_type() == ObjectType::ObjectTable {
for object in &stmt.objects {
if let Some(NodeEnum::List(ref list)) = object.node {
if let Ok(table) = Table::try_from(list) {
tables.insert(table);
}
NodeRef::DropStmt(stmt) if stmt.remove_type() == ObjectType::ObjectTable => {
for object in &stmt.objects {
if let Some(NodeEnum::List(ref list)) = object.node {
if let Ok(table) = Table::try_from(list) {
tables.insert(table);
}
}
}
Expand Down
4 changes: 2 additions & 2 deletions pgdog/src/frontend/router/parser/query/test/test_select.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ fn test_limit_offset_with_bad_params() {
Sync.into(),
]);

let err = command.err().expect("limit should fail");
let err = command.expect_err("limit should fail");
assert_eq!(
err.to_string(),
"expected parameter $1 to be an integer, got 'apples' instead"
Expand All @@ -106,7 +106,7 @@ fn test_limit_offset_with_bad_params() {
Sync.into(),
]);

let err = command.err().expect("offset should fail");
let err = command.expect_err("offset should fail");
assert_eq!(
err.to_string(),
"expected parameter $2 to be an integer, got 'oranges' instead"
Expand Down
6 changes: 3 additions & 3 deletions pgdog/src/frontend/router/parser/query/test/test_set.rs
Original file line number Diff line number Diff line change
Expand Up @@ -167,14 +167,14 @@ fn test_reset() {
#[test]
fn test_set_single_primary() {
let mut test = QueryParserTest::new_single_primary(&config());
let command = test.execute(vec![Query::new("SET statement_timeout TO 1").into()].into());
let command = test.execute(vec![Query::new("SET statement_timeout TO 1").into()]);
assert!(matches!(command, Command::Set { .. }));

let mut config = (*config()).clone();
config.config.general.query_parser = pgdog_config::QueryParserLevel::Off;

let mut test = QueryParserTest::new_single_primary(&config);
let command = test.execute(vec![Query::new("SET statement_timeout TO 1").into()].into());
let command = test.execute(vec![Query::new("SET statement_timeout TO 1").into()]);
match command {
Command::Query(query) => assert_eq!(
query.shard_with_priority().source(),
Expand All @@ -187,7 +187,7 @@ fn test_set_single_primary() {
#[test]
fn test_single_shard_set() {
let mut test = QueryParserTest::new_single_shard(&config());
let command = test.execute(vec![Query::new("SET lock_timeout TO '1s'").into()].into());
let command = test.execute(vec![Query::new("SET lock_timeout TO '1s'").into()]);

match command {
Command::Set { route, .. } => assert!(!route.is_cross_shard()),
Expand Down
8 changes: 4 additions & 4 deletions pgdog/src/frontend/router/parser/query/transaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,10 +42,10 @@ impl QueryParser {
TransactionStmtKind::TransStmtRollbackTo => rollback_savepoint = true,
TransactionStmtKind::TransStmtPrepare
| TransactionStmtKind::TransStmtCommitPrepared
| TransactionStmtKind::TransStmtRollbackPrepared => {
if context.router_context.two_pc {
return Err(Error::NoTwoPc);
}
| TransactionStmtKind::TransStmtRollbackPrepared
if context.router_context.two_pc =>
{
return Err(Error::NoTwoPc);
}
_ => (),
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ impl StatementRewrite<'_> {
return Ok(());
};

let aggregate = Aggregate::parse(&select);
let aggregate = Aggregate::parse(select);
if aggregate.is_empty() {
return Ok(());
}
Expand Down
16 changes: 6 additions & 10 deletions pgdog/src/frontend/router/parser/rewrite/statement/insert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -267,10 +267,8 @@ impl StatementRewrite<'_> {
fn collect_params(node: &Node, params: &mut Vec<u16>) {
if let Some(node_enum) = &node.node {
match node_enum {
NodeEnum::ParamRef(param) => {
if param.number > 0 {
params.push((param.number - 1) as u16);
}
NodeEnum::ParamRef(param) if param.number > 0 => {
params.push((param.number - 1) as u16);
}
NodeEnum::List(list) => {
for item in &list.items {
Expand All @@ -291,12 +289,10 @@ impl StatementRewrite<'_> {
fn renumber_params(node: &mut Node, params: &[u16]) {
if let Some(node_enum) = &mut node.node {
match node_enum {
NodeEnum::ParamRef(param) => {
if param.number > 0 {
let old_pos = (param.number - 1) as u16;
if let Some(new_pos) = params.iter().position(|&p| p == old_pos) {
param.number = (new_pos + 1) as i32;
}
NodeEnum::ParamRef(param) if param.number > 0 => {
let old_pos = (param.number - 1) as u16;
if let Some(new_pos) = params.iter().position(|&p| p == old_pos) {
param.number = (new_pos + 1) as i32;
}
}
NodeEnum::List(list) => {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -403,7 +403,7 @@ fn rewrite_params(parse_result: &mut ParseResult) -> Result<Vec<u16>, Error> {
})?;

let mut params: Vec<(i32, i32)> = params.into_iter().collect();
params.sort_by(|a, b| a.1.cmp(&b.1));
params.sort_by_key(|a| a.1);

Ok(params
.into_iter()
Expand Down
5 changes: 2 additions & 3 deletions pgdog/src/frontend/router/parser/where_clause.rs
Original file line number Diff line number Diff line change
Expand Up @@ -204,9 +204,9 @@ impl<'a> WhereClause<'a> {
let mut keys = vec![];

match node.node {
Some(NodeEnum::NullTest(ref null_test)) => {
Some(NodeEnum::NullTest(ref null_test))
// Only check for IS NULL, IS NOT NULL definitely doesn't help.
if NullTestType::try_from(null_test.nulltesttype) == Ok(NullTestType::IsNull) {
if NullTestType::try_from(null_test.nulltesttype) == Ok(NullTestType::IsNull) => {
let left = null_test
.arg
.as_ref()
Expand All @@ -216,7 +216,6 @@ impl<'a> WhereClause<'a> {
keys.push(Output::NullCheck(c));
}
}
}

Some(NodeEnum::BoolExpr(ref expr)) => {
// Only AND expressions can really be asserted.
Expand Down
12 changes: 4 additions & 8 deletions pgdog/src/frontend/router/sharding/range.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,15 +37,11 @@ impl<'a> Ranges<'a> {
let mut matches = 0;
for value in &bound {
match value {
Some(FlexibleType::String(s)) => {
if range.varchar(s) {
matches += 1;
}
Some(FlexibleType::String(s)) if range.varchar(s) => {
matches += 1;
}
Some(FlexibleType::Integer(i)) => {
if range.integer(i) {
matches += 1;
}
Some(FlexibleType::Integer(i)) if range.integer(i) => {
matches += 1;
}
_ => (),
}
Expand Down
6 changes: 6 additions & 0 deletions pgdog/src/net/messages/frontend_pid.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,12 @@ static COUNTER: AtomicU32 = AtomicU32::new(0);
#[derive(Copy, Clone, Debug, Display, Hash, PartialEq, Eq)]
pub struct FrontendPid(i32);

impl Default for FrontendPid {
fn default() -> Self {
Self::new()
}
}

impl FrontendPid {
pub fn new() -> Self {
// Mask off the sign bit so the synthetic pid is always non-negative,
Expand Down
4 changes: 2 additions & 2 deletions pgdog/src/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -243,12 +243,12 @@ pub fn raise_nofile_limit() -> u64 {
"failed to raise NOFILE soft limit from {} to {}",
prev, rlim.rlim_max
);
return prev as u64;
return prev;
}
}
}

rlim.rlim_cur as u64
rlim.rlim_cur
}

#[cfg(not(unix))]
Expand Down
Loading