@@ -26,6 +26,7 @@ import (
2626 "github.com/cockroachdb/cockroach/pkg/sql"
2727 "github.com/cockroachdb/cockroach/pkg/util"
2828 "github.com/cockroachdb/cockroach/pkg/util/besteffort"
29+ "github.com/cockroachdb/cockroach/pkg/util/buildutil"
2930 "github.com/cockroachdb/cockroach/pkg/util/ctxgroup"
3031 "github.com/cockroachdb/cockroach/pkg/util/encoding"
3132 "github.com/cockroachdb/cockroach/pkg/util/hlc"
@@ -234,8 +235,11 @@ type RestorableBackup struct {
234235// ListRestorableBackups lists all restorable backups from the backup index
235236// within the specified time interval (inclusive at both ends). The store should
236237// be rooted at the default collection URI (the one that contains the
237- // `metadata/` directory).
238- //
238+ // `metadata/` directory). A maxCount of 0 indicates no limit on the number
239+ // of backups to return; otherwise, if the number of backups found exceeds
240+ // maxCount, iteration will stop early and the boolean return value will be
241+ // set to true.
242+ //
239243// NB: Duplicate end times within a chain are elided, as IDs only identify
240244// unique end times within a chain. For the purposes of determining which
241245// backup's metadata we use to populate the fields, we always pick the backup
@@ -248,86 +252,113 @@ type RestorableBackup struct {
248252// milliseconds. As such, it is possible that a backup with an end time slightly
249253// ahead of `before` may be included in the results.
250254func ListRestorableBackups (
251- ctx context.Context , store cloud.ExternalStorage , after , before time.Time ,
252- ) ([]RestorableBackup , error ) {
253- idxInRange , err := listIndexesWithinRange (ctx , store , after , before )
254- if err != nil {
255- return nil , err
256- }
257-
258- var filteredIndexes []parsedIndex
259- for _ , index := range idxInRange {
260- if len (filteredIndexes ) > 0 {
261- last := & filteredIndexes [len (filteredIndexes )- 1 ]
262- // Elide duplicate end times within a chain. Because the indexes are
263- // sorted with ascending start times breaking ties, keeping the last one
264- // ensures that we keep the non-compacted backup.
265- if last .end .Equal (index .end ) && last .fullEnd .Equal (index .fullEnd ) {
266- last .filePath = index .filePath
267- continue
255+ ctx context.Context , store cloud.ExternalStorage , newerThan , olderThan time.Time , maxCount uint ,
256+ ) ([]RestorableBackup , bool , error ) {
257+ ctx , trace := tracing .ChildSpan (ctx , "backupinfo.ListRestorableBackups" )
258+ defer trace .Finish ()
259+
260+ var filteredIdxs []parsedIndex
261+ var exceededMax bool
262+ if err := listIndexesWithinRange (
263+ ctx , store , newerThan , olderThan ,
264+ func (index parsedIndex ) error {
265+ if len (filteredIdxs ) > 0 {
266+ lastIdx := len (filteredIdxs ) - 1
267+ // Elide duplicate end times within a chain. Because indexes are fetched
268+ // in descending order with ties broken by ascending start time, keeping
269+ // the last one ensures that we keep the non-compacted backup.
270+ if filteredIdxs [lastIdx ].end .Equal (index .end ) &&
271+ filteredIdxs [lastIdx ].fullEnd .Equal (index .fullEnd ) {
272+ if buildutil .CrdbTestBuild {
273+ // Sanity check that start times are in ascending order for indexes
274+ // with the same end time.
275+ if index .start .Before (filteredIdxs [lastIdx ].start ) {
276+ return errors .Newf (
277+ "expected index start times to be in ascending order: %s vs %s" ,
278+ index .start , filteredIdxs [lastIdx ].start ,
279+ )
280+ }
281+ }
282+ filteredIdxs [lastIdx ] = index
283+ return nil
284+ }
268285 }
269- }
270- filteredIndexes = append (filteredIndexes , index )
286+ filteredIdxs = append (filteredIdxs , index )
287+ if maxCount > 0 && uint (len (filteredIdxs )) > maxCount {
288+ exceededMax = true
289+ return cloud .ErrListingDone
290+ }
291+ return nil
292+ },
293+ ); err != nil {
294+ return nil , false , err
295+ }
296+ if exceededMax {
297+ filteredIdxs = filteredIdxs [:maxCount ]
271298 }
272299
273- backups := make ([]RestorableBackup , 0 , len (filteredIndexes ))
274- for _ , index := range filteredIndexes {
275- reader , _ , err := store .ReadFile (ctx , index .filePath , cloud.ReadOptions {})
276- if err != nil {
277- return nil , errors .Wrapf (err , "reading index file %s" , index .filePath )
278- }
279-
280- bytes , err := ioctx .ReadAll (ctx , reader )
281- besteffort .Error (ctx , "cleanup-index-reader" , func (ctx context.Context ) error {
282- return reader .Close (ctx )
283- })
300+ ctx , readTrace := tracing .ChildSpan (ctx , "backupinfo.ReadIndexFiles" )
301+ defer readTrace .Finish ()
302+ backups , err := util .MapE (filteredIdxs , func (index parsedIndex ) (RestorableBackup , error ) {
303+ idxMeta , err := readIndexFile (ctx , store , index .filePath )
284304 if err != nil {
285- return nil , errors . Wrapf ( err , "reading index file %s" , index . filePath )
305+ return RestorableBackup {}, err
286306 }
287-
288- idxMeta := backuppb.BackupIndexMetadata {}
289- if err := protoutil .Unmarshal (bytes , & idxMeta ); err != nil {
290- return nil , errors .Wrapf (err , "unmarshalling index file %s" , index .filePath )
291- }
292-
293- backups = append (backups , RestorableBackup {
307+ return RestorableBackup {
294308 ID : encodeBackupID (index .fullEnd , index .end ),
295309 EndTime : idxMeta .EndTime ,
296310 MVCCFilter : idxMeta .MVCCFilter ,
297311 RevisionStartTime : idxMeta .RevisionStartTime ,
298- })
312+ }, nil
313+ })
314+ if err != nil {
315+ return nil , false , err
299316 }
300- return backups , nil
317+
318+ return backups , exceededMax , nil
301319}
302320
303321type parsedIndex struct {
304- filePath string // path to the index relative to the backup collection root
305- fullEnd , end time.Time
322+ filePath string // path to the index relative to the backup collection root
323+ fullEnd , start , end time.Time
306324}
307325
308326// listIndexesWithinRange lists all index files whose end time falls within the
309327// specified time interval (inclusive at both ends). The store should be rooted
310328// at the default collection URI (the one that contains the `metadata/`
311- // directory). The returned index filenames are relative to the `metadata/index`
312- // directory and sorted in descending order by end time, with ties broken by
313- // ascending start time.
329+ // directory). The indexes are passed to the callback in descending end time
330+ // order, with ties broken by ascending start time order. To stop iteration
331+ // early, the callback can return cloud.ErrListingDone. Any other error returned
332+ // by the callback will be propagated back to the caller.
314333//
315334// NB: Filtering is applied to backup end times truncated to tens of
316335// milliseconds.
317336func listIndexesWithinRange (
318- ctx context.Context , store cloud.ExternalStorage , after , before time.Time ,
319- ) ([]parsedIndex , error ) {
337+ ctx context.Context ,
338+ store cloud.ExternalStorage ,
339+ newerThan , olderThan time.Time ,
340+ cb func (parsedIndex ) error ,
341+ ) error {
320342 // First, find the full backup end time prefix we begin listing from. Since
321343 // full backup end times are stored in descending order in the index, we add
322344 // ten milliseconds (the maximum granularity of the timestamp encoding) to
323345 // ensure an inclusive start.
324- maxEndTime := before .Add (10 * time .Millisecond )
346+ maxEndTime := olderThan .Add (10 * time .Millisecond )
325347 maxEndTimeSubdir , err := endTimeToIndexSubdir (maxEndTime )
326348 if err != nil {
327- return nil , err
349+ return err
328350 }
329351
330- var idxInRange []parsedIndex
352+ // We don't immediately emit an index when we see it; instead, we hold onto
353+ // it until the next index is seen. This is because we may need to swap with
354+ // the next index in order to maintain descending end time order. This occurs
355+ // when incremental backups are created and appended to the previous chain
356+ // while the full backup for a new chain is still being run. Note that this
357+ // swapping of the last two seen indexes only maintains a sorted order due to
358+ // the way the backup index is sorted and the invariant that the existence of
359+ // an incremental backup in a chain ensures that no backup in an older chain
360+ // can have an end time greater than or equal to the incremental's end time.
361+ var pendingEmit parsedIndex
331362 err = store .List (
332363 ctx ,
333364 backupbase .BackupIndexDirectoryPath + "/" ,
@@ -342,39 +373,52 @@ func listIndexesWithinRange(
342373 }
343374 // Once we see an *incremental* backup with an end time before `after`, we
344375 // can stop iterating as we have found all backups within the time range.
345- if ! start .IsZero () && end .Before (after ) {
376+ if ! start .IsZero () && end .Before (newerThan ) {
346377 return cloud .ErrListingDone
347378 }
348- if end .After (before ) || end .Before (after ) {
379+ if end .After (olderThan ) || end .Before (newerThan ) {
349380 return nil
350381 }
351- entry := parsedIndex {
382+ nextEntry := parsedIndex {
352383 filePath : path .Join (backupbase .BackupIndexDirectoryPath , file ),
353384 fullEnd : full ,
385+ start : start ,
354386 end : end ,
355387 }
356- // We may need to swap with the last index appended to maintain descending
357- // end time order. This occurs when incremental backups are created and
358- // appended to the previous chain while the full backup for a new chain
359- // is still being run. Note that this swapping of the last two elements
360- // only maintains a sorted order due to the way the backup index is sorted
361- // and the invariant that the existence of an incremental backup in a
362- // chain ensures that no backup in an older chain can have an end time
363- // greater than or equal to the incremental's end time.
364- if len (idxInRange ) > 0 && end .After (idxInRange [len (idxInRange )- 1 ].end ) {
365- tmp := idxInRange [len (idxInRange )- 1 ]
366- idxInRange [len (idxInRange )- 1 ] = entry
367- entry = tmp
388+ if pendingEmit == (parsedIndex {}) {
389+ pendingEmit = nextEntry
390+ return nil
391+ }
392+ if ! nextEntry .end .After (pendingEmit .end ) {
393+ // The pending emit has an end time less than or equal to the new entry,
394+ // so we can guarantee that the pending emit is the next index to be
395+ // flushed.
396+ if err := cb (pendingEmit ); err != nil {
397+ return err
398+ }
399+ pendingEmit = nextEntry
400+ } else {
401+ // This new entry does have an end time newer than the last index, so we
402+ // need to emit this one first and continue holding onto that previous
403+ // index.
404+ if err := cb (nextEntry ); err != nil {
405+ return err
406+ }
368407 }
369- idxInRange = append (idxInRange , entry )
370408 return nil
371409 },
372410 )
373411 if err != nil && ! errors .Is (err , cloud .ErrListingDone ) {
374- return nil , err
412+ return err
375413 }
376414
377- return idxInRange , nil
415+ // Loop has ended, we can flush any pending index.
416+ if pendingEmit != (parsedIndex {}) {
417+ if err := cb (pendingEmit ); err != nil && ! errors .Is (err , cloud .ErrListingDone ) {
418+ return err
419+ }
420+ }
421+ return nil
378422}
379423
380424// GetBackupTreeIndexMetadata concurrently retrieves the index metadata for all
@@ -693,6 +737,33 @@ func parseTimesFromIndexFilepath(filepath string) (fullEnd, start, end time.Time
693737 return fullEnd , start , end , nil
694738}
695739
740+ // readIndexFile reads and unmarshals the backup index file at the given path.
741+ // store should be rooted at the default collection URI (the one that contains
742+ // the `metadata/` directory). The indexFilePath is relative to the collection
743+ // URI.
744+ func readIndexFile (
745+ ctx context.Context , store cloud.ExternalStorage , indexFilePath string ,
746+ ) (backuppb.BackupIndexMetadata , error ) {
747+ reader , _ , err := store .ReadFile (ctx , indexFilePath , cloud.ReadOptions {})
748+ if err != nil {
749+ return backuppb.BackupIndexMetadata {}, errors .Wrapf (err , "reading index file %s" , indexFilePath )
750+ }
751+ defer besteffort .Error (ctx , "cleanup-index-reader" , func (ctx context.Context ) error {
752+ return reader .Close (ctx )
753+ })
754+
755+ bytes , err := ioctx .ReadAll (ctx , reader )
756+ if err != nil {
757+ return backuppb.BackupIndexMetadata {}, errors .Wrapf (err , "reading index file %s" , indexFilePath )
758+ }
759+
760+ idxMeta := backuppb.BackupIndexMetadata {}
761+ if err := protoutil .Unmarshal (bytes , & idxMeta ); err != nil {
762+ return backuppb.BackupIndexMetadata {}, errors .Wrapf (err , "unmarshalling index file %s" , indexFilePath )
763+ }
764+ return idxMeta , nil
765+ }
766+
696767// encodeBackupID generates a backup ID for a backup identified by its parent
697768// full end time and its own end time.
698769func encodeBackupID (fullEnd time.Time , backupEnd time.Time ) string {
0 commit comments