@@ -755,7 +755,6 @@ impl<RT: Runtime> LeaderRetentionManager<RT> {
755755 /// internal_document_ts))
756756 #[ try_stream( ok = ( Timestamp , Option <( Timestamp , InternalDocumentId ) >) , error = anyhow:: Error ) ]
757757 async fn expired_documents (
758- rt : & RT ,
759758 persistence : Arc < dyn PersistenceReader > ,
760759 cursor : RepeatableTimestamp ,
761760 min_document_snapshot_ts : RepeatableTimestamp ,
@@ -793,26 +792,13 @@ impl<RT: Runtime> LeaderRetentionManager<RT> {
793792 let Some ( prev_rev_ts) = prev_ts else {
794793 log_document_retention_scanned_document ( maybe_doc. is_none ( ) , false ) ;
795794 if maybe_doc. is_none ( ) {
796- anyhow:: ensure!(
797- ts <= Timestamp :: try_from( rt. unix_timestamp( ) . as_system_time( ) ) ?
798- . sub( * DOCUMENT_RETENTION_DELAY ) ?,
799- "Tried to delete document (id: {id}, ts: {ts}), which was out of the \
800- retention window"
801- ) ;
802795 yield ( ts, Some ( ( ts, id) ) ) ;
803796 } else {
804797 yield ( ts, None ) ;
805798 }
806799 continue ;
807800 } ;
808801
809- anyhow:: ensure!(
810- prev_rev_ts
811- <= Timestamp :: try_from( rt. unix_timestamp( ) . as_system_time( ) ) ?
812- . sub( * DOCUMENT_RETENTION_DELAY ) ?,
813- "Tried to delete document (id: {id}, ts: {prev_rev_ts}), which was out of the \
814- retention window"
815- ) ;
816802 log_document_retention_scanned_document ( maybe_doc. is_none ( ) , true ) ;
817803 yield ( ts, Some ( ( prev_rev_ts, id) ) ) ;
818804
@@ -853,7 +839,7 @@ impl<RT: Runtime> LeaderRetentionManager<RT> {
853839 let snapshot_ts = min_snapshot_ts;
854840
855841 tracing:: trace!( "delete_documents: about to grab chunks" ) ;
856- let expired_chunks = Self :: expired_documents ( rt , reader, cursor, min_snapshot_ts)
842+ let expired_chunks = Self :: expired_documents ( reader, cursor, min_snapshot_ts)
857843 . try_chunks2 ( * DOCUMENT_RETENTION_DELETE_CHUNK ) ;
858844 pin_mut ! ( expired_chunks) ;
859845 while let Some ( scanned_chunk) = expired_chunks. try_next ( ) . await ? {
@@ -1239,14 +1225,23 @@ impl<RT: Runtime> LeaderRetentionManager<RT> {
12391225 )
12401226 . await ?;
12411227 tracing:: trace!( "go_delete_documents: loaded checkpoint: {cursor:?}" ) ;
1228+
1229+ // Only delete documents up to (now() -
1230+ // DOCUMENT_RETENTION_DELAY), even if min_document_snapshot_ts
1231+ // is ahead of that point.
1232+ let max_end_cursor = min_document_snapshot_ts. prior_ts (
1233+ ( * min_document_snapshot_ts)
1234+ . min ( rt. generate_timestamp ( ) ?. sub ( * DOCUMENT_RETENTION_DELAY ) ?) ,
1235+ ) ?;
12421236 let ( new_cursor, scanned_documents) = Self :: delete_documents (
1243- min_document_snapshot_ts ,
1237+ max_end_cursor ,
12441238 persistence. clone ( ) ,
12451239 & rt,
12461240 cursor,
12471241 retention_rate_limiter. clone ( ) ,
12481242 )
12491243 . await ?;
1244+ max_end_cursor. prior_ts ( * new_cursor) ?;
12501245 tracing:: debug!( "go_delete_documents: Checkpointing at: {new_cursor:?}" ) ;
12511246
12521247 Self :: checkpoint (
@@ -1259,9 +1254,7 @@ impl<RT: Runtime> LeaderRetentionManager<RT> {
12591254 )
12601255 . await ?;
12611256
1262- // If we scanned >= the scanned batch, we probably returned
1263- // early and have more work to do, so run again immediately.
1264- is_working = scanned_documents >= * DOCUMENT_RETENTION_MAX_SCANNED_DOCUMENTS ;
1257+ is_working = new_cursor < min_document_snapshot_ts;
12651258 if is_working {
12661259 tracing:: trace!(
12671260 "go_delete_documents: processed {scanned_documents:?} rows, more to go"
@@ -1855,7 +1848,7 @@ mod tests {
18551848 }
18561849
18571850 #[ convex_macro:: test_runtime]
1858- async fn test_expired_documents ( rt : TestRuntime ) -> anyhow:: Result < ( ) > {
1851+ async fn test_expired_documents ( _rt : TestRuntime ) -> anyhow:: Result < ( ) > {
18591852 let p = TestPersistence :: new ( ) ;
18601853 let mut id_generator = TestIdGenerator :: new ( ) ;
18611854 let table: TableName = str:: parse ( "table" ) ?;
@@ -1889,7 +1882,6 @@ mod tests {
18891882 let reader = p. reader ( ) ;
18901883
18911884 let scanned_stream = LeaderRetentionManager :: < TestRuntime > :: expired_documents (
1892- & rt,
18931885 reader,
18941886 RepeatableTimestamp :: MIN ,
18951887 min_snapshot_ts,
@@ -1933,7 +1925,7 @@ mod tests {
19331925 }
19341926
19351927 #[ convex_macro:: test_runtime]
1936- async fn test_delete_document_chunk ( rt : TestRuntime ) -> anyhow:: Result < ( ) > {
1928+ async fn test_delete_document_chunk ( _rt : TestRuntime ) -> anyhow:: Result < ( ) > {
19371929 unsafe { env:: set_var ( "DOCUMENT_RETENTION_DELETE_PARALLEL" , "4" ) } ;
19381930 let p = Arc :: new ( TestPersistence :: new ( ) ) ;
19391931 let mut id_generator = TestIdGenerator :: new ( ) ;
@@ -1964,7 +1956,6 @@ mod tests {
19641956 let reader = p. reader ( ) ;
19651957
19661958 let scanned_stream = LeaderRetentionManager :: < TestRuntime > :: expired_documents (
1967- & rt,
19681959 reader,
19691960 RepeatableTimestamp :: MIN ,
19701961 min_snapshot_ts,
0 commit comments