Skip to content

Commit 6f149ec

Browse files
mysql_cdc: add additional tests around parallel snapshot
1 parent 527e4e2 commit 6f149ec

1 file changed

Lines changed: 100 additions & 37 deletions

File tree

internal/impl/mysql/integration_test.go

Lines changed: 100 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -1408,21 +1408,23 @@ func TestIntegrationMySQLCDCRepeatedConnectionDrops(t *testing.T) {
14081408
}
14091409
}
14101410

1411-
func TestIntegrationMySQLCDCConcurrentSnapshot(t *testing.T) {
1411+
func TestIntegrationMySQLCDCParallelSnapshot(t *testing.T) {
14121412
dsn, db := setupTestWithMySQLVersion(t, "8.0")
14131413

14141414
db.Exec(`CREATE TABLE IF NOT EXISTS snap_foo (id INT AUTO_INCREMENT PRIMARY KEY)`)
14151415
db.Exec(`CREATE TABLE IF NOT EXISTS snap_bar (id INT AUTO_INCREMENT PRIMARY KEY)`)
14161416
db.Exec(`CREATE TABLE IF NOT EXISTS snap_baz (id INT AUTO_INCREMENT PRIMARY KEY)`)
14171417

1418-
want := 3000
1419-
for range 1000 {
1418+
t.Log("Inserting snapshot records")
1419+
want := 30000
1420+
for range 10000 {
14201421
db.Exec("INSERT INTO snap_foo (id) VALUES (DEFAULT)")
14211422
db.Exec("INSERT INTO snap_bar (id) VALUES (DEFAULT)")
14221423
db.Exec("INSERT INTO snap_baz (id) VALUES (DEFAULT)")
14231424
}
14241425

1425-
template := fmt.Sprintf(`
1426+
t.Run("parralel snapshot", func(t *testing.T) {
1427+
template := fmt.Sprintf(`
14261428
mysql_cdc:
14271429
dsn: %s
14281430
stream_snapshot: true
@@ -1435,48 +1437,109 @@ mysql_cdc:
14351437
- snap_baz
14361438
`, dsn)
14371439

1438-
cacheConf := fmt.Sprintf(`
1440+
cacheConf := fmt.Sprintf(`
14391441
label: foocache
14401442
file:
14411443
directory: %s`, t.TempDir())
14421444

1443-
streamOutBuilder := service.NewStreamBuilder()
1444-
require.NoError(t, streamOutBuilder.SetLoggerYAML(`level: DEBUG`))
1445-
require.NoError(t, streamOutBuilder.AddCacheYAML(cacheConf))
1446-
require.NoError(t, streamOutBuilder.AddInputYAML(template))
1445+
streamOutBuilder := service.NewStreamBuilder()
1446+
require.NoError(t, streamOutBuilder.SetLoggerYAML(`level: INFO`))
1447+
require.NoError(t, streamOutBuilder.AddCacheYAML(cacheConf))
1448+
require.NoError(t, streamOutBuilder.AddInputYAML(template))
14471449

1448-
var (
1449-
outBatches []string
1450-
outBatchesMu sync.Mutex
1451-
)
1450+
var (
1451+
outBatches []string
1452+
outBatchesMu sync.Mutex
1453+
)
14521454

1453-
require.NoError(t, streamOutBuilder.AddBatchConsumerFunc(func(_ context.Context, mb service.MessageBatch) error {
1454-
msgBytes, err := mb[0].AsBytes()
1455+
require.NoError(t, streamOutBuilder.AddBatchConsumerFunc(func(_ context.Context, mb service.MessageBatch) error {
1456+
msgBytes, err := mb[0].AsBytes()
1457+
require.NoError(t, err)
1458+
outBatchesMu.Lock()
1459+
outBatches = append(outBatches, string(msgBytes))
1460+
outBatchesMu.Unlock()
1461+
return nil
1462+
}))
1463+
1464+
streamOut, err := streamOutBuilder.Build()
14551465
require.NoError(t, err)
1456-
outBatchesMu.Lock()
1457-
outBatches = append(outBatches, string(msgBytes))
1458-
outBatchesMu.Unlock()
1459-
return nil
1460-
}))
1466+
license.InjectTestService(streamOut.Resources())
14611467

1462-
streamOut, err := streamOutBuilder.Build()
1463-
require.NoError(t, err)
1464-
license.InjectTestService(streamOut.Resources())
1468+
go func() {
1469+
if err := streamOut.Run(t.Context()); err != nil && !errors.Is(err, context.Canceled) {
1470+
t.Error(err)
1471+
}
1472+
}()
1473+
1474+
var got int
1475+
assert.Eventually(t, func() bool {
1476+
outBatchesMu.Lock()
1477+
defer outBatchesMu.Unlock()
1478+
got = len(outBatches)
1479+
return got >= want
1480+
}, time.Minute*5, time.Second*1)
1481+
assert.Equalf(t, want, got, "Wanted %d snapshot messages but got %d", want, got)
1482+
1483+
require.NoError(t, streamOut.StopWithin(10*time.Second))
1484+
})
14651485

1466-
go func() {
1467-
if err := streamOut.Run(t.Context()); err != nil && !errors.Is(err, context.Canceled) {
1468-
t.Error(err)
1469-
}
1470-
}()
1486+
t.Run("sequential snapshot", func(t *testing.T) {
1487+
template := fmt.Sprintf(`
1488+
mysql_cdc:
1489+
dsn: %s
1490+
stream_snapshot: true
1491+
snapshot_max_batch_size: 10
1492+
max_parallel_snapshot_tables: 1
1493+
checkpoint_cache: foocache
1494+
tables:
1495+
- snap_foo
1496+
- snap_bar
1497+
- snap_baz
1498+
`, dsn)
14711499

1472-
var got int
1473-
assert.Eventually(t, func() bool {
1474-
outBatchesMu.Lock()
1475-
defer outBatchesMu.Unlock()
1476-
got = len(outBatches)
1477-
return got >= want
1478-
}, time.Minute*5, time.Second*1)
1479-
assert.Equalf(t, want, got, "Wanted %d snapshot messages but got %d", want, got)
1500+
cacheConf := fmt.Sprintf(`
1501+
label: foocache
1502+
file:
1503+
directory: %s`, t.TempDir())
14801504

1481-
require.NoError(t, streamOut.StopWithin(10*time.Second))
1505+
streamOutBuilder := service.NewStreamBuilder()
1506+
require.NoError(t, streamOutBuilder.SetLoggerYAML(`level: INFO`))
1507+
require.NoError(t, streamOutBuilder.AddCacheYAML(cacheConf))
1508+
require.NoError(t, streamOutBuilder.AddInputYAML(template))
1509+
1510+
var (
1511+
outBatches []string
1512+
outBatchesMu sync.Mutex
1513+
)
1514+
1515+
require.NoError(t, streamOutBuilder.AddBatchConsumerFunc(func(_ context.Context, mb service.MessageBatch) error {
1516+
msgBytes, err := mb[0].AsBytes()
1517+
require.NoError(t, err)
1518+
outBatchesMu.Lock()
1519+
outBatches = append(outBatches, string(msgBytes))
1520+
outBatchesMu.Unlock()
1521+
return nil
1522+
}))
1523+
1524+
streamOut, err := streamOutBuilder.Build()
1525+
require.NoError(t, err)
1526+
license.InjectTestService(streamOut.Resources())
1527+
1528+
go func() {
1529+
if err := streamOut.Run(t.Context()); err != nil && !errors.Is(err, context.Canceled) {
1530+
t.Error(err)
1531+
}
1532+
}()
1533+
1534+
var got int
1535+
assert.Eventually(t, func() bool {
1536+
outBatchesMu.Lock()
1537+
defer outBatchesMu.Unlock()
1538+
got = len(outBatches)
1539+
return got >= want
1540+
}, time.Minute*5, time.Second*1)
1541+
assert.Equalf(t, want, got, "Wanted %d snapshot messages but got %d", want, got)
1542+
1543+
require.NoError(t, streamOut.StopWithin(10*time.Second))
1544+
})
14821545
}

0 commit comments

Comments
 (0)