Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions compaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,7 @@ const (
compactionKindRewrite
compactionKindIngestedFlushable
compactionKindBlobFileRewrite
compactionKindPolicyEnforcement
// compactionKindVirtualRewrite must be the last compactionKind.
// If a new kind has to be added after VirtualRewrite,
// update AllCompactionKindStrings() accordingly.
Expand Down Expand Up @@ -182,6 +183,8 @@ func (k compactionKind) String() string {
return "copy"
case compactionKindBlobFileRewrite:
return "blob-file-rewrite"
case compactionKindPolicyEnforcement:
return "policy-enforcement"
case compactionKindVirtualRewrite:
return "virtual-sst-rewrite"
}
Expand Down Expand Up @@ -2048,6 +2051,7 @@ func (d *DB) makeCompactionEnvLocked() *compactionEnv {
flushing: d.mu.compact.flushing || d.passedFlushThreshold(),
rescheduleReadCompaction: &d.mu.compact.rescheduleReadCompaction,
},
policyEnforcementFiles: &d.mu.compact.spanPolicyEnforcementFiles,
}
if !d.problemSpans.IsEmpty() {
env.problemSpans = &d.problemSpans
Expand Down
32 changes: 32 additions & 0 deletions compaction_picker.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,9 @@ type compactionEnv struct {
earliestSnapshotSeqNum base.SeqNum
inProgressCompactions []compactionInfo
readCompactionEnv readCompactionEnv
// policyEnforcementFiles contains files marked for policy enforcement
// compaction by the background policy enforcer.
policyEnforcementFiles *manifest.MarkedForCompactionSet
// problemSpans is checked by the compaction picker to avoid compactions that
// overlap an active "problem span". It can be nil when there are no problem
// spans.
Expand Down Expand Up @@ -1562,6 +1565,13 @@ func (p *compactionPickerByScore) pickAutoNonScore(env compactionEnv) (pc picked
}
}

// Check for files that violate span policies (e.g., compression settings).
if env.policyEnforcementFiles != nil && env.policyEnforcementFiles.Count() > 0 {
if pc := p.pickPolicyEnforcementCompaction(env); pc != nil {
return pc
}
}

return nil
}

Expand Down Expand Up @@ -1712,6 +1722,28 @@ func (p *compactionPickerByScore) pickRewriteCompaction(
return nil
}

// pickPolicyEnforcementCompaction attempts to construct a compaction that
// rewrites a file marked for policy enforcement. This handles files that
// violate new span policies such as compression settings. The compaction
// outputs files to the same level as the input level.
func (p *compactionPickerByScore) pickPolicyEnforcementCompaction(
	env compactionEnv,
) (pc *pickedTableCompaction) {
	for candidate, level := range env.policyEnforcementFiles.Ascending() {
		// Drop stale entries for files that are no longer part of the
		// current version.
		if !p.vers.Contains(level, candidate) {
			// NOTE(review): Delete is called while ranging over Ascending();
			// assumes the set's iterator tolerates mutation — confirm.
			env.policyEnforcementFiles.Delete(candidate, level)
			continue
		}
		c := p.pickedCompactionFromCandidateFile(candidate, env, level, level, compactionKindPolicyEnforcement)
		if c == nil {
			continue
		}
		// The file is now being compacted; remove it from the set so it
		// cannot be picked a second time.
		env.policyEnforcementFiles.Delete(candidate, level)
		return c
	}
	return nil
}

// pickVirtualRewriteCompaction looks for backing tables that have a low percentage
// of referenced data and materializes their virtual sstables.
func (p *compactionPickerByScore) pickVirtualRewriteCompaction(
Expand Down
2 changes: 2 additions & 0 deletions compaction_scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,8 @@ func init() {
compactionOptionalAndPriority{optional: true, priority: 20}
scheduledCompactionMap[compactionKindRewrite] =
compactionOptionalAndPriority{optional: true, priority: 10}
scheduledCompactionMap[compactionKindPolicyEnforcement] =
compactionOptionalAndPriority{optional: true, priority: 5}
}

// noopGrantHandle is used in cases that don't interact with a CompactionScheduler.
Expand Down
35 changes: 35 additions & 0 deletions compaction_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1507,6 +1507,31 @@ func runCompactionTest(
s := blobRewriteLog.String()
return s

case "scan-policy-violations":
// Run the span policy enforcer's scan to detect violations and mark files.
if d.opts.Experimental.SpanPolicyFunc == nil {
return "no span policy configured"
}
// Wait for table stats to be loaded so that table properties
// are available for violation detection.
d.waitTableStats()

enforcer := newSpanPolicyEnforcer(d, SpanPolicyEnforcerOptions{})
enforcer.scanAll()
return ""

case "pending-policy-enforcement":
// Show files pending policy enforcement compaction.
d.mu.Lock()
count := d.mu.compact.spanPolicyEnforcementFiles.Count()
var buf strings.Builder
fmt.Fprintf(&buf, "pending: %d\n", count)
for f, level := range d.mu.compact.spanPolicyEnforcementFiles.Ascending() {
fmt.Fprintf(&buf, " L%d: %s\n", level, f.TableNum)
}
d.mu.Unlock()
return buf.String()

case "set-span-policies":
var spanPolicies []SpanPolicy
for line := range crstrings.LinesSeq(td.Input) {
Expand Down Expand Up @@ -1553,6 +1578,11 @@ func runCompactionTest(
td.Fatalf(t, "parsing minimum-mvcc-garbage-size: %s", err)
}
policy.ValueStoragePolicy.MinimumMVCCGarbageSize = int(size)
case "prefer-fast-compression":
if len(parts) != 1 {
td.Fatalf(t, "expected prefer-fast-compression with no value, got: %s", arg)
}
policy.PreferFastCompression = true
default:
td.Fatalf(t, "unknown span policy arg: %s", arg)
}
Expand Down Expand Up @@ -1658,6 +1688,11 @@ func TestCompaction(t *testing.T) {
maxVersion: FormatNewest,
verbose: true,
},
"policy_enforcement": {
minVersion: FormatNewest,
maxVersion: FormatNewest,
cmp: DefaultComparer,
},
}
datadriven.Walk(t, "testdata/compaction", func(t *testing.T, path string) {
filename := filepath.Base(path)
Expand Down
28 changes: 28 additions & 0 deletions data_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/cockroachdb/errors"
"github.com/cockroachdb/pebble/internal/base"
"github.com/cockroachdb/pebble/internal/blobtest"
"github.com/cockroachdb/pebble/internal/compression"
"github.com/cockroachdb/pebble/internal/humanize"
"github.com/cockroachdb/pebble/internal/keyspan"
"github.com/cockroachdb/pebble/internal/manifest"
Expand All @@ -36,6 +37,7 @@ import (
"github.com/cockroachdb/pebble/objstorage/objstorageprovider"
"github.com/cockroachdb/pebble/objstorage/remote"
"github.com/cockroachdb/pebble/sstable"
"github.com/cockroachdb/pebble/sstable/block"
"github.com/cockroachdb/pebble/sstable/block/blockkind"
"github.com/cockroachdb/pebble/sstable/tablefilters/bloom"
"github.com/cockroachdb/pebble/valsep"
Expand Down Expand Up @@ -1895,6 +1897,32 @@ func parseDBOptionsArgs(opts *Options, args []datadriven.CmdArg) error {
Secondary: wal.Dir{FS: opts.FS, Dirname: cmdArg.Vals[0]},
}
opts.WALFailover.EnsureDefaults()
case "compression":
var profile block.CompressionProfile
switch cmdArg.Vals[0] {
case "zstd":
profile = *block.ZstdCompression
case "snappy":
profile = *block.SnappyCompression
case "none":
profile = *block.NoCompression
case "zstd-force":
// For testing: Zstd with MinReductionPercent=0 so even small
// values are stored compressed.
profile = block.CompressionProfile{
Name: "test-zstd-force",
DataBlocks: block.SimpleCompressionSetting(compression.ZstdLevel3),
ValueBlocks: block.SimpleCompressionSetting(compression.ZstdLevel3),
OtherBlocks: compression.ZstdLevel3,
MinReductionPercent: 0,
}
default:
return errors.Newf("unrecognized compression %q", cmdArg.Vals[0])
}
for i := range opts.Levels {
p := profile
opts.Levels[i].Compression = func() *block.CompressionProfile { return &p }
}
}
}
if len(spanPolicies) > 0 {
Expand Down
14 changes: 14 additions & 0 deletions db.go
Original file line number Diff line number Diff line change
Expand Up @@ -316,6 +316,10 @@ type DB struct {

compactionScheduler CompactionScheduler

// spanPolicyEnforcer is the background goroutine that scans the LSM for tables
// that violate the current span policy and marks those files for compaction.
spanPolicyEnforcer *spanPolicyEnforcer

// During an iterator close, we may asynchronously schedule read compactions.
// We want to wait for those goroutines to finish, before closing the DB.
// compactionShedulers.Wait() should not be called while the DB.mu is held.
Expand Down Expand Up @@ -467,6 +471,12 @@ type DB struct {
// compactions which we might have to perform.
readCompactions readCompactionQueue

// spanPolicyEnforcementFiles contains files that have been marked for
// span policy enforcement compaction by the background policy enforcer.
// Unlike MarkedForCompaction, this is not persisted to the manifest
// since the policy enforcer will re-scan on restart.
// TODO(xinhaoz): Create new compaction that will utilize this set.
spanPolicyEnforcementFiles manifest.MarkedForCompactionSet
// The cumulative duration of all completed compactions since Open.
// Does not include flushes.
duration time.Duration
Expand Down Expand Up @@ -1531,6 +1541,10 @@ func (d *DB) Close() error {
// CompactionScheduler will never again call a method on the DB. Note that
// this must be called without holding d.mu.
d.compactionScheduler.Unregister()
// Stop the background policy enforcer if it was started.
if d.spanPolicyEnforcer != nil {
d.spanPolicyEnforcer.Stop()
}
// Lock the commit pipeline for the duration of Close. This prevents a race
// with makeRoomForWrite. Rotating the WAL in makeRoomForWrite requires
// dropping d.mu several times for I/O. If Close only holds d.mu, an
Expand Down
68 changes: 59 additions & 9 deletions internal/manifest/scan_cursor.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ func MakeScanCursor(f *TableMetadata, level int) ScanCursor {
// been processed.
//
// The cursor is positioned such that the file would be considered "before" the
// cursor (i.e., cursor.Compare(MakeScanCursorAtFile(f)) > 0).
// cursor (i.e., cursor.Compare(MakeScanCursor(f, level)) > 0).
func MakeScanCursorAfterFile(f *TableMetadata, level int) ScanCursor {
return ScanCursor{
Level: level,
Expand All @@ -81,10 +81,10 @@ func MakeScanCursorAfterFile(f *TableMetadata, level int) ScanCursor {
}
}

// FileIsAfterCursor returns true if the given file is strictly after the cursor
// FileIsAfterCursor returns true if the given file is at or after the cursor
// position. This is useful for skipping files that have already been processed.
func (c *ScanCursor) FileIsAfterCursor(cmp base.Compare, f *TableMetadata, level int) bool {
	fileCursor := MakeScanCursor(f, level)
	return c.Compare(cmp, fileCursor) <= 0
}

// NextExternalFile returns the first external file after the cursor, returning
Expand Down Expand Up @@ -131,6 +131,21 @@ func (c *ScanCursor) NextExternalFileOnLevel(
return first
}

// firstFileInLevelIter returns the first file at or after the cursor position
// in the given level iterator. It is assumed that the iterator corresponds to
// c.Level.
func (c *ScanCursor) firstFileInLevelIter(cmp base.Compare, it *LevelIterator) *TableMetadata {
	// An empty cursor key means the cursor is at the start of the level.
	if len(c.Key) == 0 {
		return it.First()
	}
	// Seek to the cursor key, then advance past any files still considered
	// before the cursor position.
	for f := it.SeekGE(cmp, c.Key); f != nil; f = it.Next() {
		if c.FileIsAfterCursor(cmp, f, c.Level) {
			return f
		}
	}
	return nil
}

// FirstExternalFileInLevelIter finds the first external file after the cursor
// but which starts before the endBound. It is assumed that the iterator
// corresponds to cursor.Level.
Expand All @@ -140,16 +155,51 @@ func (c *ScanCursor) FirstExternalFileInLevelIter(
it LevelIterator,
endBound base.UserKeyBoundary,
) *TableMetadata {
f := it.SeekGE(cmp, c.Key)
// Skip the file if it starts before cursor.Key or is at that same key with lower
// sequence number.
for f != nil && !c.FileIsAfterCursor(cmp, f, c.Level) {
f = it.Next()
}
f := c.firstFileInLevelIter(cmp, &it)
for ; f != nil && endBound.IsUpperBoundFor(cmp, f.Smallest().UserKey); f = it.Next() {
if f.Virtual && objstorage.IsExternalTable(objProvider, f.TableBacking.DiskFileNum) {
return f
}
}
return nil
}

// NextFile returns the first file at or after the cursor, returning the file and the
// level. If no such file exists, returns nil.
func (c *ScanCursor) NextFile(cmp base.Compare, v *Version) (_ *TableMetadata, level int) {
	for !c.AtEnd() {
		f := c.nextFileOnLevel(cmp, v)
		if f != nil {
			return f, c.Level
		}
		// Nothing left on this level; reset the position and advance to the
		// next level.
		c.Key, c.SeqNum = nil, 0
		c.Level++
	}
	return nil, NumLevels
}

// nextFileOnLevel returns the first file on c.Level which is at or after the
// cursor position.
func (c *ScanCursor) nextFileOnLevel(cmp base.Compare, v *Version) *TableMetadata {
	if c.Level > 0 {
		it := v.Levels[c.Level].Iter()
		return c.firstFileInLevelIter(cmp, &it)
	}
	// For L0, we look at all sublevel iterators and take the first file
	// (ordered by Smallest.UserKey, then by SeqNums.High).
	var best *TableMetadata
	var bestCursor ScanCursor
	for _, sub := range v.L0SublevelFiles {
		it := sub.Iter()
		f := c.firstFileInLevelIter(cmp, &it)
		if f == nil {
			continue
		}
		fCursor := MakeScanCursor(f, c.Level)
		if best == nil || fCursor.Compare(cmp, bestCursor) < 0 {
			best, bestCursor = f, fCursor
		}
	}
	return best
}
35 changes: 35 additions & 0 deletions internal/manifest/scan_cursor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,15 @@ func TestScanCursor(t *testing.T) {
}
fmt.Fprintf(&buf, " %s\n", &cursor)

case "reset":
// Reset cursor to a specific level without bounds.
level := 0
if len(fields) > 1 {
fmt.Sscanf(fields[1], "level=%d", &level)
}
cursor = ScanCursor{Level: level}
fmt.Fprintf(&buf, " %s\n", &cursor)

case "next-external-file":
f, level := cursor.NextExternalFile(cmp, objProvider, bounds, vers)
if f != nil {
Expand All @@ -104,6 +113,32 @@ func TestScanCursor(t *testing.T) {
cursor = MakeScanCursorAfterFile(f, level)
}

case "next-file":
f, level := cursor.NextFile(cmp, vers)
if f != nil {
// Verify that cursor still points to this file.
f2, level2 := cursor.NextFile(cmp, vers)
if f != f2 {
td.Fatalf(t, "NextFile returned different file")
}
if level != level2 {
td.Fatalf(t, "NextFile returned different level")
}
cursor = MakeScanCursorAfterFile(f, level)
}
fmt.Fprintf(&buf, " file: %v level: %d\n", f, level)

case "iterate-files":
for {
f, level := cursor.NextFile(cmp, vers)
if f == nil {
fmt.Fprintf(&buf, " no more files\n")
break
}
fmt.Fprintf(&buf, " file: %v level: %d\n", f, level)
cursor = MakeScanCursorAfterFile(f, level)
}

default:
td.Fatalf(t, "unknown cursor command %q", cmd)
}
Expand Down
Loading