Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
130 changes: 67 additions & 63 deletions pkg/ccl/changefeedccl/changefeed_processors.go
Original file line number Diff line number Diff line change
Expand Up @@ -1725,7 +1725,25 @@ func (cf *changeFrontier) noteAggregatorProgress(ctx context.Context, d rowenc.E

cf.maybeMarkJobIdle(resolvedSpans.Stats.RecentKvCount)

for _, resolved := range resolvedSpans.ResolvedSpans {
frontierChanged, err := cf.forwardFrontier(ctx, resolvedSpans.ResolvedSpans)
if err != nil {
return err
}

if err := cf.maybeCheckpoint(ctx, frontierChanged, resolvedSpans.ResolvedSpans); err != nil {
return err
}

cf.updateProgressSkewMetrics()

return nil
}

func (cf *changeFrontier) forwardFrontier(
ctx context.Context, spans []jobspb.ResolvedSpan,
) (bool, error) {
frontierChanged := false
for _, resolved := range spans {
// Inserting a timestamp less than the one the changefeed flow started at
// could potentially regress the job progress. This is not expected, but it
// was a bug at one point, so assert to prevent regressions.
Expand All @@ -1734,51 +1752,56 @@ func (cf *changeFrontier) noteAggregatorProgress(ctx context.Context, d rowenc.E
// job progress update closure, but it currently doesn't pass along the info
// we'd need to do it that way.
if !resolved.Timestamp.IsEmpty() && resolved.Timestamp.Less(cf.highWaterAtStart) {
logcrash.ReportOrPanic(cf.Ctx(), &cf.FlowCtx.Cfg.Settings.SV,
logcrash.ReportOrPanic(ctx, &cf.FlowCtx.Cfg.Settings.SV,
`got a span level timestamp %s for %s that is less than the initial high-water %s`,
redact.Safe(resolved.Timestamp), resolved.Span, redact.Safe(cf.highWaterAtStart))
continue
}
if err := cf.forwardFrontier(resolved); err != nil {
return err

changed, err := cf.frontier.ForwardResolvedSpan(resolved)
if err != nil {
return false, err
}
frontierChanged = frontierChanged || changed
}

cf.updateProgressSkewMetrics()

return nil
return frontierChanged, nil
}

func (cf *changeFrontier) forwardFrontier(resolved jobspb.ResolvedSpan) error {
frontierChanged, err := cf.frontier.ForwardResolvedSpan(resolved)
if err != nil {
return err
func (cf *changeFrontier) maybeCheckpoint(
ctx context.Context, frontierChanged bool, spans []jobspb.ResolvedSpan,
) error {
maybeLogBehindSpan(ctx, "coordinator", cf.frontier, frontierChanged, &cf.FlowCtx.Cfg.Settings.SV)

updateCheckpoint, updateHighWater := cf.shouldCheckpoint(spans, frontierChanged)
if !(updateCheckpoint || updateHighWater) {
return nil
}

maybeLogBehindSpan(cf.Ctx(), "coordinator", cf.frontier, frontierChanged, &cf.FlowCtx.Cfg.Settings.SV)
// If the highwater has moved an empty checkpoint will be saved
var checkpoint *jobspb.TimestampSpansMap
if updateCheckpoint {
maxBytes := changefeedbase.SpanCheckpointMaxBytes.Get(&cf.FlowCtx.Cfg.Settings.SV)
checkpoint = cf.frontier.MakeCheckpoint(maxBytes, cf.sliMetrics.CheckpointMetrics)
}

checkpointed, err := cf.maybeCheckpointJob(resolved, frontierChanged)
if err != nil {
checkpointStart := timeutil.Now()
if err := cf.checkpointJobProgress(ctx, cf.frontier.Frontier(), checkpoint); err != nil {
return err
}
cf.js.checkpointCompleted(ctx, timeutil.Since(checkpointStart))

// Emit resolved timestamp only if we have checkpointed the job.
// Usually, this happens every time frontier changes, but we can skip some updates
// if we update frontier too rapidly.
if checkpointed {
// Keeping this after the checkpointJobProgress call will avoid
// some duplicates if a restart happens.
newResolved := cf.frontier.Frontier()
if err := cf.maybePersistFrontier(ctx); err != nil {
return err
}

// The feed's checkpoint is tracked in a map which is used to inform the
// checkpoint_progress metric which will return the lowest timestamp across
// all feeds in the scope.
cf.sliMetrics.setCheckpoint(cf.sliMetricsID, newResolved)
newResolved := cf.frontier.Frontier()

return cf.maybeEmitResolved(cf.Ctx(), newResolved)
}
// The feed's checkpoint is tracked in a map which is used to inform the
// checkpoint_progress metric which will return the lowest timestamp across
// all feeds in the scope.
cf.sliMetrics.setCheckpoint(cf.sliMetricsID, newResolved)

return nil
return cf.maybeEmitResolved(ctx, newResolved)
}

func (cf *changeFrontier) maybeMarkJobIdle(recentKVCount uint64) {
Expand All @@ -1799,54 +1822,35 @@ func (cf *changeFrontier) maybeMarkJobIdle(recentKVCount uint64) {
cf.js.job.MarkIdle(isIdle)
}

func (cf *changeFrontier) maybeCheckpointJob(
resolvedSpan jobspb.ResolvedSpan, frontierChanged bool,
) (bool, error) {
ctx, sp := tracing.ChildSpan(cf.Ctx(), "changefeed.frontier.maybe_checkpoint_job")
defer sp.Finish()
func (cf *changeFrontier) shouldCheckpoint(
resolvedSpans []jobspb.ResolvedSpan, frontierChanged bool,
) (updateCheckpoint bool, updateHighwater bool) {
if cf.knobs.ShouldCheckpointToJobRecord != nil && !cf.knobs.ShouldCheckpointToJobRecord(cf.frontier.Frontier()) {
return false, false
}

// When in a Backfill, the frontier remains unchanged at the backfill boundary
// as we receive spans from the scan request at the Backfill Timestamp
inBackfill := !frontierChanged && cf.frontier.InBackfill(resolvedSpan)
inBackfill := false
if !frontierChanged {
for _, span := range resolvedSpans {
inBackfill = inBackfill || cf.frontier.InBackfill(span)
}
}

// If we're not in a backfill, highwater progress and an empty checkpoint will
// be saved. This is throttled however we always persist progress to a schema
// boundary.
atBoundary, _, _ := cf.frontier.AtBoundary()
updateHighWater :=
!inBackfill && (atBoundary || cf.js.canCheckpointHighWatermark(frontierChanged))
updateHighwater = !inBackfill && (atBoundary || cf.js.canCheckpointHighWatermark(frontierChanged))

// During backfills or when some problematic spans stop advancing, the
// highwater mark remains fixed while other spans may significantly outpace
// it, therefore to avoid losing that progress on changefeed resumption we
// also store as many of those leading spans as we can in the job progress
updateCheckpoint := (inBackfill || cf.frontier.HasLaggingSpans(&cf.js.settings.SV)) && cf.js.canCheckpointSpans()

// If the highwater has moved an empty checkpoint will be saved
var checkpoint *jobspb.TimestampSpansMap
if updateCheckpoint {
maxBytes := changefeedbase.SpanCheckpointMaxBytes.Get(&cf.FlowCtx.Cfg.Settings.SV)
checkpoint = cf.frontier.MakeCheckpoint(maxBytes, cf.sliMetrics.CheckpointMetrics)
}

if updateCheckpoint || updateHighWater {
if cf.knobs.ShouldCheckpointToJobRecord != nil && !cf.knobs.ShouldCheckpointToJobRecord(cf.frontier.Frontier()) {
return false, nil
}
checkpointStart := timeutil.Now()
if err := cf.checkpointJobProgress(ctx, cf.frontier.Frontier(), checkpoint); err != nil {
return false, err
}
cf.js.checkpointCompleted(ctx, timeutil.Since(checkpointStart))
}

if err := cf.maybePersistFrontier(ctx); err != nil {
return false, err
}
updateCheckpoint = (inBackfill || cf.frontier.HasLaggingSpans(&cf.js.settings.SV)) && cf.js.canCheckpointSpans()

// TODO(#153462): Determine if this return value should return true
// only if the highwater was updated.
return updateCheckpoint || updateHighWater, nil
return updateCheckpoint, updateHighwater
}

const changefeedJobProgressTxnName = "changefeed job progress"
Expand Down