mysql_cdc: parallelise snapshot reads across tables #4320
Closed
ankit481 wants to merge 2 commits into redpanda-data:main from
Conversation
Adds an opt-in `snapshot_max_parallel_tables` field to the `mysql_cdc` input. When left at the default (`1`) the snapshot flow is the existing single-transaction, single-goroutine path: bit-for-bit unchanged. When set above `1`, N REPEATABLE READ / CONSISTENT SNAPSHOT transactions are opened on independent connections under a single brief `FLUSH TABLES ... WITH READ LOCK` window. Every worker observes identical state at the same binlog position, and the configured tables are fanned out across the workers via an errgroup.

This preserves the existing global consistent-snapshot invariant and the existing fail-halt failure mode, while removing the per-table serial bottleneck for pipelines with many tables. The inner per-table loop is extracted into `readSnapshotTable` so both paths share identical semantics. The sequential path is moved into `runSequentialSnapshot` (unchanged body); the parallel path lives in `runParallelSnapshot` and `parallel_snapshot.go`.
Defense-in-depth against a mis-typed config value that would otherwise try to open thousands of MySQL connections at snapshot time. 256 sits well above any realistic pipeline (the existing cap at `len(tables)` is the more common practical bound) and well below the range where a typo (e.g. 10000) would cause a connection storm before MySQL's own `max_connections` kicked in. This surfaces as a clear configuration error at Connect time rather than a runtime "too many connections" error from the server.
Contributor
Author
Security review follow-up — an independent review of the PR flagged one hardening item. Also confirmed in the review:
This was referenced Apr 23, 2026
Contributor
Closing in favour of #4363, which takes some of the good ideas from this PR and aligns them with a similar structure we have in other connectors.
Summary
Adds an opt-in `snapshot_max_parallel_tables` field to the `mysql_cdc` input. When left at its default value (`1`), the snapshot flow is the existing single-transaction, single-goroutine path, bit-for-bit unchanged. When set higher, the input fans out table reads across N workers sharing a single globally-consistent MySQL snapshot.

Addresses the `// TODO(cdc): Process tables in parallel` marker previously at `input_mysql_stream.go:463`.

Motivation
For pipelines that stream tens or hundreds of tables, the current sequential snapshot is the dominant bottleneck: a single `*sql.Tx` on a single connection iterating `for _, table := range i.tables`. On deployments with an IAM-token TTL (15 minutes on AWS RDS IAM auth, for example), a sufficiently large snapshot cannot complete within one token window no matter how small `snapshot_max_batch_size` is set. This change lets operators scale the snapshot phase horizontally against MySQL's natural capacity for concurrent read transactions.

Design
Consistency model is preserved, not relaxed
The key observation is that MySQL allows multiple `START TRANSACTION WITH CONSISTENT SNAPSHOT` transactions to be opened on different connections while a `FLUSH TABLES ... WITH READ LOCK` is held. Every transaction that starts inside that lock window observes the same historical state at the same binlog position. Once the lock is released, those transactions proceed independently and concurrently.

This means we keep exactly the same consistency guarantee the sequential path provides today, down to the `snapshotComplete` signalling. Only the number of goroutines reading through that snapshot changes.
Dispatch
`startMySQLSync` branches on `fieldSnapshotMaxParallelTables`:

- `<= 1` (default): `runSequentialSnapshot` — `prepareSnapshot -> readSnapshot -> releaseSnapshot -> close`, in that order.
- `> 1`: `runParallelSnapshot` — `prepareParallelSnapshotSet -> readSnapshotParallel -> set.release -> set.close`.

Extracted shared body

The per-table body of the old `readSnapshot` loop is extracted into `readSnapshotTable(ctx, *Snapshot, table)`. Both paths call it with identical arguments — the extracted body is not edited, only moved. The sequential `readSnapshot` is a trivial `for`-loop over this helper, preserving previous semantics.

Parallel path internals (`internal/impl/mysql/parallel_snapshot.go`)

`parallelSnapshotSet` owns one `*sql.DB` and N `*Snapshot` workers. Each worker holds its own `*sql.Conn` and its own `*sql.Tx`; the set is responsible for closing all of them plus the shared db.

`prepareParallelSnapshotSet` orchestrates the lock window:

1. `FLUSH TABLES <tables> WITH READ LOCK`.
2. For each worker, `BeginTx(ReadOnly, RepeatableRead)` followed by `START TRANSACTION WITH CONSISTENT SNAPSHOT`.
3. `UNLOCK TABLES`, return the lock conn to the pool.

The shared `*sql.DB`: on success the returned set closes it; on error it is closed before the function returns, and the partial state is cleaned up via `errors.Join(...)` so no resources leak across failure paths.

`distributeTablesToWorkers` is the pure fan-out helper: an errgroup, one producer goroutine feeding table names into an unbuffered channel, and N consumer goroutines each reading sequentially through one worker's transaction. The first worker error cancels the shared context and propagates from `Wait()` — matching the existing fail-halt semantics.

Why not chunk within a table?
Intra-table chunking (splitting one huge table across N workers by PK range) would help a single-table-dominated workload but is a materially larger change: it requires teaching `querySnapshotTable` about range bounds, computing partition boundaries cheaply, and handling non-integer PKs. This PR deliberately stops at table-level parallelism — the common case for snapshot-bound pipelines — and leaves intra-table chunking to a follow-up change behind its own config flag.

Why not table-level fault isolation (`snapshot_fail_mode`)?

It is a semantic change (the pipeline may proceed with an incomplete snapshot) and is orthogonal to parallelism. Keeping this PR scoped to a pure performance optimisation makes review and rollback trivial. A follow-up PR can add `snapshot_fail_mode: continue_and_report` for operators who want to trade strict completeness for fault isolation across the N workers.

Backwards compatibility
The default value of `snapshot_max_parallel_tables` is `1`. In that mode:

- The new field is an `Advanced()` int field. Existing YAML is unaffected.
- `startMySQLSync` dispatches to `runSequentialSnapshot`, whose body is the original flow with no behavioural changes.
- `readSnapshot` now delegates its per-table body to `readSnapshotTable`, a pure extract-method refactor.
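For illustration, an opted-in configuration might look like the following. The `dsn` and `tables` values are placeholders, and only `snapshot_max_parallel_tables` and `snapshot_max_batch_size` are fields discussed in this PR — the surrounding shape is assumed, not a verbatim `mysql_cdc` config:

```yaml
input:
  mysql_cdc:
    dsn: "user:pass@tcp(mysql:3306)/inventory"   # placeholder DSN
    tables: [orders, customers, shipments, invoices]
    snapshot_max_batch_size: 1000
    snapshot_max_parallel_tables: 4   # opt-in: 4 workers share one consistent snapshot
```

Omitting `snapshot_max_parallel_tables` (or setting it to `1`) keeps the original sequential path.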
snapshotCompletehandoff, no changes to the binlog position captured, and no changes to the message schema. Existing integration tests (TestIntegrationMySQLSnapshotAndCDC,TestIntegrationMySQLSnapshotConsistency,TestIntegrationMySQLCDCWithCompositePrimaryKeys,TestIntegrationMySQLCDCSchemaMetadata) all pass unchanged — they continue to exercise the sequential path exclusively.Tests added
Unit (`internal/impl/mysql/parallel_snapshot_test.go`)

Fan-out helper tested independently of MySQL:

- `CoversEveryTableExactlyOnce` — parametrised over 1, 2, 3, 4, 8, 16 workers.
- `WorkerCountCappedByTableCount` — extra workers are not spawned.
- `SingleWorkerIsSequential` — `workerCount=1` observes at most one in-flight read.
- `ErrorPropagatesAndCancelsSiblings` — errgroup cancellation semantics.
- `ContextCancellationPropagates` — external cancellation produces `context.Canceled`.
- `ZeroWorkersRejected` — explicit error for misconfiguration.
- `EmptyTablesIsNoop` — no `readFn` invocations for an empty input.
- `WorkerIdxWithinBounds` — callbacks receive a valid worker index.

Unit (`internal/impl/mysql/config_test.go`)

- `TestConfig_SnapshotMaxParallelTables_DefaultAndExplicit` — default of `1`, explicit value of `8` round-trips through the spec.
- `TestConfig_SnapshotMaxParallelTables_InvalidValuesRejected` — non-positive values violate the constructor's validation predicate.

Integration (`internal/impl/mysql/integration_test.go`)

`TestIntegrationMySQLParallelSnapshot` — spins up MySQL 8.0 via testcontainers, creates four tables of 500 rows each, runs the `mysql_cdc` input with `snapshot_max_parallel_tables: 4`, and verifies:

Local test results
Log excerpt from `TestIntegrationMySQLParallelSnapshot` confirming simultaneous per-table reads:

Test plan

- Unit tests in `internal/impl/mysql` with `-race -shuffle=on`
- `TestIntegrationMySQLParallelSnapshot`
- `gofmt` cleanliness