
Commit 20d2c08

xxmplus and claude committed
replay: fix race between FlushEnd and refreshMetrics
527ea2b added a FlushEnd handler that closes compactionMu.ch to wake
refreshMetrics when a flush completes. Unlike CompactionEnd, the handler does
not increment the completed counter because that counter only tracks
compactions. This creates a gap: nextCompactionCompletes has no way to detect
that a flush notification was already delivered, so it can create a new
channel that nobody will ever close.

The race occurs in refreshMetrics between r.d.Metrics() and
nextCompactionCompletes(). If a flush completes in this window:

1. r.d.Metrics() acquires d.mu, observes flushing=true
   (Flush.NumInProgress=1), releases d.mu.
2. The flush goroutine acquires d.mu, completes, fires FlushEnd (under d.mu).
   The replay handler closes compactionMu.ch and nils it.
3. nextCompactionCompletes sees ch==nil, creates a new channel. With no
   counter increment to detect the flush, it returns alreadyOccurred=false.
4. compactionsAppearQuiesced uses the stale metrics from step 1
   (NumInProgress=1) and returns false.
5. The loop re-enters the first select with a channel nobody will close and
   stepsApplied==nil (blocks forever). Permanent hang.

Fix this with three changes:

1. Track both flushes and compactions in the started/completed counters.
   Rename compactionMu to compactionOrFlushMu and add a FlushBegin handler
   that increments started. The FlushEnd handler now also increments
   completed, matching CompactionEnd. This allows
   nextCompactionOrFlushCompletes to detect flush completions through the
   counter, eliminating the race.

2. Switch compactionsAppearQuiesced to use only the started/completed counter
   (started == completed) instead of checking DB.Metrics().NumInProgress.
   There is a scheduling window between AddInProgressLocked (which increments
   NumInProgress under d.mu) and CompactionBegin (which fires in a separate
   goroutine that must re-acquire d.mu). During this window
   NumInProgress > 0 but started == completed. Using NumInProgress would
   block quiescence detection during cascading compactions. The counter does
   not have this window, and the 1-second quiescence confirmation handles any
   false positives from compactions that are scheduled but have not yet fired
   CompactionBegin.

3. Fix a pre-existing tight loop in refreshMetrics: when
   nextCompactionOrFlushCompletes detects a completion via the counter
   (alreadyOccurred=true), the old code skipped the first select and
   immediately re-acquired d.mu to collect metrics, even though the
   quiescence check would be skipped anyway. Under heavy compaction load this
   tight loop contends with event handlers for d.mu. Fix this by catching up
   the counter without collecting metrics when alreadyOccurred is true, then
   falling through to collect metrics once caught up.

Fixes #5820.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
1 parent 3e24138 commit 20d2c08

2 files changed: 95 additions & 84 deletions
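To make the counter-plus-channel scheme in the commit message concrete, here is a minimal, self-contained Go sketch of the pattern (illustrative only: the `notifier` type and its method names are hypothetical, not the replay package's actual API). The key property is that the `completed` counter, not the channel, is the source of truth, so a waiter can always detect a completion it slept through by re-checking the counter:

```go
package main

import (
	"fmt"
	"sync"
)

// notifier is a hypothetical stand-in for compactionOrFlushMu: a wakeup
// channel plus started/completed counters guarded by one mutex.
type notifier struct {
	mu        sync.Mutex
	ch        chan struct{} // created on demand, closed on each completion
	started   int64
	completed int64
}

func (n *notifier) begin() { // FlushBegin / CompactionBegin
	n.mu.Lock()
	defer n.mu.Unlock()
	n.started++
}

func (n *notifier) end() { // FlushEnd / CompactionEnd
	n.mu.Lock()
	defer n.mu.Unlock()
	n.completed++ // the counter half; the old flush path skipped this
	if n.ch != nil {
		close(n.ch) // the wakeup half
		n.ch = nil
	}
}

// quiesced reports whether everything that started has also completed.
func (n *notifier) quiesced() bool {
	n.mu.Lock()
	defer n.mu.Unlock()
	return n.started == n.completed
}

// next returns (completed, true, nil) if a completion occurred after
// lastObserved; otherwise it returns a channel closed on the next one.
func (n *notifier) next(lastObserved int64) (int64, bool, chan struct{}) {
	n.mu.Lock()
	defer n.mu.Unlock()
	if lastObserved < n.completed {
		return n.completed, true, nil
	}
	if n.ch == nil {
		n.ch = make(chan struct{})
	}
	return lastObserved, false, n.ch
}

func main() {
	var n notifier
	count, _, ch := n.next(0) // no completions yet; get a wait channel
	n.begin()
	n.end() // completed++ and close(ch)
	<-ch    // a parked waiter wakes immediately
	count, already, _ := n.next(count)
	fmt.Println(count, already, n.quiesced()) // 1 true true
}
```

The bug fixed by this commit was precisely the flush path performing the `close(ch)` half of `end()` without the `completed++` half, leaving a waiter that re-called `next` with no evidence that anything had happened.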

replay/replay.go (87 additions, 76 deletions)
```diff
@@ -303,10 +303,21 @@ type Runner struct {
 		countByReason    map[string]int
 		durationByReason map[string]time.Duration
 	}
-	// compactionMu holds state for tracking the number of compactions
-	// started and completed and waking waiting goroutines when a new compaction
-	// completes. See nextCompactionCompletes.
-	compactionMu struct {
+	// compactionOrFlushMu holds state for tracking the number of compactions
+	// and flushes started and completed, and waking waiting goroutines when
+	// one completes. See nextCompactionOrFlushCompletes.
+	//
+	// State transitions:
+	//   FlushBegin / CompactionBegin: started++
+	//   FlushEnd / CompactionEnd:     completed++; close(ch); ch = nil
+	//
+	// The channel ch is created on-demand by nextCompactionOrFlushCompletes
+	// and closed by the End handlers to wake any goroutine waiting for
+	// activity to finish. The started/completed counters allow
+	// nextCompactionOrFlushCompletes to detect events that occurred between
+	// the caller's last observation and the current call, without relying
+	// solely on the channel.
+	compactionOrFlushMu struct {
 		sync.Mutex
 		ch        chan struct{}
 		started   int64
@@ -359,7 +370,7 @@ func (r *Runner) Run(ctx context.Context) error {
 
 	// Extend the user-provided Options with extensions necessary for replay
 	// mechanics.
-	r.compactionMu.ch = make(chan struct{})
+	r.compactionOrFlushMu.ch = make(chan struct{})
 	r.Opts.AddEventListener(r.eventListener())
 	r.writeStallMetrics.countByReason = make(map[string]int)
 	r.writeStallMetrics.durationByReason = make(map[string]time.Duration)
@@ -398,14 +409,15 @@ func (r *Runner) refreshMetrics(ctx context.Context) error {
 	var workloadExhausted bool
 	var workloadExhaustedAt time.Time
 	stepsApplied := r.stepsApplied
-	compactionCount, alreadyCompleted, compactionCh := r.nextCompactionCompletes(0)
+	compactionCount, alreadyCompleted, compactionCh := r.nextCompactionOrFlushCompletes(0)
 	for {
 		if !alreadyCompleted {
 			select {
 			case <-ctx.Done():
 				return ctx.Err()
 			case <-compactionCh:
-				// Fall through to refreshing dbMetrics.
+				// A compaction or flush completed. Fall through to
+				// refreshing dbMetrics.
 			case _, ok := <-stepsApplied:
 				if !ok {
 					workloadExhausted = true
@@ -418,6 +430,15 @@ func (r *Runner) refreshMetrics(ctx context.Context) error {
 				}
 				// Fall through to refreshing dbMetrics.
 			}
+		} else {
+			// One or more completions were already detected via the
+			// counter. Catch up to the latest count without collecting
+			// metrics (which would acquire d.mu and contend with event
+			// handlers). Once caught up, fall through to collect fresh
+			// metrics and check quiescence.
+			for alreadyCompleted {
+				compactionCount, alreadyCompleted, _ = r.nextCompactionOrFlushCompletes(compactionCount)
+			}
 		}
 
 		m := r.d.Metrics()
@@ -434,7 +455,7 @@ func (r *Runner) refreshMetrics(ctx context.Context) error {
 		r.metrics.totalSize.record(int64(m.DiskSpaceUsage()))
 		r.metrics.writeThroughput.record(int64(r.metrics.writeBytes.Load()))
 
-		compactionCount, alreadyCompleted, compactionCh = r.nextCompactionCompletes(compactionCount)
+		compactionCount, alreadyCompleted, compactionCh = r.nextCompactionOrFlushCompletes(compactionCount)
 		// Consider whether replaying is complete. There are two necessary
 		// conditions:
 		//
@@ -453,7 +474,7 @@ func (r *Runner) refreshMetrics(ctx context.Context) error {
 		// progress). If it appears that compactions have quiesced, pause for a
 		// fixed duration to see if a new one is scheduled. If not, consider
 		// compactions quiesced.
-		if workloadExhausted && !alreadyCompleted && r.compactionsAppearQuiesced(m) {
+		if workloadExhausted && !alreadyCompleted && r.compactionsAppearQuiesced() {
 			select {
 			case <-compactionCh:
 				// A new compaction just finished; compactions have not
@@ -466,7 +487,7 @@ func (r *Runner) refreshMetrics(ctx context.Context) error {
 				// from the moment quiescence was confirmed, rather than
 				// re-fetching (which could race with new compactions).
 				finalM := r.d.Metrics()
-				if r.compactionsAppearQuiesced(finalM) {
+				if r.compactionsAppearQuiesced() {
 					r.metrics.quiesceDuration = time.Since(workloadExhaustedAt)
 					r.finalMetrics = finalM
 					return nil
@@ -476,58 +497,51 @@
 		}
 	}
 
-// compactionsAppearQuiesced returns true if the database may have quiesced, and
-// there likely won't be additional compactions scheduled. Detecting quiescence
-// is a bit fraught: The various signals that Pebble makes available are
-// adjusted at different points in the compaction lifecycle, and database
-// mutexes are dropped and acquired between them. This makes it difficult to
-// reliably identify when compactions quiesce.
-//
-// For example, our call to DB.Metrics() may acquire the DB.mu mutex when a
-// compaction has just successfully completed, but before it's managed to
-// schedule the next compaction (DB.mu is dropped while it attempts to acquire
-// the manifest lock).
-func (r *Runner) compactionsAppearQuiesced(m *pebble.Metrics) bool {
-	r.compactionMu.Lock()
-	defer r.compactionMu.Unlock()
-	if m.Flush.NumInProgress > 0 {
-		return false
-	} else if m.Compact.NumInProgress > 0 && r.compactionMu.started != r.compactionMu.completed {
-		return false
-	}
-	return true
+// compactionsAppearQuiesced returns true if all flushes and compactions
+// that have started (FlushBegin/CompactionBegin) have also completed
+// (FlushEnd/CompactionEnd). This relies solely on the started/completed
+// counters rather than DB.Metrics().NumInProgress, because there is a
+// scheduling window between when a compaction is added to the in-progress
+// set (under d.mu) and when CompactionBegin fires (in a separate
+// goroutine). During this window NumInProgress > 0 but
+// started == completed, and using NumInProgress would prevent quiescence
+// detection during cascading compactions.
+func (r *Runner) compactionsAppearQuiesced() bool {
+	r.compactionOrFlushMu.Lock()
+	defer r.compactionOrFlushMu.Unlock()
+	return r.compactionOrFlushMu.started == r.compactionOrFlushMu.completed
 }
 
-// nextCompactionCompletes may be used to be notified when new compactions
-// complete. The caller is responsible for holding on to a monotonically
-// increasing count representing the number of compactions that have been
-// observed, beginning at zero.
+// nextCompactionOrFlushCompletes may be used to be notified when a new
+// compaction or flush completes. The caller is responsible for holding on to a
+// monotonically increasing count representing the number of completions that
+// have been observed, beginning at zero.
 //
-// The caller passes their current count as an argument. If a new compaction has
-// already completed since their provided count, nextCompactionCompletes returns
-// the new count and a true boolean return value. If a new compaction has not
-// yet completed, it returns a channel that will be closed when the next
-// compaction completes. This scheme allows the caller to select{...},
-// performing some action on every compaction completion.
-func (r *Runner) nextCompactionCompletes(
+// The caller passes their current count as an argument. If a new compaction or
+// flush has already completed since their provided count,
+// nextCompactionOrFlushCompletes returns the new count and a true boolean
+// return value. If neither has completed, it returns a channel that will be
+// closed when the next completion occurs. This scheme allows the caller to
+// select{...}, performing some action on every compaction or flush completion.
+func (r *Runner) nextCompactionOrFlushCompletes(
 	lastObserved int64,
 ) (count int64, alreadyOccurred bool, ch chan struct{}) {
-	r.compactionMu.Lock()
-	defer r.compactionMu.Unlock()
+	r.compactionOrFlushMu.Lock()
+	defer r.compactionOrFlushMu.Unlock()
 
-	if lastObserved < r.compactionMu.completed {
-		// There has already been another compaction since the last one observed
-		// by this caller. Return immediately.
-		return r.compactionMu.completed, true, nil
+	if lastObserved < r.compactionOrFlushMu.completed {
+		// There has already been another compaction or flush since the last
+		// one observed by this caller. Return immediately.
		return r.compactionOrFlushMu.completed, true, nil
 	}
 
-	// The last observed compaction is still the most recent compaction.
-	// Return a channel that the caller can wait on to be notified when the
-	// next compaction occurs.
-	if r.compactionMu.ch == nil {
-		r.compactionMu.ch = make(chan struct{})
+	// No new completions since the caller's last observation. Return a
+	// channel that the caller can wait on to be notified when the next
+	// compaction or flush completes.
+	if r.compactionOrFlushMu.ch == nil {
+		r.compactionOrFlushMu.ch = make(chan struct{})
 	}
-	return lastObserved, false, r.compactionMu.ch
+	return lastObserved, false, r.compactionOrFlushMu.ch
 }
 
 // Wait waits for the workload replay to complete. Wait returns once the entire
```
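The caller-side discipline that this comment describes can be sketched as follows (hypothetical code reusing the `notifier` from the earlier sketch, with `"context"` imported; the real `refreshMetrics` additionally collects metrics and consumes workload steps):

```go
// waitForQuiesce mirrors the shape of refreshMetrics after the fix: catch up
// through the counter first (change 3), check quiescence using only
// started == completed (change 2), and only then block on the channel.
func waitForQuiesce(ctx context.Context, n *notifier) error {
	var count int64
	for {
		c, already, ch := n.next(count)
		if already {
			// A completion was detected via the counter alone; catch up
			// without doing any other expensive work under contention.
			count = c
			continue
		}
		if n.quiesced() {
			return nil // everything that started has completed
		}
		select {
		case <-ch:
			// ch was closed by an end() handler; loop to observe the count.
		case <-ctx.Done():
			return ctx.Err()
		}
	}
}
```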
```diff
@@ -657,35 +671,32 @@ func (r *Runner) eventListener() pebble.EventListener {
 			defer r.writeStallMetrics.Unlock()
 			r.writeStallMetrics.durationByReason[writeStallReason] += time.Since(writeStallBegin)
 		},
+		FlushBegin: func(_ pebble.FlushInfo) {
+			r.compactionOrFlushMu.Lock()
+			defer r.compactionOrFlushMu.Unlock()
+			r.compactionOrFlushMu.started++
+		},
 		FlushEnd: func(_ pebble.FlushInfo) {
-			// Close compactionMu.ch to wake refreshMetrics so it can re-check
-			// quiescence. This is necessary because compactionsAppearQuiesced
-			// checks Flush.NumInProgress; if the last in-flight operation is a flush
-			// and no compaction follows, refreshMetrics would block on
-			// compactionMu.ch forever.
-			r.compactionMu.Lock()
-			defer r.compactionMu.Unlock()
-			if r.compactionMu.ch != nil {
-				close(r.compactionMu.ch)
-				r.compactionMu.ch = nil
+			r.compactionOrFlushMu.Lock()
+			defer r.compactionOrFlushMu.Unlock()
+			r.compactionOrFlushMu.completed++
+			if r.compactionOrFlushMu.ch != nil {
+				close(r.compactionOrFlushMu.ch)
+				r.compactionOrFlushMu.ch = nil
 			}
 		},
 		CompactionBegin: func(_ pebble.CompactionInfo) {
-			r.compactionMu.Lock()
-			defer r.compactionMu.Unlock()
-			r.compactionMu.started++
+			r.compactionOrFlushMu.Lock()
+			defer r.compactionOrFlushMu.Unlock()
+			r.compactionOrFlushMu.started++
 		},
 		CompactionEnd: func(_ pebble.CompactionInfo) {
-			// Keep track of the number of compactions that complete and notify
-			// anyone waiting for a compaction to complete. See the function
-			// nextCompactionCompletes for the corresponding receiver side.
-			r.compactionMu.Lock()
-			defer r.compactionMu.Unlock()
-			r.compactionMu.completed++
-			if r.compactionMu.ch != nil {
-				// Signal that a compaction has completed.
-				close(r.compactionMu.ch)
-				r.compactionMu.ch = nil
+			r.compactionOrFlushMu.Lock()
+			defer r.compactionOrFlushMu.Unlock()
+			r.compactionOrFlushMu.completed++
+			if r.compactionOrFlushMu.ch != nil {
+				close(r.compactionOrFlushMu.ch)
+				r.compactionOrFlushMu.ch = nil
 			}
 		},
 	}
```
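For reference, handlers like these hang off Pebble's public event-listener API; `pebble.EventListener`, `(*pebble.Options).AddEventListener`, `pebble.FlushInfo`, and `pebble.CompactionInfo` are the same types used in the diff above. A rough sketch of wiring the hypothetical `notifier` into a fresh DB (assuming imports of `log`, `github.com/cockroachdb/pebble`, and `github.com/cockroachdb/pebble/vfs`):

```go
// Count begin/end events for flushes and compactions on an in-memory DB.
opts := &pebble.Options{FS: vfs.NewMem()}
var n notifier
opts.AddEventListener(pebble.EventListener{
	FlushBegin:      func(pebble.FlushInfo) { n.begin() },
	FlushEnd:        func(pebble.FlushInfo) { n.end() },
	CompactionBegin: func(pebble.CompactionInfo) { n.begin() },
	CompactionEnd:   func(pebble.CompactionInfo) { n.end() },
})
db, err := pebble.Open("demo", opts)
if err != nil {
	log.Fatal(err)
}
defer db.Close()
```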

replay/replay_test.go (8 additions, 8 deletions)
```diff
@@ -628,11 +628,11 @@ func TestCompactionsQuiesce(t *testing.T) {
 }
 
 // TestFlushEndNotifiesRefreshMetrics is a regression test for a hang where
-// refreshMetrics blocks forever on compactionMu.ch when the last in-flight
-// operation is a flush and no compaction follows. With
-// DisableAutomaticCompactions, no CompactionEnd event ever fires, so the only
-// way for refreshMetrics to make progress is via the FlushEnd handler closing
-// compactionMu.ch. Without the fix, this test hangs.
+// refreshMetrics blocks forever when the last in-flight operation is a flush
+// and no compaction follows. With DisableAutomaticCompactions, no CompactionEnd
+// event ever fires, so the only way for refreshMetrics to make progress is via
+// the FlushEnd handler incrementing compactionOrFlushMu.completed and closing
+// the notification channel. Without the fix, this test hangs.
 func TestFlushEndNotifiesRefreshMetrics(t *testing.T) {
 	// Build a workload that consists of a single flush and no compactions.
 	workloadFS := buildFlushOnlyWorkload(t)
@@ -667,9 +667,9 @@ func TestFlushEndNotifiesRefreshMetrics(t *testing.T) {
 	} else if invariants.Enabled {
 		wait = 30 * time.Second
 	}
-	// Without the FlushEnd handler closing compactionMu.ch, Wait would hang
-	// forever because DisableAutomaticCompactions prevents any CompactionEnd
-	// event from ever firing.
+	// Without the FlushEnd handler incrementing compactionOrFlushMu.completed,
+	// Wait would hang forever because DisableAutomaticCompactions prevents any
+	// CompactionEnd event from ever firing.
 	require.Eventually(t, func() bool { return done.Load() },
 		wait, time.Millisecond, "(*replay.Runner).Wait didn't terminate")
 	require.NoError(t, err)
```
