Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 30 additions & 23 deletions compaction_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2913,25 +2913,30 @@ func TestCompactionCorruption(t *testing.T) {
now.Store(1)
d.problemSpans.InitForTesting(manifest.NumLevels, d.cmp, func() crtime.Mono { return now.Load() })

var workloadWG sync.WaitGroup
var stopWorkload atomic.Bool
defer stopWorkload.Store(true)
startWorkload := func() {
stopWorkload.Store(false)
workloadWG.Add(1)
startWorkload := func(minKey, maxKey byte) (stop func()) {
var shouldStop atomic.Bool
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer workloadWG.Done()
for !stopWorkload.Load() {
defer wg.Done()
var valSeed [32]byte
for i := range valSeed {
valSeed[i] = byte(rand.Uint32())
}
cha := rand.NewChaCha8(valSeed)
for !shouldStop.Load() {
time.Sleep(time.Millisecond)
if m := d.Metrics(); m.Compact.NumInProgress > 0 {
// Pause the workload while there are compactions happening (we run
// the risk of compactions not keeping up).
continue
}
b := d.NewBatch()
// Write a random key of the form a012345 and flush it. This will result
// in (mostly) non-overlapping tables in L0.
var valSeed [32]byte
for i := range valSeed {
valSeed[i] = byte(rand.Uint32())
}
v := make([]byte, 1024+rand.IntN(10240))
_, _ = rand.NewChaCha8(valSeed).Read(v)
key := fmt.Sprintf("%c%06d", 'a'+byte(rand.IntN(int('z'-'a'+1))), rand.IntN(1000000))
v := make([]byte, 1+int(100*rand.ExpFloat64()))
_, _ = cha.Read(v)
key := fmt.Sprintf("%c%06d", minKey+byte(rand.IntN(int(maxKey-minKey+1))), rand.IntN(1000000))
if err := b.Set([]byte(key), v, nil); err != nil {
panic(err)
}
Expand All @@ -2941,12 +2946,21 @@ func TestCompactionCorruption(t *testing.T) {
if err := d.Flush(); err != nil {
panic(err)
}
time.Sleep(10 * time.Millisecond)
}
}()
return func() {
shouldStop.Store(true)
wg.Wait()
}
}

datadriven.RunTest(t, "testdata/compaction_corruption", func(t *testing.T, td *datadriven.TestData) string {
if arg, ok := td.Arg("workload"); ok {
if len(arg.Vals) != 2 || len(arg.Vals[0]) != 1 || len(arg.Vals[1]) != 1 {
td.Fatalf(t, "workload argument must be of the form (a,z)")
}
defer startWorkload(arg.Vals[0][0], arg.Vals[1][0])()
}
// wait until fn() returns true.
wait := func(what string, fn func() bool) {
// Decrease to 5 * time.Second to repro flakes.
Expand Down Expand Up @@ -2988,13 +3002,6 @@ func TestCompactionCorruption(t *testing.T) {
require.NoError(t, writer.Close())
return fmt.Sprintf("%s -> %s", before, after)

case "start-workload":
startWorkload()

case "stop-workload":
stopWorkload.Store(true)
workloadWG.Wait()

case "wait-for-problem-span":
wait("problem span", func() bool {
return !d.problemSpans.IsEmpty()
Expand Down
48 changes: 14 additions & 34 deletions testdata/compaction_corruption
Original file line number Diff line number Diff line change
@@ -1,73 +1,53 @@
build-remote file1
set a avalue
set b bvalue
set f cvalue
----

build-remote file2-not-there
build-remote file-not-there
set d#0 dvalue
set q#0 qvalue
set w#0 wvalue
----

build-remote file3
set x#0 xvalue
set y#0 yvalue
set z#0 zvalue
----

ingest-external
file1 bounds=(a,c0)
file2 bounds=(d,w0)
file3 bounds=(x,z0)
----

start-workload
file bounds=(d,w0)
----

# Verify that a problem span is set.
wait-for-problem-span
wait-for-problem-span workload=(d,w)
----

# Verify that compactions still go through.
wait-for-compactions
wait-for-compactions workload=(a,z)
----

# Make file2 appear.
move-remote-object file2-not-there file2
# Make file appear.
move-remote-object file-not-there file
----
file2-not-there -> file2
file-not-there -> file

# Expire spans.
expire-spans
----

# Compactions should now go through and eventually there should be no external
# files.
wait-for-no-external-files
wait-for-no-external-files workload=(d,w)
----

build-remote file4-not-there
build-remote file2-not-there
set a#0 avalue
set u#0 uvalue
set z#0 zvalue
----

ingest-external
file4 bounds=(a,z0)
file2 bounds=(a,z0)
----

# Verify that a problem span is set.
wait-for-problem-span
----

stop-workload
wait-for-problem-span workload=(a,z)
----

# Make file4 appear.
move-remote-object file4-not-there file4
# Make file2 appear.
move-remote-object file2-not-there file2
----
file4-not-there -> file4
file2-not-there -> file2

# Verify that a manual compaction goes through despite the problem span.
manual-compaction
Expand Down