Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 46 additions & 0 deletions internal/rangedel/rangedel.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
package rangedel

import (
"sync"

"github.com/cockroachdb/pebble/internal/base"
"github.com/cockroachdb/pebble/internal/invariants"
"github.com/cockroachdb/pebble/internal/keyspan"
Expand Down Expand Up @@ -61,3 +63,47 @@ func DecodeIntoSpan(cmp base.Compare, ik base.InternalKey, v []byte, s *keyspan.
s.Keys = append(s.Keys, keyspan.Key{Trailer: ik.Trailer})
return nil
}

// Interleave combines a point iterator with a range deletion iterator. The
// iterator it returns yields the point keys of iter with the boundary keys of
// the range deletions interleaved among them, each boundary surfacing at the
// maximal sequence number.
//
// The second return value is an accessor for the range tombstone overlapping
// the current iterator position, if any. When rangeDelIter is nil there is
// nothing to interleave: iter is returned unchanged and the accessor is nil.
//
// The interleaving iterator is taken from a pool and handed back to it by
// Close, so the returned iterator must only be closed once.
func Interleave(
	comparer *base.Comparer, iter base.InternalIterator, rangeDelIter keyspan.FragmentIterator,
) (base.InternalIterator, func() *keyspan.Span) {
	if rangeDelIter != nil {
		pooled := interleavingIterPool.Get().(*interleavingIter)
		// Ask for end boundary keys to be interleaved as well.
		opts := keyspan.InterleavingIterOpts{InterleaveEndKeys: true}
		pooled.Init(comparer, iter, rangeDelIter, opts)
		return pooled, pooled.Span
	}
	// Without a range deletion iterator the interleaving iterator would add
	// nothing, so the point iterator is handed back as-is.
	return iter, nil
}

// interleavingIterPool recycles interleavingIter values. Interleave takes
// iterators out of the pool and interleavingIter.Close puts them back.
var interleavingIterPool = sync.Pool{
	New: func() any { return new(interleavingIter) },
}

// interleavingIter embeds keyspan.InterleavingIter so that its Close method
// can be overridden to return the iterator to interleavingIterPool.
type interleavingIter struct {
	keyspan.InterleavingIter
}

// Close closes the embedded keyspan.InterleavingIter, clears the receiver and
// hands it back to interleavingIterPool for reuse. The error reported by the
// embedded iterator's Close is returned. The receiver must not be used once
// Close has returned.
func (i *interleavingIter) Close() error {
	closeErr := i.InterleavingIter.Close()
	// Wipe all state before pooling so the next user starts from a zero value
	// and no references are retained by the pooled object.
	var zero interleavingIter
	*i = zero
	interleavingIterPool.Put(i)
	return closeErr
}
73 changes: 23 additions & 50 deletions level_checker.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"github.com/cockroachdb/pebble/internal/base"
"github.com/cockroachdb/pebble/internal/keyspan"
"github.com/cockroachdb/pebble/internal/manifest"
"github.com/cockroachdb/pebble/internal/rangedel"
"github.com/cockroachdb/pebble/sstable"
"github.com/cockroachdb/pebble/sstable/blob"
"github.com/cockroachdb/pebble/sstable/block"
Expand Down Expand Up @@ -55,19 +56,11 @@ import (

// The per-level structure used by simpleMergingIter.
type simpleMergingIterLevel struct {
iter internalIterator
rangeDelIter keyspan.FragmentIterator

iterKV *base.InternalKV
tombstone *keyspan.Span
}

func (ml *simpleMergingIterLevel) setRangeDelIter(iter keyspan.FragmentIterator) {
ml.tombstone = nil
if ml.rangeDelIter != nil {
ml.rangeDelIter.Close()
}
ml.rangeDelIter = iter
iter internalIterator
// getTombstone returns the range deletion tombstone covering the current
// iterator position. getTombstone must not be called after iter is closed.
getTombstone func() *keyspan.Span
iterKV *base.InternalKV
}

type simpleMergingIter struct {
Expand Down Expand Up @@ -114,26 +107,6 @@ func (m *simpleMergingIter) init(
}
}
m.heap.init()

if m.heap.len() == 0 {
return
}
m.positionRangeDels()
}

// Positions all the rangedel iterators at or past the current top of the
// heap, using SeekGE().
func (m *simpleMergingIter) positionRangeDels() {
item := &m.heap.items[0]
for i := range m.levels {
l := &m.levels[i]
if l.rangeDelIter == nil {
continue
}
t, err := l.rangeDelIter.SeekGE(item.kv.K.UserKey)
m.err = firstError(m.err, err)
l.tombstone = t
}
}

// Returns true if not yet done.
Expand Down Expand Up @@ -161,6 +134,7 @@ func (m *simpleMergingIter) step() bool {
if l.iterKV == nil {
m.err = errors.CombineErrors(l.iter.Error(), l.iter.Close())
l.iter = nil
l.getTombstone = nil
m.heap.pop()
} else {
// Check point keys in an sstable are ordered. Although not required, we check
Expand Down Expand Up @@ -201,7 +175,6 @@ func (m *simpleMergingIter) step() bool {
}
return false
}
m.positionRangeDels()
return true
}

Expand Down Expand Up @@ -283,12 +256,16 @@ func (m *simpleMergingIter) handleVisiblePoint(
// iterators must be positioned at a key > item.key.
for level := item.index + 1; level < len(m.levels); level++ {
lvl := &m.levels[level]
if lvl.rangeDelIter == nil || lvl.tombstone.Empty() {
if lvl.getTombstone == nil {
continue
}
if lvl.tombstone.Contains(m.heap.cmp, item.kv.K.UserKey) && lvl.tombstone.CoversAt(m.snapshot, item.kv.K.SeqNum()) {
t := lvl.getTombstone()
if t.Empty() {
continue
}
if t.Contains(m.heap.cmp, item.kv.K.UserKey) && t.CoversAt(m.snapshot, item.kv.K.SeqNum()) {
m.err = errors.Errorf("tombstone %s in %s deletes key %s in %s",
lvl.tombstone.Pretty(m.formatKey), lvl.iter, item.kv.K.Pretty(m.formatKey),
t.Pretty(m.formatKey), lvl.iter, item.kv.K.Pretty(m.formatKey),
l.iter)
return false
}
Expand Down Expand Up @@ -598,28 +575,22 @@ func checkLevelsInternal(c *checkConfig) (err error) {
err = firstError(err, l.iter.Close())
l.iter = nil
}
if l.rangeDelIter != nil {
l.rangeDelIter.Close()
l.rangeDelIter = nil
}
}
}()

memtables := c.readState.memtables
for i := len(memtables) - 1; i >= 0; i-- {
mem := memtables[i]
var iter internalIterator
var mil simpleMergingIterLevel
// For ingestedFlushable, we need to pass the blob value fetcher to allow
// reading values from blob files.
if ingested, ok := mem.flushable.(*ingestedFlushable); ok {
iter = ingested.newIterInternal(nil, internalOpts)
mil.iter = ingested.newIterInternal(nil, internalOpts)
} else {
iter = mem.newIter(nil)
mil.iter = mem.newIter(nil)
}
mlevels = append(mlevels, simpleMergingIterLevel{
iter: iter,
rangeDelIter: mem.newRangeDelIter(nil),
})
mil.iter, mil.getTombstone = rangedel.Interleave(c.comparer, mil.iter, mem.newRangeDelIter(nil))
mlevels = append(mlevels, mil)
}

current := c.readState.current
Expand Down Expand Up @@ -651,8 +622,9 @@ func checkLevelsInternal(c *checkConfig) (err error) {
li := &levelIter{}
li.init(context.Background(), iterOpts, c.comparer, c.newIters, manifestIter,
manifest.L0Sublevel(sublevel), internalOpts)
li.initRangeDel(&mlevelAlloc[0])
li.interleaveRangeDels = true
mlevelAlloc[0].iter = li
mlevelAlloc[0].getTombstone = li.getTombstone
mlevelAlloc = mlevelAlloc[1:]
for f := range current.L0SublevelFiles[sublevel].All() {
allTables = append(allTables, f)
Expand All @@ -666,8 +638,9 @@ func checkLevelsInternal(c *checkConfig) (err error) {
li := &levelIter{}
li.init(context.Background(), iterOpts, c.comparer, c.newIters,
current.Levels[level].Iter(), manifest.Level(level), internalOpts)
li.initRangeDel(&mlevelAlloc[0])
li.interleaveRangeDels = true
mlevelAlloc[0].iter = li
mlevelAlloc[0].getTombstone = li.getTombstone
mlevelAlloc = mlevelAlloc[1:]
for f := range current.Levels[level].All() {
allTables = append(allTables, f)
Expand Down
79 changes: 52 additions & 27 deletions level_iter.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,21 +73,23 @@ type levelIter struct {
files manifest.LevelIterator
err error

// interleaveRangeDels is set to true if the levelIter is configured to
// interleave range deletions among point keys, using li.interleaving.
interleaveRangeDels bool
// interleaving is used when interleaveRangeDels=true to interleave the
// boundaries of range deletions among point keys. When the level iterator
// is used by a merging iterator, this ensures that we don't advance to a
// new file until the range deletions are no longer needed by other levels.
interleaving keyspan.InterleavingIter
// When rangeDelIterSetter != nil, the caller requires that this function
// gets called with a range deletion iterator whenever the current file
// changes. The iterator is relinquished to the caller which is responsible
// changes. The iterator is relinquished to the caller which is responsible
// for closing it.
//
// When rangeDelIterSetter != nil, the levelIter will also interleave the
// boundaries of range deletions among point keys.
// When rangeDelIterSetter != nil, interleaveRangeDels must be true (but the
// inverse is not true).
rangeDelIterSetter rangeDelIterSetter

// interleaving is used when rangeDelIterFn != nil to interleave the
// boundaries of range deletions among point keys. When the leve iterator is
// used by a merging iterator, this ensures that we don't advance to a new
// file until the range deletions are no longer needed by other levels.
interleaving keyspan.InterleavingIter

// internalOpts holds the internal iterator options to pass to the table
// cache when constructing new table iterators.
internalOpts internalIterOpts
Expand Down Expand Up @@ -170,11 +172,12 @@ func (l *levelIter) init(

// initRangeDel puts the level iterator into a mode where it interleaves range
// deletion boundaries with point keys and provides a range deletion iterator
// (through rangeDelIterFn) whenever the current file changes.
// (through rangeDelIterSetter) whenever the current file changes.
//
// The range deletion iterator passed to rangeDelIterFn is relinquished to the
// implementor who is responsible for closing it.
// The range deletion iterator passed to rangeDelIterSetter is relinquished to
// the implementor who is responsible for closing it.
func (l *levelIter) initRangeDel(rangeDelSetter rangeDelIterSetter) {
	// The setter is only consulted while interleaving is enabled, so
	// installing one always switches interleaving on as well.
	l.rangeDelIterSetter, l.interleaveRangeDels = rangeDelSetter, true
}

Expand Down Expand Up @@ -571,7 +574,7 @@ func (l *levelIter) loadFile(file *manifest.TableMetadata, dir int) loadFileRetu
}

iterKinds := iterPointKeys
if l.rangeDelIterSetter != nil {
if l.interleaveRangeDels {
iterKinds |= iterRangeDeletions
}

Expand All @@ -584,30 +587,39 @@ func (l *levelIter) loadFile(file *manifest.TableMetadata, dir int) loadFileRetu
return noFileLoaded
}
l.iter = iters.Point()
if l.rangeDelIterSetter != nil && iters.rangeDeletion != nil {
if l.interleaveRangeDels && iters.rangeDeletion != nil {
// If this file has range deletions, interleave the bounds of the
// range deletions among the point keys. When used with a
// mergingIter, this ensures we don't move beyond a file with range
// deletions until its range deletions are no longer relevant.
//
// For now, we open a second range deletion iterator. Future work
// will avoid the need to open a second range deletion iterator, and
// avoid surfacing the file's range deletion iterator via rangeDelIterFn.
itersForBounds, err := l.newIters(l.ctx, l.iterFile, &l.tableOpts, l.internalOpts, iterRangeDeletions)
if err != nil {
l.iter = nil
l.err = errors.CombineErrors(err, iters.CloseAll())
return noFileLoaded
}
l.interleaving.Init(l.comparer, l.iter, itersForBounds.RangeDeletion(), keyspan.InterleavingIterOpts{
l.interleaving.Init(l.comparer, l.iter, iters.rangeDeletion, keyspan.InterleavingIterOpts{
LowerBound: l.tableOpts.LowerBound,
UpperBound: l.tableOpts.UpperBound,
InterleaveEndKeys: true,
})
l.iter = &l.interleaving

// Relinquish iters.rangeDeletion to the caller.
l.rangeDelIterSetter.setRangeDelIter(iters.rangeDeletion)
// Additionally, when interleaving range deletions, optionally the
// caller may request a copy of each range deletion iterator we
// open by providing a rangeDelIterSetter.
//
// The levelIter requires its own range deletion iterator for
// interleaving bounds, so we open a second range deletion iterator
// that's solely owned by the caller and we relinquish it to the
// caller through calling setRangeDelIter.
if l.rangeDelIterSetter != nil {
// TODO(jackson): This should be avoidable by teaching the
// merging iterator to read range deletions from
// levelIter.getTombstone() but requires some delicate
// refactoring. See the unmerged PR #3600.
itersForSetter, err := l.newIters(l.ctx, l.iterFile, &l.tableOpts, l.internalOpts, iterRangeDeletions)
if err != nil {
l.iter = nil
l.err = errors.CombineErrors(err, iters.CloseAll())
return noFileLoaded
}
l.rangeDelIterSetter.setRangeDelIter(itersForSetter.rangeDeletion)
}
}
if treesteps.Enabled && treesteps.IsRecording(l) {
treesteps.NodeUpdated(l, fmt.Sprintf("file %s loaded", l.iterFile.TableNum))
Expand Down Expand Up @@ -1027,6 +1039,19 @@ func (l *levelIter) exhaustedBackward() {
l.exhaustedDir = -1
}

// getTombstone reports the range tombstone covering the current iterator
// position, as tracked by the interleaving iterator. It returns nil when no
// tombstone covers the position, and also whenever the level iterator is not
// currently reading through l.interleaving (for example because it is not
// configured to interleave range deletions).
//
// The returned Span's memory is guaranteed to be valid until the iterator is
// moved beyond the Span's interleaved boundary keys.
func (l *levelIter) getTombstone() *keyspan.Span {
	if l.iter == &l.interleaving {
		return l.interleaving.Span()
	}
	return nil
}

func (l *levelIter) Error() error {
if l.err != nil || l.iter == nil {
return l.err
Expand Down