@@ -64,9 +64,14 @@ type inspectProgressTracker struct {
6464 receivedSpanCount int
6565 // lastCheckpointedSpanCount tracks the span count at the last checkpoint write.
6666 lastCheckpointedSpanCount int
67- // spanCleaners stores PTS cleanup functions for active spans (keyed by span.String()).
68- // When a span completes, we call the cleaner to remove the protected timestamp.
69- spanCleaners map [string ]jobsprotectedts.Cleaner
67+ // activeSpanTimestamps tracks the timestamp for each active span (keyed by span.String()).
68+ // Used to determine the minimum timestamp for PTS protection.
69+ activeSpanTimestamps map [string ]hlc.Timestamp
70+ // currentPTSCleaner is the cleaner for the current PTS record (if any).
71+ // We maintain at most one PTS record, protecting the minimum timestamp.
72+ currentPTSCleaner jobsprotectedts.Cleaner
73+ // currentPTSTimestamp is the timestamp currently being protected.
74+ currentPTSTimestamp hlc.Timestamp
7075 // lastLoggedPercent tracks the last percentage milestone we logged, to avoid
7176 // spamming logs with progress updates. We log at every 1% increment.
7277 lastLoggedPercent int
@@ -98,7 +103,7 @@ func newInspectProgressTracker(
98103 codec : codec ,
99104 ptsManager : ptsManager ,
100105 }
101- t .mu .spanCleaners = make (map [string ]jobsprotectedts. Cleaner )
106+ t .mu .activeSpanTimestamps = make (map [string ]hlc. Timestamp )
102107 return t
103108}
104109
@@ -200,7 +205,7 @@ func (t *inspectProgressTracker) handleProgressUpdate(
200205 return errors .Wrapf (err , "unable to unmarshal inspect progress details" )
201206 }
202207
203- // Handle span started: set up PTS protection (outside lock since it calls external services) .
208+ // Handle span started: set up PTS protection.
204209 if ! incomingProcProgress .SpanStarted .Equal (roachpb.Span {}) {
205210 t .setupSpanPTS (ctx , incomingProcProgress .SpanStarted , incomingProcProgress .StartedAt )
206211 return nil
@@ -212,7 +217,7 @@ func (t *inspectProgressTracker) handleProgressUpdate(
212217 return err
213218 }
214219
215- // Handle span completed: call cleaners (outside lock) .
220+ // Handle span completed: call cleaner and pick a new PTS if needed .
216221 for _ , completedSpan := range meta .BulkProcessorProgress .CompletedSpans {
217222 t .cleanupSpanPTS (ctx , completedSpan )
218223 }
@@ -280,97 +285,211 @@ func (t *inspectProgressTracker) updateProgressCache(
280285}
281286
282287// terminateTracker performs any necessary cleanup when the job completes or fails.
283- // This includes cleaning up any remaining span PTS cleaners .
288+ // This includes cleaning up the PTS record if one exists .
284289func (t * inspectProgressTracker ) terminateTracker (ctx context.Context ) {
285290 if t .stopFunc != nil {
286291 t .stopFunc ()
287292 t .stopFunc = nil
288293 }
289294
290- // Clean up any remaining span cleaners (in case of early termination).
291- var cleaners map [ string ] jobsprotectedts.Cleaner
295+ // Clean up the PTS record if one exists (in case of early termination).
296+ var cleaner jobsprotectedts.Cleaner
292297 func () {
293298 t .mu .Lock ()
294299 defer t .mu .Unlock ()
295- cleaners = t .mu .spanCleaners
296- t .mu .spanCleaners = make (map [string ]jobsprotectedts.Cleaner )
300+ cleaner = t .mu .currentPTSCleaner
301+ t .mu .currentPTSCleaner = nil
302+ t .mu .currentPTSTimestamp = hlc.Timestamp {}
303+ t .mu .activeSpanTimestamps = make (map [string ]hlc.Timestamp )
297304 }()
298305
299- for spanKey , cleaner := range cleaners {
306+ if cleaner != nil {
300307 if err := cleaner (ctx ); err != nil {
301- log .Dev .Warningf (ctx , "failed to clean up PTS for span %s : %v" , spanKey , err )
308+ log .Dev .Warningf (ctx , "failed to clean up PTS during termination : %v" , err )
302309 }
303310 }
304311}
305312
306313// setupSpanPTS sets up protected timestamp protection for a span that has started
307314// processing. This is called when the coordinator receives a "span started" message
308- // from a processor. Uses TryToProtectBeforeGC which waits 80% of GC TTL before
309- // actually creating the PTS, avoiding unnecessary PTS creation for quick operations.
315+ // from a processor.
316+ //
317+ // We maintain at most one PTS record, protecting the minimum timestamp across all
318+ // active spans. Since PROTECT_AFTER mode protects all data at or after the specified
319+ // timestamp, protecting at the minimum covers all active spans. When a new span
320+ // starts with an older timestamp than the current PTS, we update the PTS to protect
321+ // the new minimum.
310322func (t * inspectProgressTracker ) setupSpanPTS (
311323 ctx context.Context , spanStarted roachpb.Span , tsToProtect hlc.Timestamp ,
312324) {
313- // Use testing hook if set.
314- if t .testingPTSProtector != nil {
315- cleaner := t .testingPTSProtector (ctx , spanStarted , tsToProtect )
316- if cleaner != nil {
317- func () {
318- t .mu .Lock ()
319- defer t .mu .Unlock ()
320- t .mu .spanCleaners [spanStarted .String ()] = cleaner
321- }()
325+ spanKey := spanStarted .String ()
326+
327+ // Check if we need to update the PTS (first span or new minimum timestamp).
328+ var needsNewPTS bool
329+ var oldCleaner jobsprotectedts.Cleaner
330+ func () {
331+ t .mu .Lock ()
332+ defer t .mu .Unlock ()
333+ t .mu .activeSpanTimestamps [spanKey ] = tsToProtect
334+ if t .mu .currentPTSCleaner == nil || tsToProtect .Less (t .mu .currentPTSTimestamp ) {
335+ needsNewPTS = true
336+ oldCleaner = t .mu .currentPTSCleaner
322337 }
338+ }()
339+
340+ if ! needsNewPTS {
341+ log .VEventf (ctx , 2 , "INSPECT: span %s at %s covered by existing PTS at %s" ,
342+ spanStarted , tsToProtect , t .mu .currentPTSTimestamp )
323343 return
324344 }
325345
326- // Extract table ID from the span key prefix .
327- _ , tableID , err := t . codec . DecodeTablePrefix ( spanStarted . Key )
328- if err != nil {
329- log .Dev .Warningf (ctx , "failed to decode table ID from span %s : %v" , spanStarted , err )
330- return
346+ // Clean up old PTS before setting new one .
347+ if oldCleaner != nil {
348+ if err := oldCleaner ( ctx ); err != nil {
349+ log .Dev .Warningf (ctx , "failed to clean up old PTS : %v" , err )
350+ }
331351 }
332352
333- cleaner := t .ptsManager .TryToProtectBeforeGC (ctx , t .job , descpb .ID (tableID ), tsToProtect )
353+ // Create new PTS at the new minimum timestamp.
354+ var cleaner jobsprotectedts.Cleaner
355+ if t .testingPTSProtector != nil {
356+ cleaner = t .testingPTSProtector (ctx , spanStarted , tsToProtect )
357+ } else {
358+ // Extract table ID from the span key prefix.
359+ _ , tableID , err := t .codec .DecodeTablePrefix (spanStarted .Key )
360+ if err != nil {
361+ log .Dev .Warningf (ctx , "failed to decode table ID from span %s: %v" , spanStarted , err )
362+ return
363+ }
364+ cleaner = t .ptsManager .TryToProtectBeforeGC (ctx , t .job , descpb .ID (tableID ), tsToProtect )
365+ }
334366
335- // Store the cleaner for later cleanup when span completes.
336367 func () {
337368 t .mu .Lock ()
338369 defer t .mu .Unlock ()
339- t .mu .spanCleaners [spanStarted .String ()] = cleaner
370+ t .mu .currentPTSCleaner = cleaner
371+ t .mu .currentPTSTimestamp = tsToProtect
340372 }()
341373
342- log .VEventf (ctx , 2 , "INSPECT: set up PTS protection for span %s at %s" , spanStarted , tsToProtect )
374+ log .VEventf (ctx , 2 , "INSPECT: set up PTS protection at minimum timestamp %s (triggered by span %s)" ,
375+ tsToProtect , spanStarted )
343376}
344377
345- // cleanupSpanPTS cleans up the protected timestamp for a completed span.
378+ // cleanupSpanPTS handles PTS management when a span completes processing .
346379// This is called when the coordinator receives a "span completed" message.
380+ //
381+ // When the oldest span (with the minimum timestamp) completes, we update the PTS
382+ // to protect the new minimum timestamp among remaining active spans. This allows
383+ // GC of data between the old and new minimum timestamps.
347384func (t * inspectProgressTracker ) cleanupSpanPTS (ctx context.Context , completedSpan roachpb.Span ) {
348385 spanKey := completedSpan .String ()
349386
350- var cleaner jobsprotectedts.Cleaner
351- var ok bool
387+ // Determine what action to take based on current state.
388+ var action ptsCleanupAction
389+ var oldCleaner jobsprotectedts.Cleaner
390+ var newMinTimestamp hlc.Timestamp
352391 func () {
353392 t .mu .Lock ()
354393 defer t .mu .Unlock ()
355- cleaner , ok = t .mu .spanCleaners [spanKey ]
356- if ok {
357- delete (t .mu .spanCleaners , spanKey )
394+
395+ completedTimestamp , ok := t .mu .activeSpanTimestamps [spanKey ]
396+ if ! ok {
397+ // Span not tracked - either PTS was never set up (e.g., AsOf was specified),
398+ // or it was already cleaned up.
399+ action = ptsActionNone
400+ return
358401 }
402+ delete (t .mu .activeSpanTimestamps , spanKey )
403+
404+ if len (t .mu .activeSpanTimestamps ) == 0 {
405+ // No more active spans - clean up PTS entirely.
406+ action = ptsActionCleanup
407+ oldCleaner = t .mu .currentPTSCleaner
408+ t .mu .currentPTSCleaner = nil
409+ t .mu .currentPTSTimestamp = hlc.Timestamp {}
410+ return
411+ }
412+
413+ // Check if the completed span was the one with the minimum timestamp.
414+ if completedTimestamp .Equal (t .mu .currentPTSTimestamp ) {
415+ // Find the new minimum timestamp among remaining spans.
416+ var foundMin bool
417+ var minSpanKey string
418+ for sk , ts := range t .mu .activeSpanTimestamps {
419+ if ! foundMin || ts .Less (newMinTimestamp ) {
420+ newMinTimestamp = ts
421+ minSpanKey = sk
422+ foundMin = true
423+ }
424+ }
425+ if foundMin && newMinTimestamp .Less (t .mu .currentPTSTimestamp ) == false {
426+ // New minimum is newer than current PTS - need to update.
427+ action = ptsActionUpdate
428+ oldCleaner = t .mu .currentPTSCleaner
429+ t .mu .currentPTSCleaner = nil
430+ // We'll extract the table ID from the span key outside the lock.
431+ // Store the span key for later processing.
432+ _ = minSpanKey // Used for logging if needed.
433+ }
434+ }
435+ // If the completed span wasn't the minimum, no PTS change needed.
359436 }()
360437
361- if ! ok {
362- // No cleaner found - either PTS was never set up (e.g., AsOf was specified),
363- // or it was already cleaned up.
438+ switch action {
439+ case ptsActionNone :
364440 return
365- }
366441
367- if err := cleaner (ctx ); err != nil {
368- log .Dev .Warningf (ctx , "failed to clean up PTS for span %s: %v" , completedSpan , err )
369- } else {
370- log .VEventf (ctx , 2 , "INSPECT: cleaned up PTS protection for span %s" , completedSpan )
442+ case ptsActionCleanup :
443+ if oldCleaner != nil {
444+ if err := oldCleaner (ctx ); err != nil {
445+ log .Dev .Warningf (ctx , "failed to clean up PTS: %v" , err )
446+ } else {
447+ log .VEventf (ctx , 2 , "INSPECT: cleaned up PTS protection (no more active spans)" )
448+ }
449+ }
450+
451+ case ptsActionUpdate :
452+ // Clean up old PTS.
453+ if oldCleaner != nil {
454+ if err := oldCleaner (ctx ); err != nil {
455+ log .Dev .Warningf (ctx , "failed to clean up old PTS: %v" , err )
456+ }
457+ }
458+
459+ // Create new PTS at the new minimum timestamp.
460+ var cleaner jobsprotectedts.Cleaner
461+ if t .testingPTSProtector != nil {
462+ cleaner = t .testingPTSProtector (ctx , completedSpan , newMinTimestamp )
463+ } else {
464+ // Extract table ID from the completed span. All spans in an INSPECT job
465+ // typically belong to the same table, so this is safe.
466+ _ , tableID , err := t .codec .DecodeTablePrefix (completedSpan .Key )
467+ if err != nil {
468+ log .Dev .Warningf (ctx , "failed to decode table ID from span %s: %v" , completedSpan , err )
469+ return
470+ }
471+ cleaner = t .ptsManager .TryToProtectBeforeGC (ctx , t .job , descpb .ID (tableID ), newMinTimestamp )
472+ }
473+ func () {
474+ t .mu .Lock ()
475+ defer t .mu .Unlock ()
476+ t .mu .currentPTSCleaner = cleaner
477+ t .mu .currentPTSTimestamp = newMinTimestamp
478+ }()
479+
480+ log .VEventf (ctx , 2 , "INSPECT: updated PTS protection to new minimum timestamp %s" , newMinTimestamp )
371481 }
372482}
373483
// ptsCleanupAction enumerates the follow-up work required on the PTS record
// after a span has been reported as completed.
type ptsCleanupAction int

const (
	// ptsActionNone means the PTS record stays as it is.
	ptsActionNone ptsCleanupAction = iota
	// ptsActionCleanup means the PTS record is released entirely because no
	// active spans remain.
	ptsActionCleanup
	// ptsActionUpdate means the PTS record is replaced by one protecting the
	// new minimum timestamp.
	ptsActionUpdate
)
492+
374493// startPeriodicUpdates launches background goroutines to periodically flush
375494// progress updates at different intervals. Returns a stop function to terminate
376495// the goroutines and wait for their completion.
0 commit comments