@@ -100,6 +100,7 @@ use common::{
100100 GenericIndexName ,
101101 IndexId ,
102102 PersistenceVersion ,
103+ RepeatableReason ,
103104 RepeatableTimestamp ,
104105 Timestamp ,
105106 } ,
@@ -232,7 +233,7 @@ pub struct LeaderRetentionWorkers {
232233pub async fn latest_retention_min_snapshot_ts (
233234 persistence : & dyn PersistenceReader ,
234235 retention_type : RetentionType ,
235- ) -> anyhow:: Result < Timestamp > {
236+ ) -> anyhow:: Result < RepeatableTimestamp > {
236237 let _timer = match retention_type {
237238 RetentionType :: Document => latest_min_document_snapshot_timer ( ) ,
238239 RetentionType :: Index => latest_min_snapshot_timer ( ) ,
@@ -251,7 +252,14 @@ pub async fn latest_retention_min_snapshot_ts(
251252 None => Timestamp :: MIN ,
252253 _ => anyhow:: bail!( "invalid retention snapshot {min_snapshot_value:?}" ) ,
253254 } ;
254- Ok ( min_snapshot_ts)
255+ // We have the invariant:
256+ // min_document_snapshot_ts <= min_index_snapshot_ts <= max_repeatable_ts
257+ // and the latter ts is repeatable by definition, so min_snapshot_ts is
258+ // always repeatable.
259+ Ok ( RepeatableTimestamp :: new_validated (
260+ min_snapshot_ts,
261+ RepeatableReason :: MinSnapshotTsPersistence ,
262+ ) )
255263}
256264
257265const INITIAL_BACKOFF : Duration = Duration :: from_millis ( 50 ) ;
@@ -1592,16 +1600,23 @@ async fn load_snapshot_bounds(
15921600 latest_retention_min_snapshot_ts ( persistence, RetentionType :: Index ) . await ?;
15931601 let min_document_snapshot_ts =
15941602 latest_retention_min_snapshot_ts ( persistence, RetentionType :: Document ) . await ?;
1595- if * repeatable_ts < min_index_snapshot_ts {
1603+ if repeatable_ts < min_index_snapshot_ts {
15961604 anyhow:: bail!( snapshot_invalid_error(
15971605 * repeatable_ts,
1598- min_index_snapshot_ts,
1606+ * min_index_snapshot_ts,
15991607 RetentionType :: Index
16001608 ) ) ;
16011609 }
1610+ if repeatable_ts < min_document_snapshot_ts {
1611+ anyhow:: bail!( snapshot_invalid_error(
1612+ * repeatable_ts,
1613+ * min_document_snapshot_ts,
1614+ RetentionType :: Document
1615+ ) ) ;
1616+ }
16021617 Ok ( SnapshotBounds {
1603- min_index_snapshot_ts : repeatable_ts . prior_ts ( min_index_snapshot_ts ) ? ,
1604- min_document_snapshot_ts : repeatable_ts . prior_ts ( min_document_snapshot_ts ) ? ,
1618+ min_index_snapshot_ts,
1619+ min_document_snapshot_ts,
16051620 } )
16061621}
16071622
@@ -1665,22 +1680,18 @@ impl<RT: Runtime> RetentionValidator for FollowerRetentionManager<RT> {
16651680 }
16661681
16671682 async fn min_snapshot_ts ( & self ) -> anyhow:: Result < RepeatableTimestamp > {
1668- let snapshot_ts = new_static_repeatable_recent ( self . persistence . as_ref ( ) ) . await ?;
1669- let latest = snapshot_ts. prior_ts (
1683+ let latest =
16701684 latest_retention_min_snapshot_ts ( self . persistence . as_ref ( ) , RetentionType :: Index )
1671- . await ?,
1672- ) ?;
1685+ . await ?;
16731686 let mut snapshot_bounds = self . snapshot_bounds . lock ( ) ;
16741687 snapshot_bounds. advance_min_snapshot_ts ( latest) ;
16751688 Ok ( latest)
16761689 }
16771690
16781691 async fn min_document_snapshot_ts ( & self ) -> anyhow:: Result < RepeatableTimestamp > {
1679- let snapshot_ts = new_static_repeatable_recent ( self . persistence . as_ref ( ) ) . await ?;
1680- let latest = snapshot_ts. prior_ts (
1692+ let latest =
16811693 latest_retention_min_snapshot_ts ( self . persistence . as_ref ( ) , RetentionType :: Document )
1682- . await ?,
1683- ) ?;
1694+ . await ?;
16841695 let mut snapshot_bounds = self . snapshot_bounds . lock ( ) ;
16851696 snapshot_bounds. advance_min_document_snapshot_ts ( latest) ;
16861697 Ok ( latest)
0 commit comments