test(core): Add integration tests for async flush confirmation #644

Open
flak153 wants to merge 1 commit into supabase:main from flak153:add-flush-confirmation-tests

Conversation

@flak153 flak153 commented Mar 27, 2026

Summary

Adds integration tests for the async flush confirmation behavior introduced in #628.

  • DelayedConfirmDestination test utility that defers flush confirmation until test code explicitly calls confirm_all()
  • Events tracked immediately upon receipt (before confirmation), enabling tests to wait for events even when the async result is deferred
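The deferral idea can be sketched without any async runtime. The following is a minimal, std-only illustration and not the PR's implementation: `DelayedConfirmSketch`, its fields, and the use of a channel as the confirmation handle are all assumptions made for this example.

```rust
use std::sync::mpsc::{channel, Receiver, Sender};
use std::sync::Mutex;

/// Hypothetical stand-in for a destination that defers flush confirmation.
#[derive(Default)]
pub struct DelayedConfirmSketch {
    /// Commit LSNs recorded as soon as a batch arrives, before confirmation.
    received: Mutex<Vec<u64>>,
    /// One sender per unconfirmed batch; sending on it confirms that batch.
    pending: Mutex<Vec<Sender<()>>>,
}

impl DelayedConfirmSketch {
    /// Records the batch immediately and returns a handle that only yields
    /// a value once `confirm_all` has been called.
    pub fn write_events(&self, commit_lsns: &[u64]) -> Receiver<()> {
        self.received.lock().unwrap().extend_from_slice(commit_lsns);
        let (tx, rx) = channel();
        self.pending.lock().unwrap().push(tx);
        rx
    }

    /// Snapshot of everything received so far, confirmed or not.
    pub fn received_commit_lsns(&self) -> Vec<u64> {
        self.received.lock().unwrap().clone()
    }

    /// Confirms every pending batch and returns how many were released.
    pub fn confirm_all(&self) -> usize {
        let pending = std::mem::take(&mut *self.pending.lock().unwrap());
        let released = pending.len();
        for tx in pending {
            // The receiver may already be gone; ignoring that is fine here.
            let _ = tx.send(());
        }
        released
    }
}
```

In this sketch a test can read `received_commit_lsns()` right after `write_events` returns, while the returned handle stays empty until `confirm_all()` runs.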

Tests

| Test | What it verifies |
| --- | --- |
| delays_slot_advancement | Slot doesn't advance without confirmation |
| confirm_advances_slot | Slot advances after confirmation |
| crash_resume_no_data_loss | Unconfirmed data replays after restart |
| selective_confirmation | First tx confirmed advances slot, second tx unconfirmed held back |

All 4 tests pass against current main.
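The behavior these tests assert can be modeled as a small rule: the slot may advance only to the last commit in the confirmed prefix. The sketch below encodes that rule under the assumption that confirmations must be contiguous from the oldest commit. `safe_flush_lsn` is a hypothetical helper written for this illustration and is not taken from the etl crate.

```rust
/// Returns the highest LSN the slot could advance to, given commits in
/// arrival order paired with whether each one has been confirmed.
/// Assumption for this sketch: an unconfirmed commit blocks every later one.
pub fn safe_flush_lsn(commits_in_order: &[(u64, bool)]) -> Option<u64> {
    commits_in_order
        .iter()
        .take_while(|(_, confirmed)| *confirmed)
        .map(|(lsn, _)| *lsn)
        .last()
}
```

Under this model, a confirmed first transaction followed by an unconfirmed second one yields the first commit's LSN, which matches the outcome described for selective_confirmation.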

Add DelayedConfirmDestination test utility that defers flush
confirmation until test code explicitly calls confirm_all(). Events
are tracked immediately upon receipt (before flush confirmation),
enabling tests to wait for events even when the async result is
deferred.

Integration tests:
- delays_slot_advancement: slot doesn't advance without confirmation
- confirm_advances_slot: slot advances after confirmation
- crash_resume_no_data_loss: unconfirmed data replays after restart
- selective_confirmation: first tx confirmed advances slot, second
  tx unconfirmed held back
@flak153 flak153 requested a review from a team as a code owner March 27, 2026 15:43
Copilot AI review requested due to automatic review settings March 27, 2026 15:43

Copilot AI left a comment

Pull request overview

Adds integration test coverage for the async flush confirmation behavior (introduced in #628) by introducing a destination test utility that can defer flush confirmation until the test explicitly releases it.

Changes:

  • Add DelayedConfirmDestination test utility that records events immediately but defers write_events flush completion until confirm_all().
  • Add integration tests asserting replication slot advancement behavior with/without confirmation, including restart replay behavior.
  • Export the new test utility module from etl::test_utils.

Reviewed changes

Copilot reviewed 3 out of 3 changed files in this pull request and generated 4 comments.

| File | Description |
| --- | --- |
| etl/tests/pipeline_destination_controlled_flush.rs | New integration tests that validate slot LSN advancement and replay behavior under deferred confirmations. |
| etl/src/test_utils/mod.rs | Exposes the new delayed-confirm destination utility. |
| etl/src/test_utils/delayed_confirm_destination.rs | Implements a test destination that defers async flush completion while still allowing event-based waits. |

Comment on lines +214 to +224
{
    let inner = self.inner.read().await;
    for table_id in table_ids {
        inner
            .state_store
            .store_table_mapping(
                table_id,
                format!("delayed_confirm_destination_table_{}", table_id.0),
            )
            .await?;
    }

Copilot AI Mar 27, 2026

write_events holds a tokio::RwLock read guard (inner) across await while calling state_store.store_table_mapping(...). This can block confirm_all()/other writers and can lead to unnecessary contention or deadlocks if the store implementation also waits on this destination. Clone the state_store out of the lock (as done in write_table_rows) and drop the guard before awaiting the store calls.

Suggested change
- {
-     let inner = self.inner.read().await;
-     for table_id in table_ids {
-         inner
-             .state_store
-             .store_table_mapping(
-                 table_id,
-                 format!("delayed_confirm_destination_table_{}", table_id.0),
-             )
-             .await?;
-     }
+ let state_store = {
+     let inner = self.inner.read().await;
+     inner.state_store.clone()
+ };
+ for table_id in table_ids {
+     state_store
+         .store_table_mapping(
+             table_id,
+             format!("delayed_confirm_destination_table_{}", table_id.0),
+         )
+         .await?;
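
The pattern this comment recommends, cloning a shared handle out of the lock and releasing the guard before doing slow work, can be shown in isolation. This is a std-only sketch that uses `std::sync::RwLock` in place of `tokio::RwLock`. `StoreSketch`, `Inner`, `DestinationSketch`, and `record_mappings` are hypothetical names for illustration, and the store call is synchronous here, where the PR's call is awaited.

```rust
use std::sync::{Arc, RwLock};

/// Hypothetical store standing in for the PR's `state_store`.
pub struct StoreSketch {
    mappings: RwLock<Vec<(u32, String)>>,
}

impl StoreSketch {
    pub fn new() -> Self {
        Self { mappings: RwLock::new(Vec::new()) }
    }

    pub fn store_table_mapping(&self, table_id: u32, name: String) {
        self.mappings.write().unwrap().push((table_id, name));
    }

    pub fn mapping_count(&self) -> usize {
        self.mappings.read().unwrap().len()
    }
}

pub struct Inner {
    pub state_store: Arc<StoreSketch>,
}

pub struct DestinationSketch {
    pub inner: RwLock<Inner>,
}

impl DestinationSketch {
    pub fn record_mappings(&self, table_ids: &[u32]) {
        // Hold the read guard only long enough to clone the shared handle.
        let state_store = {
            let inner = self.inner.read().unwrap();
            Arc::clone(&inner.state_store)
        }; // The guard is dropped at the end of this block.

        // Writers on `self.inner` are no longer blocked while this loop runs.
        for &table_id in table_ids {
            state_store.store_table_mapping(
                table_id,
                format!("delayed_confirm_destination_table_{}", table_id),
            );
        }
    }
}
```

The design point is that the guard's scope ends before the loop starts, so the time spent in the store does not extend the critical section.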

Comment on lines +79 to +83
// Wait briefly for the commit to be processed.
tokio::time::sleep(Duration::from_secs(2)).await;

// Do NOT confirm — the slot should not have advanced to the commit LSN.
let commit_lsns = delayed_dest.get_received_commit_lsns().await;

Copilot AI Mar 27, 2026

These tests rely on fixed sleep(Duration::from_secs(2)) to wait for commits to be processed. This makes the tests slower and can be flaky on slower CI (2s may be insufficient) or overly conservative on fast machines. Prefer waiting deterministically (e.g., poll confirmed_flush_lsn until it reaches/doesn't reach a target with a bounded timeout, or add a TimedNotify-based helper in DelayedConfirmDestination for commit-LSN receipt).

Suggested change
- // Wait briefly for the commit to be processed.
- tokio::time::sleep(Duration::from_secs(2)).await;
- // Do NOT confirm — the slot should not have advanced to the commit LSN.
- let commit_lsns = delayed_dest.get_received_commit_lsns().await;
+ // Wait deterministically for at least one commit LSN to be recorded,
+ // with a bounded timeout to avoid hanging the test.
+ let commit_lsns = tokio::time::timeout(Duration::from_secs(10), async {
+     loop {
+         let lsns = delayed_dest.get_received_commit_lsns().await;
+         if !lsns.is_empty() {
+             break lsns;
+         }
+         tokio::time::sleep(Duration::from_millis(50)).await;
+     }
+ })
+ .await
+ .expect("timed out waiting for commit LSNs to be recorded");
+ // Do NOT confirm — the slot should not have advanced to the commit LSN.
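
The bounded-polling idea in this suggestion could be factored into a reusable helper so each test does not repeat the loop. The sketch below is a blocking, std-only version for illustration. `wait_until` is a hypothetical name, and an async test would use `tokio::time::sleep` and `tokio::time::timeout` instead, as in the suggestion above.

```rust
use std::thread;
use std::time::{Duration, Instant};

/// Polls `probe` until it yields a value or `timeout` elapses.
/// Returns `None` on timeout so the caller decides how to fail.
pub fn wait_until<T>(
    timeout: Duration,
    interval: Duration,
    mut probe: impl FnMut() -> Option<T>,
) -> Option<T> {
    let deadline = Instant::now() + timeout;
    loop {
        if let Some(value) = probe() {
            return Some(value);
        }
        if Instant::now() >= deadline {
            return None;
        }
        thread::sleep(interval);
    }
}
```

A caller would typically follow this with `.expect("timed out waiting for ...")`, so a slow CI run fails with a clear message instead of a misleading assertion.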

Comment on lines +156 to +160
// Wait for the apply loop to process confirmation and send status update.
tokio::time::sleep(Duration::from_secs(2)).await;

// Get the commit LSN before shutdown.
let commit_lsns = delayed_dest.get_received_commit_lsns().await;

Copilot AI Mar 27, 2026

Using a fixed sleep(Duration::from_secs(2)) to wait for the apply loop to observe confirm_all() and send a status update is timing-dependent and can cause flakes. Consider replacing this with a polling loop (with timeout) that waits until the replication slot confirmed_flush_lsn has advanced to the expected commit LSN before asserting/shutting down.

Suggested change
- // Wait for the apply loop to process confirmation and send status update.
- tokio::time::sleep(Duration::from_secs(2)).await;
- // Get the commit LSN before shutdown.
- let commit_lsns = delayed_dest.get_received_commit_lsns().await;
+ // Wait for the apply loop to process confirmation and send status update
+ // by polling until we have at least one tracked commit LSN, or timing out.
+ let max_wait = Duration::from_secs(10);
+ let poll_interval = Duration::from_millis(100);
+ let start = tokio::time::Instant::now();
+ let commit_lsns = loop {
+     let commit_lsns = delayed_dest.get_received_commit_lsns().await;
+     if !commit_lsns.is_empty() {
+         break commit_lsns;
+     }
+     if start.elapsed() >= max_wait {
+         panic!("timed out waiting for commit LSNs after confirmation");
+     }
+     tokio::time::sleep(poll_interval).await;
+ };
+ // Get the commit LSN before shutdown.

);
}

/// Crash-resume test: kill pipeline without confirming, restart, verify data is replayed.

Copilot AI Mar 27, 2026

The doc comment says "Crash-resume" / "kill pipeline" but the test uses shutdown_and_wait() (graceful shutdown). Either adjust the wording to describe a graceful stop+restart scenario, or change the test harness to actually abort/kill the pipeline task if you specifically want crash semantics.

Suggested change
- /// Crash-resume test: kill pipeline without confirming, restart, verify data is replayed.
+ /// Graceful stop-and-restart test: shut down the pipeline without confirming, restart, and verify data is replayed.
