
mysql_cdc: parallelise snapshot reads across tables #4320

Closed

ankit481 wants to merge 2 commits into redpanda-data:main from ankit481:feat/mysql-cdc-parallel-snapshot

Conversation

@ankit481
Contributor

Summary

Adds an opt-in snapshot_max_parallel_tables field to the mysql_cdc input. When left at its default value (1), the snapshot flow is the existing single-transaction, single-goroutine path, bit-for-bit unchanged. When set higher, the input fans out table reads across N workers sharing a single globally-consistent MySQL snapshot.

Addresses the // TODO(cdc): Process tables in parallel marker previously at input_mysql_stream.go:463.
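
For reference, a minimal sketch of the kind of spec addition this implies, assuming the standard service.NewIntField helpers; the field name and default are from this PR, but the description text and exact placement are illustrative, not the literal diff:

```go
import "github.com/redpanda-data/benthos/v4/public/service"

// Sketch only: the description wording is illustrative.
var fieldSnapshotMaxParallelTables = service.NewIntField("snapshot_max_parallel_tables").
	Description("How many tables to snapshot concurrently. The default of 1 keeps the existing sequential snapshot path.").
	Default(1).
	Advanced()
```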

Motivation

For pipelines that stream tens or hundreds of tables, the current sequential snapshot is the dominant bottleneck: a single *sql.Tx on a single connection iterating for _, table := range i.tables. On deployments with an IAM-token TTL (15 min on AWS RDS IAM auth, for example), a sufficiently large snapshot cannot complete within one token window no matter how small snapshot_max_batch_size is set. This change lets operators scale the snapshot phase horizontally against MySQL's natural capacity for concurrent read transactions.

Design

Consistency model is preserved, not relaxed

The key observation is that MySQL allows multiple START TRANSACTION WITH CONSISTENT SNAPSHOT transactions to be opened on different connections while a FLUSH TABLES ... WITH READ LOCK is held. Every transaction that starts inside that lock window observes the same historical state at the same binlog position. Once the lock is released, those transactions proceed independently and concurrently.

This means we keep exactly the same consistency guarantee the sequential path provides today:

  • One global consistent snapshot.
  • One binlog position driving the snapshot -> binlog-stream handoff.
  • Identical snapshotComplete signalling.

Only the number of goroutines reading through that snapshot changes.
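
As a concrete illustration of that window, a hedged sketch follows; the helper name and hard-coded table list are hypothetical, and the real prepareParallelSnapshotSet also captures the binlog position and cleans up on every error path:

```go
import (
	"context"
	"database/sql"
)

// openSharedSnapshot sketches the lock window only. Every transaction is pinned
// while FLUSH TABLES ... WITH READ LOCK is held, so all of them observe the same
// state at the same binlog position. Simplified: the real set also holds an
// explicit *sql.Conn per worker.
func openSharedSnapshot(ctx context.Context, db *sql.DB, workers int) ([]*sql.Tx, error) {
	lockConn, err := db.Conn(ctx)
	if err != nil {
		return nil, err
	}
	defer lockConn.Close()

	// Table list is hard-coded for brevity; the real code builds it from the
	// validated, backtick-quoted configured tables.
	if _, err := lockConn.ExecContext(ctx, "FLUSH TABLES `foo1`, `foo2` WITH READ LOCK"); err != nil {
		return nil, err
	}

	txs := make([]*sql.Tx, 0, workers)
	for n := 0; n < workers; n++ {
		tx, err := db.BeginTx(ctx, &sql.TxOptions{Isolation: sql.LevelRepeatableRead, ReadOnly: true})
		if err != nil {
			return nil, err
		}
		// Pin an explicit consistent read view now, while the read lock is still
		// held (mirrors the BeginTx + raw statement sequence described below).
		if _, err := tx.ExecContext(ctx, "START TRANSACTION WITH CONSISTENT SNAPSHOT"); err != nil {
			return nil, err
		}
		txs = append(txs, tx)
	}

	// The binlog position would be read here, once, and shared by all workers.

	if _, err := lockConn.ExecContext(ctx, "UNLOCK TABLES"); err != nil {
		return nil, err
	}
	return txs, nil
}
```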

Dispatch

startMySQLSync branches on fieldSnapshotMaxParallelTables:

Value | Path | Behaviour
<= 1 (default) | runSequentialSnapshot | Byte-identical to the pre-PR code: prepareSnapshot -> readSnapshot -> releaseSnapshot -> close, in that order.
> 1 | runParallelSnapshot | prepareParallelSnapshotSet -> readSnapshotParallel -> set.release -> set.close.

Extracted shared body

The per-table body of the old readSnapshot loop is extracted into readSnapshotTable(ctx, *Snapshot, table). Both paths call it with identical arguments — the extracted body is not edited, only moved. The sequential readSnapshot is a trivial for-loop over this helper, preserving previous semantics.
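
In sketch form, with hypothetical stand-in types (the method and helper names follow the description above; bodies are illustrative):

```go
import "context"

// Hypothetical stand-ins for the connector's real types, for illustration only.
type Snapshot struct{}

type streamInput struct {
	tables                    []string
	snapshotMaxParallelTables int
}

func (i *streamInput) runSequentialSnapshot(ctx context.Context) error { return nil }
func (i *streamInput) runParallelSnapshot(ctx context.Context) error   { return nil }
func (i *streamInput) readSnapshotTable(ctx context.Context, s *Snapshot, table string) error {
	return nil
}

// Dispatch: the default keeps the original flow; parallelism is purely additive.
func (i *streamInput) startSnapshot(ctx context.Context) error {
	if i.snapshotMaxParallelTables <= 1 {
		return i.runSequentialSnapshot(ctx)
	}
	return i.runParallelSnapshot(ctx)
}

// The sequential path is now a plain loop over the extracted per-table helper.
func (i *streamInput) readSnapshot(ctx context.Context, s *Snapshot) error {
	for _, table := range i.tables {
		if err := i.readSnapshotTable(ctx, s, table); err != nil {
			return err
		}
	}
	return nil
}
```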

Parallel path internals (internal/impl/mysql/parallel_snapshot.go)

  • parallelSnapshotSet owns one *sql.DB and N *Snapshot workers. Each worker holds its own *sql.Conn and its own *sql.Tx; the set is responsible for closing all of them plus the shared db.
  • prepareParallelSnapshotSet orchestrates the lock window:
    1. Open one lock connection; FLUSH TABLES <tables> WITH READ LOCK.
    2. Open N snapshot connections; on each, BeginTx(ReadOnly, RepeatableRead) followed by START TRANSACTION WITH CONSISTENT SNAPSHOT.
    3. Capture the binlog position once (all workers are at the same state).
    4. UNLOCK TABLES, return the lock conn to the pool.
  • The function takes ownership of the caller's *sql.DB: on success the returned set closes it; on error it is closed before the function returns, and any partial state is cleaned up via errors.Join(...), so no resources leak on any failure path.
  • distributeTablesToWorkers is the pure fan-out helper: an errgroup, one producer goroutine feeding table names into an unbuffered channel, N consumer goroutines each reading sequentially through one worker's transaction. The first worker error cancels the shared context and propagates from Wait() — matching the existing fail-halt semantics.
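
A sketch of the fan-out shape that list describes; the signature is an assumption for illustration, not the PR's literal code:

```go
import (
	"context"
	"errors"

	"golang.org/x/sync/errgroup"
)

// distributeTablesToWorkers fans tables out to at most workerCount consumers,
// each of which reads sequentially through its own worker's transaction via readFn.
func distributeTablesToWorkers(
	ctx context.Context,
	workerCount int,
	tables []string,
	readFn func(ctx context.Context, workerIdx int, table string) error,
) error {
	if workerCount < 1 {
		return errors.New("worker count must be at least 1")
	}
	if workerCount > len(tables) {
		workerCount = len(tables) // never spawn more workers than tables
	}

	g, gctx := errgroup.WithContext(ctx)
	ch := make(chan string) // unbuffered: consumers pull tables as they finish

	// Producer: feed table names, bailing out early if any worker has failed.
	g.Go(func() error {
		defer close(ch)
		for _, t := range tables {
			select {
			case ch <- t:
			case <-gctx.Done():
				return gctx.Err()
			}
		}
		return nil
	})

	// Consumers: the first error cancels gctx, halting siblings and the producer.
	for w := 0; w < workerCount; w++ {
		w := w // capture for pre-Go 1.22 loop-variable semantics
		g.Go(func() error {
			for table := range ch {
				if err := readFn(gctx, w, table); err != nil {
					return err
				}
			}
			return nil
		})
	}
	return g.Wait()
}
```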

Why not chunk within a table?

Intra-table chunking (splitting one huge table across N workers by PK range) would help a single-table-dominated workload but is a materially larger change: it requires teaching querySnapshotTable about range bounds, computing partition boundaries cheaply, and handling non-integer PKs. This PR deliberately stops at table-level parallelism — the common case for snapshot-bound pipelines — and leaves intra-table chunking to a follow-up change behind its own config flag.

Why not table-level fault isolation (snapshot_fail_mode)?

It is a semantic change (pipeline may proceed with an incomplete snapshot) and is orthogonal to parallelism. Keeping this PR scoped to pure performance optimisation makes review and rollback trivial. A follow-up PR can add snapshot_fail_mode: continue_and_report for operators who want to trade strict completeness for fault isolation across the N workers.

Backwards compatibility

The default value of snapshot_max_parallel_tables is 1. In that mode:

  • The config spec adds one Advanced() int field. Existing YAML is unaffected.
  • startMySQLSync dispatches to runSequentialSnapshot, whose body is the original flow with no behavioural changes.
  • readSnapshot now delegates its per-table body to readSnapshotTable, a pure extract-method refactor.

There are no new dependencies, no changes to the snapshotComplete handoff, no changes to the binlog position captured, and no changes to the message schema. Existing integration tests (TestIntegrationMySQLSnapshotAndCDC, TestIntegrationMySQLSnapshotConsistency, TestIntegrationMySQLCDCWithCompositePrimaryKeys, TestIntegrationMySQLCDCSchemaMetadata) all pass unchanged — they continue to exercise the sequential path exclusively.

Tests added

Unit (internal/impl/mysql/parallel_snapshot_test.go)

Fan-out helper tested independently of MySQL:

  • CoversEveryTableExactlyOnce — parametrised over 1, 2, 3, 4, 8, 16 workers.
  • WorkerCountCappedByTableCount — extra workers are not spawned.
  • SingleWorkerIsSequential — workerCount=1 observes at most one in-flight read.
  • ErrorPropagatesAndCancelsSiblings — errgroup cancellation semantics.
  • ContextCancellationPropagates — external cancellation produces context.Canceled.
  • ZeroWorkersRejected — explicit error for misconfiguration.
  • EmptyTablesIsNoop — no readFn invocations for an empty input.
  • WorkerIdxWithinBounds — callbacks receive a valid worker index.
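
For example, the coverage case could look roughly like this, reusing the hypothetical signature sketched earlier (testify assertions assumed; not the repository's actual test code):

```go
import (
	"context"
	"fmt"
	"sync"
	"testing"

	"github.com/stretchr/testify/require"
)

func TestDistributeTablesCoversEveryTableExactlyOnce(t *testing.T) {
	tables := []string{"t1", "t2", "t3", "t4", "t5"}
	for _, workers := range []int{1, 2, 3, 4, 8, 16} {
		t.Run(fmt.Sprintf("workers=%d", workers), func(t *testing.T) {
			var mu sync.Mutex
			seen := map[string]int{}
			err := distributeTablesToWorkers(context.Background(), workers, tables,
				func(_ context.Context, _ int, table string) error {
					mu.Lock()
					defer mu.Unlock()
					seen[table]++
					return nil
				})
			require.NoError(t, err)
			require.Len(t, seen, len(tables))
			for _, tbl := range tables {
				require.Equalf(t, 1, seen[tbl], "table %s should be read exactly once", tbl)
			}
		})
	}
}
```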

Unit (internal/impl/mysql/config_test.go)

  • TestConfig_SnapshotMaxParallelTables_DefaultAndExplicit — default of 1, explicit value of 8 round-trips through the spec.
  • TestConfig_SnapshotMaxParallelTables_InvalidValuesRejected — non-positive values violate the constructor's validation predicate.

Integration (internal/impl/mysql/integration_test.go)

TestIntegrationMySQLParallelSnapshot — spins up MySQL 8.0 via testcontainers, creates four tables of 500 rows each, runs the mysql_cdc input with snapshot_max_parallel_tables: 4, verifies:

  • Every snapshot row for every table is emitted exactly once.
  • Additional rows inserted after the snapshot are picked up by the binlog stream, confirming the single shared binlog position captured under the lock window is a valid starting point.

Local test results

# Unit (whole package, race detector on)
ok  internal/impl/mysql  8.995s   (-race -shuffle=on)

# Integration - new test
PASS: TestIntegrationMySQLParallelSnapshot (36.31s)

# Integration - existing snapshot path regressions
PASS: TestIntegrationMySQLSnapshotAndCDC
PASS: TestIntegrationMySQLSnapshotConsistency
PASS: TestIntegrationMySQLCDCWithCompositePrimaryKeys
PASS: TestIntegrationMySQLCDCSchemaMetadata

Log excerpt from TestIntegrationMySQLParallelSnapshot confirming simultaneous per-table reads:

Acquiring table-level read locks for parallel snapshot (4 workers): FLUSH TABLES `foo1`, `foo2`, `foo3`, `foo4` WITH READ LOCK
Querying snapshot: SELECT * FROM foo4 ORDER BY a LIMIT ?
Querying snapshot: SELECT * FROM foo3 ORDER BY a LIMIT ?
Querying snapshot: SELECT * FROM foo2 ORDER BY a LIMIT ?
Querying snapshot: SELECT * FROM foo1 ORDER BY a LIMIT ?
...
starting MySQL CDC stream from binlog mysql-bin.000003 at offset 559037

Test plan

  • Run unit tests for internal/impl/mysql with -race -shuffle=on
  • Run new integration test TestIntegrationMySQLParallelSnapshot
  • Re-run existing sequential-path integration tests for regression
  • Verify gofmt cleanliness
  • Maintainer review of consistency-model reasoning
  • CI green across the full integration matrix

Adds an opt-in `snapshot_max_parallel_tables` field to the `mysql_cdc`
input. When left at the default (`1`) the snapshot flow is the existing
single-transaction, single-goroutine path: bit-for-bit unchanged.

When set above `1`, N REPEATABLE READ / CONSISTENT SNAPSHOT transactions
are opened on independent connections under a single brief FLUSH
TABLES ... WITH READ LOCK window. Every worker observes identical state
at the same binlog position, and the configured tables are fanned out
across the workers via an errgroup. This preserves the existing global
consistent-snapshot invariant and the existing fail-halt failure mode,
while removing the per-table serial bottleneck for pipelines with many
tables.

The inner per-table loop is extracted into readSnapshotTable so both
paths share identical semantics. The sequential path is moved into
runSequentialSnapshot (unchanged body); the parallel path lives in
runParallelSnapshot and parallel_snapshot.go.

Defense-in-depth against a mis-typed config value that would otherwise
try to open thousands of MySQL connections at snapshot time. 256 sits
well above any realistic pipeline (the existing cap at len(tables) is
the more common practical bound) and well below the range where a typo
(e.g. 10000) would cause a connection storm before MySQL's own
max_connections kicked in.

Surfaces as a clear configuration error at Connect time rather than a
runtime too-many-connections from the server.
@ankit481
Contributor Author

Security review follow-up — independent review of the PR flagged one hardening item: snapshot_max_parallel_tables previously had no upper bound, so a typo (10000) would fan out to a connection storm before MySQL's own max_connections rejected it. Added a [1, 256] range check (commit 6b1a4faa7). The cap surfaces as a clear Connect-time configuration error rather than a runtime "too many connections" from the server. 256 sits well above any realistic pipeline while defending against obvious mis-configuration.
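
For illustration, the kind of bounds check described (the function name and exact error wording are hypothetical; the real check lives wherever the field is parsed):

```go
import (
	"fmt"

	"github.com/redpanda-data/benthos/v4/public/service"
)

const maxSnapshotParallelTables = 256

// parseSnapshotParallelism is a hypothetical helper showing the [1, 256] guard.
func parseSnapshotParallelism(conf *service.ParsedConfig) (int, error) {
	workers, err := conf.FieldInt("snapshot_max_parallel_tables")
	if err != nil {
		return 0, err
	}
	if workers < 1 || workers > maxSnapshotParallelTables {
		return 0, fmt.Errorf("snapshot_max_parallel_tables must be between 1 and %d, got %d",
			maxSnapshotParallelTables, workers)
	}
	return workers, nil
}
```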

Also confirmed in the review:

  • No new SQL-injection surface (all new SQL is static or reuses existing validateTableName-gated identifier quoting).
  • Every error path in prepareParallelSnapshotSet releases lock conn, worker conns, worker txns, and the shared *sql.DB — no orphaned FLUSH TABLES WITH READ LOCK possible.
  • All *mysqlStreamInput fields now read from N goroutines are either immutable after construction (i.tables, i.mysqlConfig), mutex-guarded (i.tableSchemas via tableSchemasMu; i.canal via go-mysql's internal tableLock), or channel-mediated (i.rawMessageEvents).
  • distributeTablesToWorkers is cancellation-safe: producer selects on gctx.Done(), consumers range over the channel, errgroup cancels the shared context on first error.
  • No credential/DSN exposure in new log lines — only worker count and the already-sanitized lock query.
  • No new third-party dependencies (go.mod / go.sum diff is empty; golang.org/x/sync was already present).

@josephwoodward
Contributor

Closing in favour of #4363, which takes some of the good ideas from this PR and aligns them with a structure similar to what we have in other connectors.
