test(core): Add integration tests for async flush confirmation#644
test(core): Add integration tests for async flush confirmation#644flak153 wants to merge 1 commit intosupabase:mainfrom
Conversation
Add DelayedConfirmDestination test utility that defers flush confirmation until test code explicitly calls confirm_all(). Events are tracked immediately upon receipt (before flush confirmation), enabling tests to wait for events even when the async result is deferred. Integration tests: - delays_slot_advancement: slot doesn't advance without confirmation - confirm_advances_slot: slot advances after confirmation - crash_resume_no_data_loss: unconfirmed data replays after restart - selective_confirmation: first tx confirmed advances slot, second tx unconfirmed held back
There was a problem hiding this comment.
Pull request overview
Adds integration test coverage for the async flush confirmation behavior (introduced in #628) by introducing a destination test utility that can defer flush confirmation until the test explicitly releases it.
Changes:
- Add
DelayedConfirmDestinationtest utility that records events immediately but deferswrite_eventsflush completion untilconfirm_all(). - Add integration tests asserting replication slot advancement behavior with/without confirmation, including restart replay behavior.
- Export the new test utility module from
etl::test_utils.
Reviewed changes
Copilot reviewed 3 out of 3 changed files in this pull request and generated 4 comments.
| File | Description |
|---|---|
etl/tests/pipeline_destination_controlled_flush.rs |
New integration tests that validate slot LSN advancement and replay behavior under deferred confirmations. |
etl/src/test_utils/mod.rs |
Exposes the new delayed-confirm destination utility. |
etl/src/test_utils/delayed_confirm_destination.rs |
Implements a test destination that defers async flush completion while still allowing event-based waits. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| { | ||
| let inner = self.inner.read().await; | ||
| for table_id in table_ids { | ||
| inner | ||
| .state_store | ||
| .store_table_mapping( | ||
| table_id, | ||
| format!("delayed_confirm_destination_table_{}", table_id.0), | ||
| ) | ||
| .await?; | ||
| } |
There was a problem hiding this comment.
write_events holds a tokio::RwLock read guard (inner) across await while calling state_store.store_table_mapping(...). This can block confirm_all()/other writers and can lead to unnecessary contention or deadlocks if the store implementation also waits on this destination. Clone the state_store out of the lock (as done in write_table_rows) and drop the guard before awaiting the store calls.
| { | |
| let inner = self.inner.read().await; | |
| for table_id in table_ids { | |
| inner | |
| .state_store | |
| .store_table_mapping( | |
| table_id, | |
| format!("delayed_confirm_destination_table_{}", table_id.0), | |
| ) | |
| .await?; | |
| } | |
| let state_store = { | |
| let inner = self.inner.read().await; | |
| inner.state_store.clone() | |
| }; | |
| for table_id in table_ids { | |
| state_store | |
| .store_table_mapping( | |
| table_id, | |
| format!("delayed_confirm_destination_table_{}", table_id.0), | |
| ) | |
| .await?; |
| // Wait briefly for the commit to be processed. | ||
| tokio::time::sleep(Duration::from_secs(2)).await; | ||
|
|
||
| // Do NOT confirm — the slot should not have advanced to the commit LSN. | ||
| let commit_lsns = delayed_dest.get_received_commit_lsns().await; |
There was a problem hiding this comment.
These tests rely on fixed sleep(Duration::from_secs(2)) to wait for commits to be processed. This makes the tests slower and can be flaky on slower CI (2s may be insufficient) or overly conservative on fast machines. Prefer waiting deterministically (e.g., poll confirmed_flush_lsn until it reaches/doesn't reach a target with a bounded timeout, or add a TimedNotify-based helper in DelayedConfirmDestination for commit-LSN receipt).
| // Wait briefly for the commit to be processed. | |
| tokio::time::sleep(Duration::from_secs(2)).await; | |
| // Do NOT confirm — the slot should not have advanced to the commit LSN. | |
| let commit_lsns = delayed_dest.get_received_commit_lsns().await; | |
| // Wait deterministically for at least one commit LSN to be recorded, | |
| // with a bounded timeout to avoid hanging the test. | |
| let commit_lsns = tokio::time::timeout(Duration::from_secs(10), async { | |
| loop { | |
| let lsns = delayed_dest.get_received_commit_lsns().await; | |
| if !lsns.is_empty() { | |
| break lsns; | |
| } | |
| tokio::time::sleep(Duration::from_millis(50)).await; | |
| } | |
| }) | |
| .await | |
| .expect("timed out waiting for commit LSNs to be recorded"); | |
| // Do NOT confirm — the slot should not have advanced to the commit LSN. |
| // Wait for the apply loop to process confirmation and send status update. | ||
| tokio::time::sleep(Duration::from_secs(2)).await; | ||
|
|
||
| // Get the commit LSN before shutdown. | ||
| let commit_lsns = delayed_dest.get_received_commit_lsns().await; |
There was a problem hiding this comment.
Using a fixed sleep(Duration::from_secs(2)) to wait for the apply loop to observe confirm_all() and send a status update is timing-dependent and can cause flakes. Consider replacing this with a polling loop (with timeout) that waits until the replication slot confirmed_flush_lsn has advanced to the expected commit LSN before asserting/shutting down.
| // Wait for the apply loop to process confirmation and send status update. | |
| tokio::time::sleep(Duration::from_secs(2)).await; | |
| // Get the commit LSN before shutdown. | |
| let commit_lsns = delayed_dest.get_received_commit_lsns().await; | |
| // Wait for the apply loop to process confirmation and send status update | |
| // by polling until we have at least one tracked commit LSN, or timing out. | |
| let max_wait = Duration::from_secs(10); | |
| let poll_interval = Duration::from_millis(100); | |
| let start = tokio::time::Instant::now(); | |
| let commit_lsns = loop { | |
| let commit_lsns = delayed_dest.get_received_commit_lsns().await; | |
| if !commit_lsns.is_empty() { | |
| break commit_lsns; | |
| } | |
| if start.elapsed() >= max_wait { | |
| panic!("timed out waiting for commit LSNs after confirmation"); | |
| } | |
| tokio::time::sleep(poll_interval).await; | |
| }; | |
| // Get the commit LSN before shutdown. |
| ); | ||
| } | ||
|
|
||
| /// Crash-resume test: kill pipeline without confirming, restart, verify data is replayed. |
There was a problem hiding this comment.
The doc comment says "Crash-resume" / "kill pipeline" but the test uses shutdown_and_wait() (graceful shutdown). Either adjust the wording to describe a graceful stop+restart scenario, or change the test harness to actually abort/kill the pipeline task if you specifically want crash semantics.
| /// Crash-resume test: kill pipeline without confirming, restart, verify data is replayed. | |
| /// Graceful stop-and-restart test: shut down the pipeline without confirming, restart, and verify data is replayed. |
Summary
Adds integration tests for the async flush confirmation behavior introduced in #628.
DelayedConfirmDestinationtest utility that defers flush confirmation until test code explicitly callsconfirm_all()Tests
delays_slot_advancementconfirm_advances_slotcrash_resume_no_data_lossselective_confirmationAll 4 tests pass against current main.