Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 30 additions & 21 deletions internal/impl/mssqlserver/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,7 @@ func TestIntegration_MicrosoftSQLServerCDC_SnapshotAndStreaming(t *testing.T) {
db.MustExec("INSERT INTO dbo.bar DEFAULT VALUES")
}

// wait for changes to propagate to change tables
time.Sleep(5 * time.Second)
db.WaitForCDCChanges(t.Context(), 1000, "test.foo", "dbo.foo", "dbo.bar")

var (
outBatches []string
Expand Down Expand Up @@ -108,13 +107,17 @@ microsoft_sql_server_cdc:

t.Log("Verifying streaming changes...")
{
// insert 3000 more for streaming changes
for range 1000 {
// insert streaming changes (reduced count to avoid CDC agent timeout under emulation)
streamingRowsPerTable := 10
streamingWant := streamingRowsPerTable * 3
for range streamingRowsPerTable {
db.MustExec("INSERT INTO test.foo DEFAULT VALUES")
db.MustExec("INSERT INTO dbo.foo DEFAULT VALUES")
db.MustExec("INSERT INTO dbo.bar DEFAULT VALUES")
}

db.WaitForCDCChanges(t.Context(), 1000+streamingRowsPerTable, "test.foo", "dbo.foo", "dbo.bar")

outBatchesMu.Lock()
outBatches = nil
outBatchesMu.Unlock()
Expand All @@ -123,10 +126,10 @@ microsoft_sql_server_cdc:
defer outBatchesMu.Unlock()

got := len(outBatches)
if got > want {
t.Fatalf("Wanted %d streaming changes but got %d", want, got)
if got > streamingWant {
t.Fatalf("Wanted %d streaming changes but got %d", streamingWant, got)
}
return got == want
return got == streamingWant
}, time.Minute*5, time.Second*1)

}
Expand All @@ -151,8 +154,7 @@ microsoft_sql_server_cdc:
db.MustExec("INSERT INTO dbo.bar DEFAULT VALUES")
}

// wait for changes to propagate to change tables
time.Sleep(5 * time.Second)
db.WaitForCDCChanges(t.Context(), 1000, "test.foo", "dbo.foo", "dbo.bar")

var (
outBatches []string
Expand Down Expand Up @@ -210,13 +212,17 @@ microsoft_sql_server_cdc:

t.Log("Verifying streaming changes...")
{
// insert 3000 more for streaming changes
for range 1000 {
// insert streaming changes (reduced count to avoid CDC agent timeout under emulation)
streamingRowsPerTable := 10
streamingWant := streamingRowsPerTable * 3
for range streamingRowsPerTable {
db.MustExec("INSERT INTO test.foo DEFAULT VALUES")
db.MustExec("INSERT INTO dbo.foo DEFAULT VALUES")
db.MustExec("INSERT INTO dbo.bar DEFAULT VALUES")
}

db.WaitForCDCChanges(t.Context(), 1000+streamingRowsPerTable, "test.foo", "dbo.foo", "dbo.bar")

outBatchesMu.Lock()
outBatches = nil
outBatchesMu.Unlock()
Expand All @@ -225,10 +231,10 @@ microsoft_sql_server_cdc:
defer outBatchesMu.Unlock()

got := len(outBatches)
if got > want {
t.Fatalf("Wanted %d streaming changes but got %d", want, got)
if got > streamingWant {
t.Fatalf("Wanted %d streaming changes but got %d", streamingWant, got)
}
return got == want
return got == streamingWant
}, time.Minute*5, time.Second*1)

}
Expand All @@ -253,8 +259,7 @@ microsoft_sql_server_cdc:
db.MustExec("INSERT INTO dbo.bar DEFAULT VALUES")
}

// wait for changes to propagate to change tables
time.Sleep(5 * time.Second)
db.WaitForCDCChanges(t.Context(), 1000, "test.foo", "dbo.foo", "dbo.bar")

var (
outBatches []string
Expand Down Expand Up @@ -317,13 +322,17 @@ file:

t.Log("Verifying streaming changes...")
{
// insert 3000 more for streaming changes
for range 1000 {
// insert streaming changes (reduced count to avoid CDC agent timeout under emulation)
streamingRowsPerTable := 10
streamingWant := streamingRowsPerTable * 3
for range streamingRowsPerTable {
db.MustExec("INSERT INTO test.foo DEFAULT VALUES")
db.MustExec("INSERT INTO dbo.foo DEFAULT VALUES")
db.MustExec("INSERT INTO dbo.bar DEFAULT VALUES")
}

db.WaitForCDCChanges(t.Context(), 1000+streamingRowsPerTable, "test.foo", "dbo.foo", "dbo.bar")

outBatchesMu.Lock()
outBatches = nil
outBatchesMu.Unlock()
Expand All @@ -332,10 +341,10 @@ file:
defer outBatchesMu.Unlock()

got := len(outBatches)
if got > want {
t.Fatalf("Wanted %d streaming changes but got %d", want, got)
if got > streamingWant {
t.Fatalf("Wanted %d streaming changes but got %d", streamingWant, got)
}
return got == want
return got == streamingWant
}, time.Minute*5, time.Second*1)

}
Expand Down
26 changes: 26 additions & 0 deletions internal/impl/mssqlserver/mssqlservertest/mssqlservertest.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,32 @@ end:
db.T.Logf("Change Data Capture enabled for table %q", fullTableName)
}

// WaitForCDCChanges waits until the CDC change table for each given source table
// has at least minRows entries. Under x86 emulation on Apple Silicon the CDC
// capture agent can be very slow, so tests must poll rather than sleep.
func (db *TestDB) WaitForCDCChanges(ctx context.Context, minRows int, tables ...string) {
db.T.Helper()
for _, fullTableName := range tables {
table := strings.Split(fullTableName, ".")
if len(table) != 2 {
table = []string{"dbo", table[0]}
}
query := "SELECT COUNT(*) FROM [cdc].[" + table[0] + "_" + table[1] + "_CT]"
var lastCount int
if !assert.Eventually(db.T, func() bool {
if ctx.Err() != nil {
return false
}
if err := db.QueryRowContext(ctx, query).Scan(&lastCount); err != nil {
return false
}
return lastCount >= minRows
}, 5*time.Minute, time.Second) {
db.T.Fatalf("WaitForCDCChanges(%q): expected >= %d rows, got %d", fullTableName, minRows, lastCount)
}
}
}

// MustDisableCDC disables Change Data Capture on the specified table.
// The fullTableName should be in format "schema.table" (e.g., "dbo.all_data_types").
// If only a table name is provided, defaults to "dbo" schema.
Expand Down
Loading