diff --git a/internal/impl/mssqlserver/integration_test.go b/internal/impl/mssqlserver/integration_test.go index 77f72bb8b9..117003e880 100644 --- a/internal/impl/mssqlserver/integration_test.go +++ b/internal/impl/mssqlserver/integration_test.go @@ -50,8 +50,7 @@ func TestIntegration_MicrosoftSQLServerCDC_SnapshotAndStreaming(t *testing.T) { db.MustExec("INSERT INTO dbo.bar DEFAULT VALUES") } - // wait for changes to propagate to change tables - time.Sleep(5 * time.Second) + db.WaitForCDCChanges(t.Context(), 1000, "test.foo", "dbo.foo", "dbo.bar") var ( outBatches []string @@ -108,13 +107,17 @@ microsoft_sql_server_cdc: t.Log("Verifying streaming changes...") { - // insert 3000 more for streaming changes - for range 1000 { + // insert streaming changes (reduced count to avoid CDC agent timeout under emulation) + streamingRowsPerTable := 10 + streamingWant := streamingRowsPerTable * 3 + for range streamingRowsPerTable { db.MustExec("INSERT INTO test.foo DEFAULT VALUES") db.MustExec("INSERT INTO dbo.foo DEFAULT VALUES") db.MustExec("INSERT INTO dbo.bar DEFAULT VALUES") } + db.WaitForCDCChanges(t.Context(), 1000+streamingRowsPerTable, "test.foo", "dbo.foo", "dbo.bar") + outBatchesMu.Lock() outBatches = nil outBatchesMu.Unlock() @@ -123,10 +126,10 @@ microsoft_sql_server_cdc: defer outBatchesMu.Unlock() got := len(outBatches) - if got > want { - t.Fatalf("Wanted %d streaming changes but got %d", want, got) + if got > streamingWant { + t.Fatalf("Wanted %d streaming changes but got %d", streamingWant, got) } - return got == want + return got == streamingWant }, time.Minute*5, time.Second*1) } @@ -151,8 +154,7 @@ microsoft_sql_server_cdc: db.MustExec("INSERT INTO dbo.bar DEFAULT VALUES") } - // wait for changes to propagate to change tables - time.Sleep(5 * time.Second) + db.WaitForCDCChanges(t.Context(), 1000, "test.foo", "dbo.foo", "dbo.bar") var ( outBatches []string @@ -210,13 +212,17 @@ microsoft_sql_server_cdc: t.Log("Verifying streaming changes...") { - // insert 3000 more for streaming changes - for range 1000 { + // insert streaming changes (reduced count to avoid CDC agent timeout under emulation) + streamingRowsPerTable := 10 + streamingWant := streamingRowsPerTable * 3 + for range streamingRowsPerTable { db.MustExec("INSERT INTO test.foo DEFAULT VALUES") db.MustExec("INSERT INTO dbo.foo DEFAULT VALUES") db.MustExec("INSERT INTO dbo.bar DEFAULT VALUES") } + db.WaitForCDCChanges(t.Context(), 1000+streamingRowsPerTable, "test.foo", "dbo.foo", "dbo.bar") + outBatchesMu.Lock() outBatches = nil outBatchesMu.Unlock() @@ -225,10 +231,10 @@ microsoft_sql_server_cdc: defer outBatchesMu.Unlock() got := len(outBatches) - if got > want { - t.Fatalf("Wanted %d streaming changes but got %d", want, got) + if got > streamingWant { + t.Fatalf("Wanted %d streaming changes but got %d", streamingWant, got) } - return got == want + return got == streamingWant }, time.Minute*5, time.Second*1) } @@ -253,8 +259,7 @@ microsoft_sql_server_cdc: db.MustExec("INSERT INTO dbo.bar DEFAULT VALUES") } - // wait for changes to propagate to change tables - time.Sleep(5 * time.Second) + db.WaitForCDCChanges(t.Context(), 1000, "test.foo", "dbo.foo", "dbo.bar") var ( outBatches []string @@ -317,13 +322,17 @@ file: t.Log("Verifying streaming changes...") { - // insert 3000 more for streaming changes - for range 1000 { + // insert streaming changes (reduced count to avoid CDC agent timeout under emulation) + streamingRowsPerTable := 10 + streamingWant := streamingRowsPerTable * 3 + for range streamingRowsPerTable { db.MustExec("INSERT INTO test.foo DEFAULT VALUES") db.MustExec("INSERT INTO dbo.foo DEFAULT VALUES") db.MustExec("INSERT INTO dbo.bar DEFAULT VALUES") } + db.WaitForCDCChanges(t.Context(), 1000+streamingRowsPerTable, "test.foo", "dbo.foo", "dbo.bar") + outBatchesMu.Lock() outBatches = nil outBatchesMu.Unlock() @@ -332,10 +341,10 @@ file: defer outBatchesMu.Unlock() got := len(outBatches) - if got > want { - t.Fatalf("Wanted %d streaming changes but got %d", want, got) + if got > streamingWant { + t.Fatalf("Wanted %d streaming changes but got %d", streamingWant, got) } - return got == want + return got == streamingWant }, time.Minute*5, time.Second*1) } diff --git a/internal/impl/mssqlserver/mssqlservertest/mssqlservertest.go b/internal/impl/mssqlserver/mssqlservertest/mssqlservertest.go index e2fb5030de..7951e983fa 100644 --- a/internal/impl/mssqlserver/mssqlservertest/mssqlservertest.go +++ b/internal/impl/mssqlserver/mssqlservertest/mssqlservertest.go @@ -91,6 +91,32 @@ end: db.T.Logf("Change Data Capture enabled for table %q", fullTableName) } +// WaitForCDCChanges waits until the CDC change table for each given source table +// has at least minRows entries. Under x86 emulation on Apple Silicon the CDC +// capture agent can be very slow, so tests must poll rather than sleep. +func (db *TestDB) WaitForCDCChanges(ctx context.Context, minRows int, tables ...string) { + db.T.Helper() + for _, fullTableName := range tables { + table := strings.Split(fullTableName, ".") + if len(table) != 2 { + table = []string{"dbo", table[0]} + } + query := "SELECT COUNT(*) FROM [cdc].[" + table[0] + "_" + table[1] + "_CT]" + var lastCount int + if !assert.Eventually(db.T, func() bool { + if ctx.Err() != nil { + return false + } + if err := db.QueryRowContext(ctx, query).Scan(&lastCount); err != nil { + return false + } + return lastCount >= minRows + }, 5*time.Minute, time.Second) { + db.T.Fatalf("WaitForCDCChanges(%q): expected >= %d rows, got %d", fullTableName, minRows, lastCount) + } + } +} + // MustDisableCDC disables Change Data Capture on the specified table. // The fullTableName should be in format "schema.table" (e.g., "dbo.all_data_types"). // If only a table name is provided, defaults to "dbo" schema.